Coverage for pySDC/projects/Resilience/sweepers.py: 92%
107 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-04 12:37 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-04 12:37 +0000
1import numpy as np
2from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
3from pySDC.implementations.sweeper_classes.imex_1st_order import imex_1st_order
4from pySDC.core.errors import ParameterError
7class efficient_sweeper:
8 """
9 Replace the predict function by something that does not excessively evaluate superfluous right hand sides.
10 """
12 def predict(self):
13 """
14 Predictor to fill values at nodes before first sweep. Skip evaluation of RHS on initial conditions.
16 Default prediction for the sweepers, only copies the values to all collocation nodes
17 and evaluates the RHS of the ODE there
18 """
20 # get current level and problem description
21 L = self.level
22 P = L.prob
24 for m in range(1, self.coll.num_nodes + 1):
25 # copy u[0] to all collocation nodes, evaluate RHS
26 if self.params.initial_guess == 'spread':
27 L.u[m] = P.dtype_u(L.u[0])
28 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1])
29 elif self.params.initial_guess == 'copy':
30 L.f[0] = P.eval_f(L.u[0], L.time)
31 L.u[m] = P.dtype_u(L.u[0])
32 L.f[m] = P.dtype_f(L.f[0])
33 # start with zero everywhere
34 elif self.params.initial_guess == 'zero':
35 L.u[m] = P.dtype_u(init=P.init, val=0.0)
36 L.f[m] = P.dtype_f(init=P.init, val=0.0)
37 # start with random initial guess
38 elif self.params.initial_guess == 'random':
39 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0])
40 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0])
41 else:
42 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented')
44 # indicate that this level is now ready for sweeps
45 L.status.unlocked = True
46 L.status.updated = True
49class generic_implicit_efficient(efficient_sweeper, generic_implicit):
50 """
51 This sweeper has the same functionality of the `generic_implicit` sweeper, but saves a few operations at the expense
52 of readability.
53 """
55 def integrate(self, Q=None):
56 """
57 Integrates the right-hand side. Depending on `Q`, this may or may not be consistent with an integral
58 approximation.
60 Args:
61 Q (numpy.ndarray): Some sort of quadrature rule
63 Returns:
64 list of dtype_u: containing the integral as values
65 """
67 # get current level and problem description
68 L = self.level
69 P = L.prob
71 Q = self.coll.Qmat if Q is None else Q
73 me = []
75 # integrate RHS over all collocation nodes
76 for m in range(1, self.coll.num_nodes + 1):
77 # new instance of dtype_u, initialize values with 0
78 me.append(P.dtype_u(P.init, val=0.0))
79 for j in range(1, self.coll.num_nodes + 1):
80 me[-1] += L.dt * Q[m, j] * L.f[j]
82 return me
84 def update_nodes(self):
85 """
86 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
88 Returns:
89 None
90 """
92 # get current level and problem description
93 L = self.level
94 P = L.prob
96 # only if the level has been touched before
97 assert L.status.unlocked
99 # get number of collocation nodes for easier access
100 M = self.coll.num_nodes
102 # gather all terms which are known already (e.g. from the previous iteration)
103 # this corresponds to u0 + QF(u^k) - QdF(u^k) + tau
105 # get QF(u^k)
106 integral = self.integrate(Q=self.coll.Qmat - self.QI)
107 for m in range(M):
108 # add initial value
109 integral[m] += L.u[0]
110 # add tau if associated
111 if L.tau[m] is not None:
112 integral[m] += L.tau[m]
114 # do the sweep
115 for m in range(0, M):
116 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
117 rhs = P.dtype_u(integral[m])
118 for j in range(1, m + 1):
119 rhs += L.dt * self.QI[m + 1, j] * L.f[j]
121 # implicit solve with prefactor stemming from the diagonal of Qd
122 L.u[m + 1] = P.solve_system(
123 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m]
124 )
125 # update function values
126 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
128 # indicate presence of new values at this level
129 L.status.updated = True
131 return None
133 def compute_residual(self, stage=''):
134 lvl = self.level
136 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation:
137 super().compute_residual(stage=stage)
138 return None
140 res = lvl.u[0] - lvl.u[-1]
141 for m in range(1, self.coll.num_nodes + 1):
142 res += lvl.dt * self.coll.Qmat[-1, m] * lvl.f[m]
144 if lvl.params.residual_type[-3:] == 'abs':
145 lvl.status.residual = abs(res)
146 else:
147 lvl.status.residual = abs(res) / abs(lvl.u[0])
149 lvl.status.updated = False
152class imex_1st_order_efficient(efficient_sweeper, imex_1st_order):
153 """
154 Duplicate of `imex_1st_order` sweeper which is slightly more efficient at the cost of code readability.
155 """
157 def integrate(self, Q=None, QI=None, QE=None):
158 """
159 Integrates the right-hand side (here impl + expl)
161 Args:
162 Q (numpy.ndarray): Full quadrature rule
163 QI (numpy.ndarray): Implicit preconditioner
164 QE (numpy.ndarray): Explicit preconditioner
166 Returns:
167 list of dtype_u: containing the integral as values
168 """
170 Q = self.coll.Qmat if Q is None else Q
171 QI = np.zeros_like(Q) if QI is None else QI
172 QE = np.zeros_like(Q) if QE is None else QE
174 # get current level and problem description
175 L = self.level
177 me = []
179 # integrate RHS over all collocation nodes
180 for m in range(1, self.coll.num_nodes + 1):
181 me.append(L.dt * ((Q - QI)[m, 1] * L.f[1].impl + (Q - QE)[m, 1] * L.f[1].expl))
182 # new instance of dtype_u, initialize values with 0
183 for j in range(2, self.coll.num_nodes + 1):
184 me[m - 1] += L.dt * ((Q - QI)[m, j] * L.f[j].impl + (Q - QE)[m, j] * L.f[j].expl)
186 return me
188 def update_nodes(self):
189 """
190 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
192 Returns:
193 None
194 """
196 # get current level and problem description
197 L = self.level
198 P = L.prob
200 # only if the level has been touched before
201 assert L.status.unlocked
203 # get number of collocation nodes for easier access
204 M = self.coll.num_nodes
206 # gather all terms which are known already (e.g. from the previous iteration)
207 # this corresponds to u0 + QF(u^k) - QIFI(u^k) - QEFE(u^k) + tau
209 # get QF(u^k)
210 integral = self.integrate(Q=self.coll.Qmat, QI=self.QI, QE=self.QE)
211 for m in range(M):
212 # add initial value
213 integral[m] += L.u[0]
214 # add tau if associated
215 if L.tau[m] is not None:
216 integral[m] += L.tau[m]
218 # do the sweep
219 for m in range(0, M):
220 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
221 rhs = P.dtype_u(integral[m])
222 for j in range(1, m + 1):
223 rhs += L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl)
225 # implicit solve with prefactor stemming from QI
226 L.u[m + 1] = P.solve_system(
227 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m]
228 )
230 # update function values
231 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
233 # indicate presence of new values at this level
234 L.status.updated = True
236 return None
238 def compute_residual(self, stage=''):
239 lvl = self.level
241 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation:
242 super().compute_residual(stage=stage)
243 return None
245 res = lvl.u[0] - lvl.u[-1]
246 for m in range(1, self.coll.num_nodes + 1):
247 res += lvl.dt * self.coll.Qmat[-1, m] * (lvl.f[m].impl + lvl.f[m].expl)
249 if lvl.params.residual_type[-3:] == 'abs':
250 lvl.status.residual = abs(res)
251 else:
252 lvl.status.residual = abs(res) / abs(lvl.u[0])
254 lvl.status.updated = False