Coverage for pySDC/projects/Resilience/sweepers.py: 91%
103 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 14:59 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 14:59 +0000
1import numpy as np
2from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
3from pySDC.implementations.sweeper_classes.imex_1st_order import imex_1st_order
4from pySDC.core.errors import ParameterError
7class efficient_sweeper:
8 """
9 Replace the predict function by something that does not excessively evaluate superfluous right hand sides.
10 """
12 def predict(self):
13 """
14 Predictor to fill values at nodes before first sweep. Skip evaluation of RHS on initial conditions.
16 Default prediction for the sweepers, only copies the values to all collocation nodes
17 and evaluates the RHS of the ODE there
18 """
20 # get current level and problem description
21 L = self.level
22 P = L.prob
24 for m in range(1, self.coll.num_nodes + 1):
25 # copy u[0] to all collocation nodes, evaluate RHS
26 if self.params.initial_guess == 'spread':
27 L.u[m] = P.dtype_u(L.u[0])
28 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1])
29 # start with zero everywhere
30 elif self.params.initial_guess == 'zero':
31 L.u[m] = P.dtype_u(init=P.init, val=0.0)
32 L.f[m] = P.dtype_f(init=P.init, val=0.0)
33 # start with random initial guess
34 elif self.params.initial_guess == 'random':
35 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0])
36 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0])
37 else:
38 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented')
40 # indicate that this level is now ready for sweeps
41 L.status.unlocked = True
42 L.status.updated = True
45class generic_implicit_efficient(efficient_sweeper, generic_implicit):
46 """
47 This sweeper has the same functionality of the `generic_implicit` sweeper, but saves a few operations at the expense
48 of readability.
49 """
51 def integrate(self, Q=None):
52 """
53 Integrates the right-hand side. Depending on `Q`, this may or may not be consistent with an integral
54 approximation.
56 Args:
57 Q (numpy.ndarray): Some sort of quadrature rule
59 Returns:
60 list of dtype_u: containing the integral as values
61 """
63 # get current level and problem description
64 L = self.level
65 P = L.prob
67 Q = self.coll.Qmat if Q is None else Q
69 me = []
71 # integrate RHS over all collocation nodes
72 for m in range(1, self.coll.num_nodes + 1):
73 # new instance of dtype_u, initialize values with 0
74 me.append(P.dtype_u(P.init, val=0.0))
75 for j in range(1, self.coll.num_nodes + 1):
76 me[-1] += L.dt * Q[m, j] * L.f[j]
78 return me
80 def update_nodes(self):
81 """
82 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
84 Returns:
85 None
86 """
88 # get current level and problem description
89 L = self.level
90 P = L.prob
92 # only if the level has been touched before
93 assert L.status.unlocked
95 # get number of collocation nodes for easier access
96 M = self.coll.num_nodes
98 # gather all terms which are known already (e.g. from the previous iteration)
99 # this corresponds to u0 + QF(u^k) - QdF(u^k) + tau
101 # get QF(u^k)
102 integral = self.integrate(Q=self.coll.Qmat - self.QI)
103 for m in range(M):
104 # add initial value
105 integral[m] += L.u[0]
106 # add tau if associated
107 if L.tau[m] is not None:
108 integral[m] += L.tau[m]
110 # do the sweep
111 for m in range(0, M):
112 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
113 rhs = P.dtype_u(integral[m])
114 for j in range(1, m + 1):
115 rhs += L.dt * self.QI[m + 1, j] * L.f[j]
117 # implicit solve with prefactor stemming from the diagonal of Qd
118 L.u[m + 1] = P.solve_system(
119 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m]
120 )
121 # update function values
122 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
124 # indicate presence of new values at this level
125 L.status.updated = True
127 return None
129 def compute_residual(self, stage=''):
130 lvl = self.level
132 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation:
133 super().compute_residual(stage=stage)
134 return None
136 res = lvl.u[0] - lvl.u[-1]
137 for m in range(1, self.coll.num_nodes + 1):
138 res += lvl.dt * self.coll.Qmat[-1, m] * lvl.f[m]
140 if lvl.params.residual_type[-3:] == 'abs':
141 lvl.status.residual = abs(res)
142 else:
143 lvl.status.residual = abs(res) / abs(lvl.u[0])
145 lvl.status.updated = False
148class imex_1st_order_efficient(efficient_sweeper, imex_1st_order):
149 """
150 Duplicate of `imex_1st_order` sweeper which is slightly more efficient at the cost of code readability.
151 """
153 def integrate(self, Q=None, QI=None, QE=None):
154 """
155 Integrates the right-hand side (here impl + expl)
157 Args:
158 Q (numpy.ndarray): Full quadrature rule
159 QI (numpy.ndarray): Implicit preconditioner
160 QE (numpy.ndarray): Explicit preconditioner
162 Returns:
163 list of dtype_u: containing the integral as values
164 """
166 Q = self.coll.Qmat if Q is None else Q
167 QI = np.zeros_like(Q) if QI is None else QI
168 QE = np.zeros_like(Q) if QE is None else QE
170 # get current level and problem description
171 L = self.level
173 me = []
175 # integrate RHS over all collocation nodes
176 for m in range(1, self.coll.num_nodes + 1):
177 me.append(L.dt * ((Q - QI)[m, 1] * L.f[1].impl + (Q - QE)[m, 1] * L.f[1].expl))
178 # new instance of dtype_u, initialize values with 0
179 for j in range(2, self.coll.num_nodes + 1):
180 me[m - 1] += L.dt * ((Q - QI)[m, j] * L.f[j].impl + (Q - QE)[m, j] * L.f[j].expl)
182 return me
184 def update_nodes(self):
185 """
186 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
188 Returns:
189 None
190 """
192 # get current level and problem description
193 L = self.level
194 P = L.prob
196 # only if the level has been touched before
197 assert L.status.unlocked
199 # get number of collocation nodes for easier access
200 M = self.coll.num_nodes
202 # gather all terms which are known already (e.g. from the previous iteration)
203 # this corresponds to u0 + QF(u^k) - QIFI(u^k) - QEFE(u^k) + tau
205 # get QF(u^k)
206 integral = self.integrate(Q=self.coll.Qmat, QI=self.QI, QE=self.QE)
207 for m in range(M):
208 # add initial value
209 integral[m] += L.u[0]
210 # add tau if associated
211 if L.tau[m] is not None:
212 integral[m] += L.tau[m]
214 # do the sweep
215 for m in range(0, M):
216 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
217 rhs = P.dtype_u(integral[m])
218 for j in range(1, m + 1):
219 rhs += L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl)
221 # implicit solve with prefactor stemming from QI
222 L.u[m + 1] = P.solve_system(
223 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m]
224 )
226 # update function values
227 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
229 # indicate presence of new values at this level
230 L.status.updated = True
232 return None
234 def compute_residual(self, stage=''):
235 lvl = self.level
237 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation:
238 super().compute_residual(stage=stage)
239 return None
241 res = lvl.u[0] - lvl.u[-1]
242 for m in range(1, self.coll.num_nodes + 1):
243 res += lvl.dt * self.coll.Qmat[-1, m] * (lvl.f[m].impl + lvl.f[m].expl)
245 if lvl.params.residual_type[-3:] == 'abs':
246 lvl.status.residual = abs(res)
247 else:
248 lvl.status.residual = abs(res) / abs(lvl.u[0])
250 lvl.status.updated = False