# Coverage report header (coverage.py v7.13.5, created 2026-03-27): 97% of 157 statements covered in
# pySDC/implementations/convergence_controller_classes/estimate_extrapolation_error.py
1import numpy as np
2from scipy.special import factorial
4from pySDC.core.convergence_controller import ConvergenceController, Status
5from pySDC.core.errors import DataError
6from pySDC.implementations.datatype_classes.mesh import mesh, imex_mesh
7from pySDC.implementations.hooks.log_extrapolated_error_estimate import LogExtrapolationErrorEstimate
class EstimateExtrapolationErrorBase(ConvergenceController):
    """
    Abstract base class for extrapolated error estimates
    ----------------------------------------------------
    This error estimate extrapolates a solution based on Taylor expansions using solutions of previous time steps.
    In particular, child classes need to implement how to make these solutions available, which works differently for
    MPI and non-MPI versions.
    """

    def __init__(self, controller, params, description, **kwargs):
        """
        Initialization routine

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller
        """
        self.prev = Status(["t", "u", "f", "dt"])  # store solutions etc. of previous steps here
        self.coeff = Status(["u", "f", "prefactor"])  # store coefficients for extrapolation here
        super().__init__(controller, params, description)
        controller.add_hook(LogExtrapolationErrorEstimate)

    def setup(self, controller, params, description, **kwargs):
        """
        The extrapolation based method requires storage of previous values of u, f, t and dt and also requires solving
        a linear system of equations to compute the Taylor expansion finite difference style. Here, all variables are
        initialized which are needed for this process.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            dict: Updated parameters with default values
        """

        # local imports to avoid circular dependencies between convergence controller modules
        from pySDC.implementations.convergence_controller_classes.hotrod import HotRod
        from pySDC.implementations.convergence_controller_classes.adaptivity import (
            Adaptivity,
        )

        default_params = {
            "control_order": -75,
            # detect whether adaptivity / Hot Rod are registered among the convergence controllers of this run
            "use_adaptivity": True in [me == Adaptivity for me in description.get("convergence_controllers", {})],
            "use_HotRod": True in [me == HotRod for me in description.get("convergence_controllers", {})],
            "order_time_marching": description["step_params"]["maxiter"],
        }

        # user-supplied params (via super().setup) override the defaults
        new_params = {**default_params, **super().setup(controller, params, description, **kwargs)}

        # Do a sufficiently high order Taylor expansion
        new_params["Taylor_order"] = new_params["order_time_marching"] + 2

        # Estimate and store values from this iteration
        # (Hot Rod needs the estimate one iteration earlier)
        new_params["estimate_iter"] = new_params["order_time_marching"] - (1 if new_params["use_HotRod"] else 0)

        # Store n values. Since we store u and f, we need only half of each (the +1 is for rounding)
        new_params["n"] = (new_params["Taylor_order"] + 1) // 2
        new_params["n_per_proc"] = new_params["n"] * 1

        return new_params

    def setup_status_variables(self, controller, **kwargs):
        """
        Initialize coefficient variables and add variable to the levels for extrapolated error

        Args:
            controller (pySDC.controller): The controller

        Returns:
            None
        """
        # coeff.u entries start as None so we can detect "not yet computed" later on
        self.coeff.u = [None] * self.params.n
        self.coeff.f = [0.0] * self.params.n

        self.add_status_variable_to_level('error_extrapolation_estimate')

    def check_parameters(self, controller, params, description, **kwargs):
        """
        Check whether parameters are compatible with whatever assumptions went into the step size functions etc.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            bool: Whether the parameters are compatible
            str: Error message
        """
        # a residual tolerance would make the iteration count (and hence the order) variable between steps
        if description["step_params"].get("restol", -1.0) >= 0:
            return (
                False,
                "Extrapolation error needs constant order in time and hence restol in the step parameters \
has to be smaller than 0!",
            )

        if controller.params.mssdc_jac:
            return (
                False,
                "Extrapolation error estimator needs the same order on all steps, please activate Gauss-Seid\
el multistep mode!",
            )

        return True, ""

    def store_values(self, S, **kwargs):
        """
        Store the required attributes of the step to do the extrapolation. We only care about the last collocation
        node on the finest level at the moment.

        Args:
            S (pySDC.Step): The current step

        Returns:
            None
        """
        # figure out which values are to be replaced by the new ones
        if None in self.prev.t:
            # still filling the buffer: overwrite the first empty slot
            oldest_val = len(self.prev.t) - len(self.prev.t[self.prev.t == [None]])
        else:
            # buffer full: overwrite the entry with the smallest time
            oldest_val = np.argmin(self.prev.t)

        # figure out how to store the right hand side
        f = S.levels[0].f[-1]
        if type(f) == imex_mesh:
            # combine implicit and explicit parts into the full right hand side
            self.prev.f[oldest_val] = f.impl + f.expl
        elif type(f) == mesh:
            self.prev.f[oldest_val] = f
        else:
            raise DataError(f"Unable to store f from datatype {type(f)}, extrapolation based error estimate only\
 works with types imex_mesh and mesh")

        # store the rest of the values
        self.prev.u[oldest_val] = S.levels[0].u[-1]
        self.prev.t[oldest_val] = S.time + S.dt
        self.prev.dt[oldest_val] = S.dt

        return None

    def get_extrapolation_coefficients(self, t, dt, t_eval):
        """
        This function solves a linear system where in the matrix A, the row index reflects the order of the derivative
        in the Taylor expansion and the column index reflects the particular step and whether its u or f from that
        step. The vector b on the other hand, contains a 1 in the first entry and zeros elsewhere, since we want to
        compute the value itself and all the derivatives should vanish after combining the Taylor expansions. This
        works to the order of the number of rows and since we want a square matrix for solving, we need the same amount
        of columns, which determines the memory overhead, since it is equal to the solutions / rhs that we need in
        memory at the time of evaluation.

        This is enough to get the extrapolated solution, but if we want to compute the local error, we have to compute
        a prefactor. This is based on error accumulation between steps (first step's solution is exact plus 1 LTE,
        second solution is exact plus 2 LTE and so on), which can be computed for adaptive step sizes as well. However,
        this is only true for linear problems, which means we expect the error estimate to work less well for non-linear
        problems.

        Since only time differences are important for computing the coefficients, we need to compute this only once when
        using constant step sizes. When we allow the step size to change, however, we need to recompute this in every
        step, which is activated by the `use_adaptivity` parameter.

        Solving for the coefficients requires solving a dense linear system of equations. The number of unknowns is
        equal to the order of the Taylor expansion, so this step should be cheap compared to the solves in each SDC
        iteration.

        The function stores the computed coefficients in the `self.coeff` variables.

        Args:
            t (list): The list of times at which we have solutions available
            dt (list): The step sizes used for computing these solutions (needed for the prefactor)
            t_eval (float): The time we want to extrapolate to

        Returns:
            None
        """

        # prepare A matrix
        A = np.zeros((self.params.Taylor_order, self.params.Taylor_order))
        A[0, 0 : self.params.n] = 1.0
        j = np.arange(self.params.Taylor_order)
        inv_facs = 1.0 / factorial(j)  # 1/j! weights of the Taylor terms

        # get the steps backwards from the point of evaluation
        idx = np.argsort(t)
        steps_from_now = t[idx] - t_eval

        # fill A matrix
        for i in range(1, self.params.Taylor_order):
            # Taylor expansions of the solutions
            A[i, : self.params.n] = steps_from_now ** j[i] * inv_facs[i]

            # Taylor expansions of the first derivatives a.k.a. right hand side evaluations
            # (only the most recent `Taylor_order - n` rhs values are used, hence the slice)
            A[i, self.params.n : self.params.Taylor_order] = (
                steps_from_now[2 * self.params.n - self.params.Taylor_order :] ** (j[i] - 1) * inv_facs[i - 1]
            )

        # prepare rhs: reproduce the value itself, cancel all derivative terms
        b = np.zeros(self.params.Taylor_order)
        b[0] = 1.0

        # solve linear system for the coefficients
        coeff = np.linalg.solve(A, b)
        self.coeff.u = coeff[: self.params.n]
        # f coefficients are written into the tail of the (length-n) list; leading entries stay 0
        self.coeff.f[self.params.n * 2 - self.params.Taylor_order :] = coeff[self.params.n : self.params.Taylor_order]

        # determine prefactor from the assumed linear accumulation of local truncation errors
        step_size_ratios = abs(dt[len(dt) - len(self.coeff.u) :] / dt[-1]) ** (self.params.Taylor_order - 1)
        inv_prefactor = -sum(step_size_ratios[1:]) - 1.0
        for i in range(len(self.coeff.u)):
            inv_prefactor += sum(step_size_ratios[1 : i + 1]) * self.coeff.u[i]
        self.coeff.prefactor = 1.0 / abs(inv_prefactor)

        return None
class EstimateExtrapolationErrorNonMPI(EstimateExtrapolationErrorBase):
    """
    Implementation of the extrapolation error estimate for the non-MPI controller.
    """

    def setup(self, controller, params, description, **kwargs):
        """
        Add a no parameter 'no_storage' which decides whether the standard or the no-memory-overhead version is run,
        where only values are used for extrapolation which are in memory of other processes

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            dict: Updated parameters with default values
        """
        default_params = super().setup(controller, params, description)

        non_mpi_defaults = {
            "no_storage": False,
        }

        # params from the base setup take precedence over the non-MPI defaults
        return {**non_mpi_defaults, **default_params}

    def setup_status_variables(self, controller, **kwargs):
        """
        Initialize storage variables.

        Args:
            controller (pySDC.controller): The controller

        Returns:
            None
        """
        super().setup_status_variables(controller, **kwargs)

        # object arrays of None mark empty storage slots; filled by store_values
        self.prev.t = np.array([None] * self.params.n)
        self.prev.dt = np.array([None] * self.params.n)
        self.prev.u = [None] * self.params.n
        self.prev.f = [None] * self.params.n

        return None

    def post_iteration_processing(self, controller, S, **kwargs):
        """
        We perform three key operations here in the last iteration:
         - Compute the error estimate
         - Compute the coefficients if needed
         - Store the values of the step if we pretend not to for the no-memory overhead version

        Args:
            controller (pySDC.Controller): The controller
            S (pySDC.Step): The current step

        Returns:
            None
        """
        if S.status.iter == self.params.estimate_iter:
            t_eval = S.time + S.dt

            # compute the extrapolation coefficients if needed:
            # only once stored history is complete, and only when extrapolating forward in time
            if (
                (
                    None in self.coeff.u
                    or self.params.use_adaptivity
                    or (not self.params.no_storage and S.status.time_size > 1)
                )
                and None not in self.prev.t
                and t_eval > max(self.prev.t)
            ):
                self.get_extrapolation_coefficients(self.prev.t, self.prev.dt, t_eval)

            # compute the error if we can
            if None not in self.coeff.u and None not in self.prev.t:
                self.get_extrapolated_error(S)

            # store the solution and pretend we didn't because in the non MPI version we take a few shortcuts
            if self.params.no_storage:
                self.store_values(S)

        return None

    def prepare_next_block(self, controller, S, size, time, Tend, MS, **kwargs):
        """
        If the no-memory-overhead version is used, we need to delete stuff that shouldn't be available. Otherwise, we
        need to store all the stuff that we can.

        Args:
            controller (pySDC.Controller): The controller
            S (pySDC.step): The current step
            size (int): Number of ranks
            time (float): The current time
            Tend (float): The final time
            MS (list): Active steps

        Returns:
            None
        """

        # delete values that should not be available in the next step
        if self.params.no_storage:
            self.prev.t = np.array([None] * self.params.n)
            self.prev.dt = np.array([None] * self.params.n)
            self.prev.u = [None] * self.params.n
            self.prev.f = [None] * self.params.n

        else:
            # decide where we need to restart to store everything up to that point
            # NOTE: the comprehension variable deliberately shadows the argument S here
            restarts = [S.status.restart for S in MS]
            restart_at = np.where(restarts)[0][0] if True in restarts else len(MS)

            # store values in the current block that don't need restarting
            if restart_at > S.status.slot:
                self.store_values(S)

        return None

    def get_extrapolated_solution(self, S, **kwargs):
        """
        Combine values from previous steps to extrapolate.

        Args:
            S (pySDC.Step): The current step

        Returns:
            dtype_u: The extrapolated solution
        """
        if len(S.levels) > 1:
            raise NotImplementedError("Extrapolated estimate only works on the finest level for now")

        # prepare variables
        u_ex = S.levels[0].u[-1] * 0.0  # zero field with the same shape/type as the solution
        idx = np.argsort(self.prev.t)

        # see if we have a solution for the current step already stored
        if (abs(S.time + S.dt - self.prev.t) < 10.0 * np.finfo(float).eps).any():
            idx_step = idx[np.argmin(abs(self.prev.t - S.time - S.dt))]
        else:
            idx_step = max(idx) + 1

        # make a mask of all the steps we want to include in the extrapolation
        # (the n stored steps immediately preceding the current one)
        mask = np.logical_and(idx < idx_step, idx >= idx_step - self.params.n)

        # do the extrapolation by summing everything up
        for i in range(self.params.n):
            u_ex += self.coeff.u[i] * self.prev.u[idx[mask][i]] + self.coeff.f[i] * self.prev.f[idx[mask][i]]

        return u_ex

    def get_extrapolated_error(self, S, **kwargs):
        """
        The extrapolation estimate combines values of u and f from multiple steps to extrapolate and compare to the
        solution obtained by the time marching scheme.

        Args:
            S (pySDC.Step): The current step

        Returns:
            None
        """
        u_ex = self.get_extrapolated_solution(S)
        if u_ex is not None:
            # difference to the time-marching solution, scaled by the LTE-accumulation prefactor
            S.levels[0].status.error_extrapolation_estimate = abs(u_ex - S.levels[0].u[-1]) * self.coeff.prefactor
        else:
            S.levels[0].status.error_extrapolation_estimate = None
class EstimateExtrapolationErrorWithinQ(EstimateExtrapolationErrorBase):
    """
    This convergence controller estimates the local error based on comparing the SDC solution to an extrapolated
    solution within the quadrature matrix. Collocation methods compute a high order solution from a linear combination
    of solutions at intermediate time points. While the intermediate solutions (a.k.a. stages) don't share the order of
    accuracy with the solution at the end of the interval, for SDC we know that the order is equal to the number of
    nodes + 1 (locally). This is because the solution to the collocation problem is a polynomial approximation of order
    of the number of nodes.
    That means we can do a Taylor expansion around the end point of the interval to higher order and after cancelling
    terms just like we are used to with the extrapolation based error estimate across multiple steps, we get an error
    estimate that is of the order of accuracy of the stages.
    This can be used for adaptivity, for instance, with the nice property that it doesn't matter how we arrived at the
    converged collocation solution, as long as we did. We don't rely on knowing the order of accuracy after every sweep,
    only after convergence of the collocation problem has been achieved, which we can check from the residual.
    """

    def setup(self, controller, params, description, **kwargs):
        """
        We need this convergence controller to become active after the check for convergence, because we need the step
        to be converged.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            dict: Updated parameters with default values
        """
        from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence

        num_nodes = description['sweeper_params']['num_nodes']
        # optional node-parallel communicator of the sweeper (None for serial sweeps)
        self.comm = description['sweeper_params'].get('comm', None)
        if self.comm:
            from mpi4py import MPI

            self.sum = MPI.SUM

        self.check_convergence = CheckConvergence.check_convergence

        default_params = {
            'Taylor_order': 2 * num_nodes,
            'n': num_nodes,
            'recompute_coefficients': False,
            **params,
        }

        # NOTE: here the defaults override the base-class setup, unlike in the base class itself
        return {**super().setup(controller, params, description, **kwargs), **default_params}

    def post_iteration_processing(self, controller, S, **kwargs):
        """
        Compute the extrapolated error estimate here if the step is converged.

        Args:
            controller (pySDC.Controller): The controller
            S (pySDC.Step): The current step

        Returns:
            None
        """

        if not self.check_convergence(S):
            return None

        lvl = S.levels[0]

        # collocation node times within the current step
        nodes_ = lvl.sweep.coll.nodes * S.dt
        nodes = S.time + np.append(0, nodes_[:-1])
        t_eval = S.time + nodes_[-1]

        # distances between consecutive nodes, used as "step sizes" for the prefactor
        dts = np.append(nodes_[0], nodes_[1:] - nodes_[:-1])
        self.params.Taylor_order = len(nodes)
        self.params.n = len(nodes)

        # compute the extrapolation coefficients
        if None in self.coeff.u or self.params.recompute_coefficients:
            self.get_extrapolation_coefficients(nodes, dts, t_eval)

        # compute the extrapolated solution
        if lvl.f[0] is None:
            lvl.f[0] = lvl.prob.eval_f(lvl.u[0], lvl.time)

        # collect rhs values at the nodes; entries with zero coefficient are replaced by 0.0
        if type(lvl.f[0]) == imex_mesh:
            f = [lvl.f[i].impl + lvl.f[i].expl if self.coeff.f[i] and lvl.f[i] else 0.0 for i in range(len(lvl.f) - 1)]
        elif type(lvl.f[0]) == mesh:
            f = [lvl.f[i] if self.coeff.f[i] else 0.0 for i in range(len(lvl.f) - 1)]
        else:
            raise DataError(f"Unable to store f from datatype {type(lvl.f[0])}, extrapolation based error estimate only\
 works with types imex_mesh and mesh")

        # compute the error with the weighted sum
        if self.comm:
            # node-parallel: each rank contributes one term, reduced onto the last rank
            idx = (self.comm.rank + 1) % self.comm.size
            sendbuf = self.coeff.u[idx] * lvl.u[idx] + self.coeff.f[idx] * f[idx]
            u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0) if self.comm.rank == self.comm.size - 1 else None
            self.comm.Reduce(sendbuf, u_ex, op=self.sum, root=self.comm.size - 1)
        else:
            u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0)
            for i in range(self.params.n):
                u_ex += self.coeff.u[i] * lvl.u[i] + self.coeff.f[i] * f[i]

        # store the error
        if self.comm:
            # only the last rank holds the reduced u_ex; broadcast the finished estimate to all ranks
            error = (
                abs(u_ex - lvl.u[self.comm.rank + 1]) * self.coeff.prefactor
                if self.comm.rank == self.comm.size - 1
                else None
            )
            lvl.status.error_extrapolation_estimate = self.comm.bcast(error, root=self.comm.size - 1)
        else:
            lvl.status.error_extrapolation_estimate = abs(u_ex - lvl.u[-1]) * self.coeff.prefactor
        return None