Coverage for pySDC/implementations/sweeper_classes/imex_1st_order.py: 99%
73 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 14:59 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 14:59 +0000
1import numpy as np
3from pySDC.core.sweeper import Sweeper
6class imex_1st_order(Sweeper):
7 """
8 Custom sweeper class, implements Sweeper.py
10 First-order IMEX sweeper using implicit/explicit Euler as base integrator
12 Attributes:
13 QI: implicit Euler integration matrix
14 QE: explicit Euler integration matrix
15 """
17 def __init__(self, params):
18 """
19 Initialization routine for the custom sweeper
21 Args:
22 params: parameters for the sweeper
23 """
25 if 'QI' not in params:
26 params['QI'] = 'IE'
27 if 'QE' not in params:
28 params['QE'] = 'EE'
30 # call parent's initialization routine
31 super().__init__(params)
33 # IMEX integration matrices
34 self.QI = self.get_Qdelta_implicit(qd_type=self.params.QI)
35 self.QE = self.get_Qdelta_explicit(qd_type=self.params.QE)
37 def integrate(self):
38 """
39 Integrates the right-hand side (here impl + expl)
41 Returns:
42 list of dtype_u: containing the integral as values
43 """
45 L = self.level
46 P = L.prob
48 me = []
49 # integrate RHS over all collocation nodes
50 for m in range(1, self.coll.num_nodes + 1):
51 me.append(P.dtype_u(P.init, val=0.0))
52 for j in range(1, self.coll.num_nodes + 1):
53 me[m - 1] += L.dt * self.coll.Qmat[m, j] * (L.f[j].impl + L.f[j].expl)
55 return me
57 def update_nodes(self):
58 """
59 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
61 Returns:
62 None
63 """
65 # get current level and problem description
66 L = self.level
67 P = L.prob
69 # only if the level has been touched before
70 assert L.status.unlocked
72 # get number of collocation nodes for easier access
73 M = self.coll.num_nodes
75 # gather all terms which are known already (e.g. from the previous iteration)
76 # this corresponds to u0 + QF(u^k) - QIFI(u^k) - QEFE(u^k) + tau
78 # get QF(u^k)
79 integral = self.integrate()
80 for m in range(M):
81 # subtract QIFI(u^k)_m + QEFE(u^k)_m
82 for j in range(1, M + 1):
83 integral[m] -= L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl)
84 # add initial value
85 integral[m] += L.u[0]
86 # add tau if associated
87 if L.tau[m] is not None:
88 integral[m] += L.tau[m]
90 # do the sweep
91 for m in range(0, M):
92 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
93 rhs = P.dtype_u(integral[m])
94 for j in range(1, m + 1):
95 rhs += L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl)
97 # implicit solve with prefactor stemming from QI
98 L.u[m + 1] = P.solve_system(
99 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m]
100 )
102 # update function values
103 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
105 # indicate presence of new values at this level
106 L.status.updated = True
108 return None
110 def compute_end_point(self):
111 """
112 Compute u at the right point of the interval
114 The value uend computed here is a full evaluation of the Picard formulation unless do_full_update==False
116 Returns:
117 None
118 """
120 # get current level and problem description
121 L = self.level
122 P = L.prob
124 # check if Mth node is equal to right point and do_coll_update is false, perform a simple copy
125 if self.coll.right_is_node and not self.params.do_coll_update:
126 # a copy is sufficient
127 L.uend = P.dtype_u(L.u[-1])
128 else:
129 # start with u0 and add integral over the full interval (using coll.weights)
130 L.uend = P.dtype_u(L.u[0])
131 for m in range(self.coll.num_nodes):
132 L.uend += L.dt * self.coll.weights[m] * (L.f[m + 1].impl + L.f[m + 1].expl)
133 # add up tau correction of the full interval (last entry)
134 if L.tau[-1] is not None:
135 L.uend += L.tau[-1]
137 return None
139 def get_sweeper_mats(self):
140 """
141 Returns the three matrices Q, QI, QE which define the sweeper.
143 The first row and column, corresponding to the left starting value, are removed to correspond to the notation
144 introduced in Ruprecht & Speck, Spectral deferred corrections with fast-wave slow-wave splitting, 2016
145 """
146 QE = self.QE[1:, 1:]
147 QI = self.QI[1:, 1:]
148 Q = self.coll.Qmat[1:, 1:]
149 return QE, QI, Q
151 def get_scalar_problems_sweeper_mats(self, lambdas=None):
152 """
153 This function returns the corresponding matrices of an IMEX-SDC sweep in matrix formulation.
155 See Ruprecht & Speck, Spectral deferred corrections with fast-wave slow-wave splitting, 2016 for the derivation.
157 Args:
158 lambdas (numpy.ndarray): the first entry in lambdas is lambda_fast, the second is lambda_slow.
159 """
160 QE, QI, Q = self.get_sweeper_mats()
161 if lambdas is None:
162 pass
163 # should use lambdas from attached problem and make sure it is a scalar IMEX
164 raise NotImplementedError("At the moment, the values for lambda have to be provided")
165 else:
166 lambda_fast = lambdas[0]
167 lambda_slow = lambdas[1]
168 nnodes = self.coll.num_nodes
169 dt = self.level.dt
170 LHS = np.eye(nnodes) - dt * (lambda_fast * QI + lambda_slow * QE)
171 RHS = dt * ((lambda_fast + lambda_slow) * Q - (lambda_fast * QI + lambda_slow * QE))
172 return LHS, RHS
174 def get_scalar_problems_manysweep_mat(self, nsweeps, lambdas=None):
175 """
176 For a scalar problem, K sweeps of IMEX-SDC can be written in matrix form.
178 Args:
179 nsweeps (int): number of sweeps
180 lambdas (numpy.ndarray): the first entry in lambdas is lambda_fast, the second is lambda_slow.
181 """
182 LHS, RHS = self.get_scalar_problems_sweeper_mats(lambdas=lambdas)
183 Pinv = np.linalg.inv(LHS)
184 Mat_sweep = np.linalg.matrix_power(Pinv.dot(RHS), nsweeps)
185 for k in range(0, nsweeps):
186 Mat_sweep += np.linalg.matrix_power(Pinv.dot(RHS), k).dot(Pinv)
187 return Mat_sweep