Source code for implementations.convergence_controller_classes.estimate_extrapolation_error

import numpy as np
from scipy.special import factorial

from pySDC.core.convergence_controller import ConvergenceController, Status
from pySDC.core.errors import DataError
from pySDC.implementations.datatype_classes.mesh import mesh, imex_mesh
from pySDC.implementations.hooks.log_extrapolated_error_estimate import LogExtrapolationErrorEstimate


class EstimateExtrapolationErrorBase(ConvergenceController):
    """
    Abstract base class for extrapolated error estimates
    ----------------------------------------------------

    This error estimate extrapolates a solution based on Taylor expansions using solutions of previous time steps.
    In particular, child classes need to implement how to make these solutions available, which works differently
    for MPI and non-MPI versions.
    """

    def __init__(self, controller, params, description, **kwargs):
        """
        Initialization routine

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller
        """
        self.prev = Status(["t", "u", "f", "dt"])  # store solutions etc. of previous steps here
        self.coeff = Status(["u", "f", "prefactor"])  # store coefficients for extrapolation here
        super().__init__(controller, params, description)
        controller.add_hook(LogExtrapolationErrorEstimate)
    def setup(self, controller, params, description, **kwargs):
        """
        The extrapolation based method requires storage of previous values of u, f, t and dt and also requires
        solving a linear system of equations to compute the Taylor expansion finite difference style. Here, all
        variables are initialized which are needed for this process.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            dict: Updated parameters with default values
        """
        from pySDC.implementations.convergence_controller_classes.hotrod import HotRod
        from pySDC.implementations.convergence_controller_classes.adaptivity import Adaptivity

        default_params = {
            "control_order": -75,
            "use_adaptivity": True in [me == Adaptivity for me in description.get("convergence_controllers", {})],
            "use_HotRod": True in [me == HotRod for me in description.get("convergence_controllers", {})],
            "order_time_marching": description["step_params"]["maxiter"],
        }

        new_params = {**default_params, **super().setup(controller, params, description, **kwargs)}

        # Do a sufficiently high order Taylor expansion
        new_params["Taylor_order"] = new_params["order_time_marching"] + 2

        # Estimate and store values from this iteration
        new_params["estimate_iter"] = new_params["order_time_marching"] - (1 if new_params["use_HotRod"] else 0)

        # Store n values. Since we store u and f, we need only half of each (the +1 is for rounding)
        new_params["n"] = (new_params["Taylor_order"] + 1) // 2
        new_params["n_per_proc"] = new_params["n"] * 1

        return new_params
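    # Worked example (illustrative numbers, not defaults): with maxiter = 5 and
    # Hot Rod inactive, order_time_marching = 5, so Taylor_order = 7, the
    # estimate is computed in iteration estimate_iter = 5, and n = (7 + 1) // 2
    # = 4 pairs of u and f from previous steps are kept in memory.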
    def setup_status_variables(self, controller, **kwargs):
        """
        Initialize coefficient variables and add a variable to the levels for the extrapolated error.

        Args:
            controller (pySDC.controller): The controller

        Returns:
            None
        """
        self.coeff.u = [None] * self.params.n
        self.coeff.f = [0.0] * self.params.n
        self.add_status_variable_to_level('error_extrapolation_estimate')
    def check_parameters(self, controller, params, description, **kwargs):
        """
        Check whether parameters are compatible with whatever assumptions went into the step size functions etc.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            bool: Whether the parameters are compatible
            str: Error message
        """
        if description["step_params"].get("restol", -1.0) >= 0:
            return (
                False,
                "Extrapolation error needs constant order in time and hence restol in the step parameters "
                "has to be smaller than 0!",
            )

        if controller.params.mssdc_jac:
            return (
                False,
                "Extrapolation error estimator needs the same order on all steps, please activate Gauss-Seidel "
                "multistep mode!",
            )

        return True, ""
    def store_values(self, S, **kwargs):
        """
        Store the required attributes of the step to do the extrapolation. We only care about the last collocation
        node on the finest level at the moment.

        Args:
            S (pySDC.Step): The current step

        Returns:
            None
        """
        # figure out which values are to be replaced by the new ones
        if None in self.prev.t:
            oldest_val = len(self.prev.t) - len(self.prev.t[self.prev.t == [None]])
        else:
            oldest_val = np.argmin(self.prev.t)

        # figure out how to store the right hand side
        f = S.levels[0].f[-1]
        if type(f) == imex_mesh:
            self.prev.f[oldest_val] = f.impl + f.expl
        elif type(f) == mesh:
            self.prev.f[oldest_val] = f
        else:
            raise DataError(
                f"Unable to store f from datatype {type(f)}, extrapolation based error estimate only "
                "works with types imex_mesh and mesh"
            )

        # store the rest of the values
        self.prev.u[oldest_val] = S.levels[0].u[-1]
        self.prev.t[oldest_val] = S.time + S.dt
        self.prev.dt[oldest_val] = S.dt

        return None
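    # Example (illustrative): with n = 3 and stored times [0.1, 0.2, 0.3], a step
    # ending at t = 0.4 overwrites index np.argmin(self.prev.t) = 0, so the
    # storage acts as a ring buffer holding the n most recent solutions.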
    def get_extrapolation_coefficients(self, t, dt, t_eval):
        """
        This function solves a linear system to obtain the extrapolation coefficients. In the matrix A, the row
        index reflects the order of the derivative in the Taylor expansion and the column index reflects the
        particular step and whether it's u or f from that step. The vector b, on the other hand, contains a 1 in
        the first entry and zeros elsewhere, since we want to compute the value itself and all the derivatives
        should vanish after combining the Taylor expansions. This works to the order of the number of rows, and
        since we want a square matrix for solving, we need the same number of columns, which determines the memory
        overhead, since it is equal to the number of solutions / rhs evaluations that we need in memory at the
        time of evaluation.

        This is enough to get the extrapolated solution, but if we want to compute the local error, we have to
        compute a prefactor. This is based on error accumulation between steps (the first step's solution is exact
        plus 1 LTE, the second solution is exact plus 2 LTE and so on), which can be computed for adaptive step
        sizes as well. However, this is only true for linear problems, which means we expect the error estimate to
        work less well for non-linear problems.

        Since only time differences are important for computing the coefficients, we need to compute them only
        once when using constant step sizes. When we allow the step size to change, however, we need to recompute
        them in every step, which is activated by the `use_adaptivity` parameter.

        Solving for the coefficients requires solving a dense linear system of equations. The number of unknowns
        is equal to the order of the Taylor expansion, so this step should be cheap compared to the solves in each
        SDC iteration.

        The function stores the computed coefficients in the `self.coeff` variables.

        Args:
            t (list): The list of times at which we have solutions available
            dt (list): The step sizes used for computing these solutions (needed for the prefactor)
            t_eval (float): The time we want to extrapolate to

        Returns:
            None
        """
        # prepare A matrix
        A = np.zeros((self.params.Taylor_order, self.params.Taylor_order))
        A[0, 0 : self.params.n] = 1.0
        j = np.arange(self.params.Taylor_order)
        inv_facs = 1.0 / factorial(j)

        # get the steps backwards from the point of evaluation
        idx = np.argsort(t)
        steps_from_now = t[idx] - t_eval

        # fill A matrix
        for i in range(1, self.params.Taylor_order):
            # Taylor expansions of the solutions
            A[i, : self.params.n] = steps_from_now ** j[i] * inv_facs[i]

            # Taylor expansions of the first derivatives a.k.a. right hand side evaluations
            A[i, self.params.n : self.params.Taylor_order] = (
                steps_from_now[2 * self.params.n - self.params.Taylor_order :] ** (j[i] - 1) * inv_facs[i - 1]
            )

        # prepare rhs
        b = np.zeros(self.params.Taylor_order)
        b[0] = 1.0

        # solve linear system for the coefficients
        coeff = np.linalg.solve(A, b)
        self.coeff.u = coeff[: self.params.n]
        self.coeff.f[self.params.n * 2 - self.params.Taylor_order :] = coeff[self.params.n : self.params.Taylor_order]

        # determine prefactor
        step_size_ratios = abs(dt[len(dt) - len(self.coeff.u) :] / dt[-1]) ** (self.params.Taylor_order - 1)
        inv_prefactor = -sum(step_size_ratios[1:]) - 1.0
        for i in range(len(self.coeff.u)):
            inv_prefactor += sum(step_size_ratios[1 : i + 1]) * self.coeff.u[i]
        self.coeff.prefactor = 1.0 / abs(inv_prefactor)

        return None
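# ---------------------------------------------------------------------------
# Illustration (not part of pySDC): a minimal standalone sketch of the linear
# system solved in `get_extrapolation_coefficients`, simplified to u-values
# only. The grid, the order and the test function are assumptions chosen for
# demonstration.
# ---------------------------------------------------------------------------
def _demo_extrapolation_coefficients():
    order = 4
    t = np.array([0.1, 0.2, 0.3, 0.4])  # times with known solutions (sorted)
    t_eval = 0.5  # time we extrapolate to

    steps_from_now = t - t_eval  # negative offsets from the evaluation point
    j = np.arange(order)

    # row j enforces that the j-th derivative terms of the combined Taylor
    # expansions cancel (j >= 1), while row 0 reproduces the value itself
    A = steps_from_now[None, :] ** j[:, None] / factorial(j)[:, None]
    b = np.zeros(order)
    b[0] = 1.0

    coeff = np.linalg.solve(A, b)

    # extrapolating u(t) = exp(t) is accurate to the order of the expansion
    print(abs(coeff @ np.exp(t) - np.exp(t_eval)))  # ~1e-4 for this grid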
class EstimateExtrapolationErrorNonMPI(EstimateExtrapolationErrorBase):
    """
    Implementation of the extrapolation error estimate for the non-MPI controller.
    """
    def setup(self, controller, params, description, **kwargs):
        """
        Add a parameter 'no_storage' which decides whether the standard or the no-memory-overhead version is run,
        where only values are used for extrapolation which are in memory of other processes.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            dict: Updated parameters with default values
        """
        default_params = super().setup(controller, params, description)

        non_mpi_defaults = {
            "no_storage": False,
        }

        return {**non_mpi_defaults, **default_params}
    def setup_status_variables(self, controller, **kwargs):
        """
        Initialize storage variables.

        Args:
            controller (pySDC.controller): The controller

        Returns:
            None
        """
        super().setup_status_variables(controller, **kwargs)

        self.prev.t = np.array([None] * self.params.n)
        self.prev.dt = np.array([None] * self.params.n)
        self.prev.u = [None] * self.params.n
        self.prev.f = [None] * self.params.n

        return None
    def post_iteration_processing(self, controller, S, **kwargs):
        """
        We perform three key operations here in the last iteration:
         - Compute the error estimate
         - Compute the coefficients if needed
         - Store the values of the step, while pretending we didn't, for the no-memory-overhead version

        Args:
            controller (pySDC.Controller): The controller
            S (pySDC.Step): The current step

        Returns:
            None
        """
        if S.status.iter == self.params.estimate_iter:
            t_eval = S.time + S.dt

            # compute the extrapolation coefficients if needed
            if (
                (
                    None in self.coeff.u
                    or self.params.use_adaptivity
                    or (not self.params.no_storage and S.status.time_size > 1)
                )
                and None not in self.prev.t
                and t_eval > max(self.prev.t)
            ):
                self.get_extrapolation_coefficients(self.prev.t, self.prev.dt, t_eval)

            # compute the error if we can
            if None not in self.coeff.u and None not in self.prev.t:
                self.get_extrapolated_error(S)

            # store the solution and pretend we didn't, because in the non-MPI version we take a few shortcuts
            if self.params.no_storage:
                self.store_values(S)

        return None
    def prepare_next_block(self, controller, S, size, time, Tend, MS, **kwargs):
        """
        If the no-memory-overhead version is used, we need to delete stuff that shouldn't be available. Otherwise,
        we need to store all the stuff that we can.

        Args:
            controller (pySDC.Controller): The controller
            S (pySDC.step): The current step
            size (int): Number of ranks
            time (float): The current time
            Tend (float): The final time
            MS (list): Active steps

        Returns:
            None
        """
        # delete values that should not be available in the next step
        if self.params.no_storage:
            self.prev.t = np.array([None] * self.params.n)
            self.prev.dt = np.array([None] * self.params.n)
            self.prev.u = [None] * self.params.n
            self.prev.f = [None] * self.params.n

        else:
            # decide where we need to restart to store everything up to that point
            restarts = [S.status.restart for S in MS]
            restart_at = np.where(restarts)[0][0] if True in restarts else len(MS)

            # store values in the current block that don't need restarting
            if restart_at > S.status.slot:
                self.store_values(S)

        return None
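    # Example (illustrative): with restarts = [False, False, True, False], we get
    # restart_at = 2, so only the steps in slots 0 and 1 store their values.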
    def get_extrapolated_solution(self, S, **kwargs):
        """
        Combine values from previous steps to extrapolate.

        Args:
            S (pySDC.Step): The current step

        Returns:
            dtype_u: The extrapolated solution
        """
        if len(S.levels) > 1:
            raise NotImplementedError("Extrapolated estimate only works on the finest level for now")

        # prepare variables
        u_ex = S.levels[0].u[-1] * 0.0
        idx = np.argsort(self.prev.t)

        # see if we have a solution for the current step already stored
        if (abs(S.time + S.dt - self.prev.t) < 10.0 * np.finfo(float).eps).any():
            idx_step = idx[np.argmin(abs(self.prev.t - S.time - S.dt))]
        else:
            idx_step = max(idx) + 1

        # make a mask of all the steps we want to include in the extrapolation
        mask = np.logical_and(idx < idx_step, idx >= idx_step - self.params.n)

        # do the extrapolation by summing everything up
        for i in range(self.params.n):
            u_ex += self.coeff.u[i] * self.prev.u[idx[mask][i]] + self.coeff.f[i] * self.prev.f[idx[mask][i]]

        return u_ex
    def get_extrapolated_error(self, S, **kwargs):
        """
        The extrapolation estimate combines values of u and f from multiple steps to extrapolate and compare to
        the solution obtained by the time marching scheme.

        Args:
            S (pySDC.Step): The current step

        Returns:
            None
        """
        u_ex = self.get_extrapolated_solution(S)
        if u_ex is not None:
            S.levels[0].status.error_extrapolation_estimate = abs(u_ex - S.levels[0].u[-1]) * self.coeff.prefactor
        else:
            S.levels[0].status.error_extrapolation_estimate = None
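# ---------------------------------------------------------------------------
# Usage sketch (assumptions: standard pySDC description layout; the values
# below are placeholders, not recommendations). The controller requires a
# fixed iteration count, i.e. maxiter set and restol left disabled:
#
#     description['step_params'] = {'maxiter': 5}
#     description['convergence_controllers'] = {
#         EstimateExtrapolationErrorNonMPI: {'no_storage': False},
#     }
#
# After each step, the estimate is available as
# `S.levels[0].status.error_extrapolation_estimate` and is recorded by the
# LogExtrapolationErrorEstimate hook that the base class registers.
# ---------------------------------------------------------------------------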
class EstimateExtrapolationErrorWithinQ(EstimateExtrapolationErrorBase):
    """
    This convergence controller estimates the local error based on comparing the SDC solution to an extrapolated
    solution within the quadrature matrix.

    Collocation methods compute a high order solution from a linear combination of solutions at intermediate time
    points. While the intermediate solutions (a.k.a. stages) don't share the order of accuracy with the solution
    at the end of the interval, for SDC we know that the order is equal to the number of nodes + 1 (locally). This
    is because the solution to the collocation problem is a polynomial approximation of order of the number of
    nodes. That means we can do a Taylor expansion around the end point of the interval to higher order and, after
    cancelling terms just like we are used to with the extrapolation based error estimate across multiple steps,
    we get an error estimate that is of the order of accuracy of the stages.

    This can be used for adaptivity, for instance, with the nice property that it doesn't matter how we arrived at
    the converged collocation solution, as long as we did. We don't rely on knowing the order of accuracy after
    every sweep, only after convergence of the collocation problem has been achieved, which we can check from the
    residual.
    """
    def setup(self, controller, params, description, **kwargs):
        """
        We need this convergence controller to become active after the check for convergence, because we need the
        step to be converged.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            dict: Updated parameters with default values
        """
        from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence

        num_nodes = description['sweeper_params']['num_nodes']
        self.comm = description['sweeper_params'].get('comm', None)
        if self.comm:
            from mpi4py import MPI

            self.sum = MPI.SUM

        self.check_convergence = CheckConvergence.check_convergence

        default_params = {
            'Taylor_order': 2 * num_nodes,
            'n': num_nodes,
            'recompute_coefficients': False,
            **params,
        }

        return {**super().setup(controller, params, description, **kwargs), **default_params}
    def post_iteration_processing(self, controller, S, **kwargs):
        """
        Compute the extrapolated error estimate here if the step is converged.

        Args:
            controller (pySDC.Controller): The controller
            S (pySDC.Step): The current step

        Returns:
            None
        """
        if not self.check_convergence(S):
            return None

        lvl = S.levels[0]

        nodes_ = lvl.sweep.coll.nodes * S.dt
        nodes = S.time + np.append(0, nodes_[:-1])
        t_eval = S.time + nodes_[-1]

        dts = np.append(nodes_[0], nodes_[1:] - nodes_[:-1])
        self.params.Taylor_order = len(nodes)
        self.params.n = len(nodes)

        # compute the extrapolation coefficients
        if None in self.coeff.u or self.params.recompute_coefficients:
            self.get_extrapolation_coefficients(nodes, dts, t_eval)

        # compute the extrapolated solution
        if lvl.f[0] is None:
            lvl.f[0] = lvl.prob.eval_f(lvl.u[0], lvl.time)

        if type(lvl.f[0]) == imex_mesh:
            f = [
                lvl.f[i].impl + lvl.f[i].expl if self.coeff.f[i] and lvl.f[i] else 0.0
                for i in range(len(lvl.f) - 1)
            ]
        elif type(lvl.f[0]) == mesh:
            f = [lvl.f[i] if self.coeff.f[i] else 0.0 for i in range(len(lvl.f) - 1)]
        else:
            raise DataError(
                f"Unable to store f from datatype {type(lvl.f[0])}, extrapolation based error estimate only "
                "works with types imex_mesh and mesh"
            )

        # compute the error with the weighted sum
        if self.comm:
            idx = (self.comm.rank + 1) % self.comm.size
            sendbuf = self.coeff.u[idx] * lvl.u[idx] + self.coeff.f[idx] * f[idx]
            u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0) if self.comm.rank == self.comm.size - 1 else None
            self.comm.Reduce(sendbuf, u_ex, op=self.sum, root=self.comm.size - 1)
        else:
            u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0)
            for i in range(self.params.n):
                u_ex += self.coeff.u[i] * lvl.u[i] + self.coeff.f[i] * f[i]

        # store the error
        if self.comm:
            error = (
                abs(u_ex - lvl.u[self.comm.rank + 1]) * self.coeff.prefactor
                if self.comm.rank == self.comm.size - 1
                else None
            )
            lvl.status.error_extrapolation_estimate = self.comm.bcast(error, root=self.comm.size - 1)
        else:
            lvl.status.error_extrapolation_estimate = abs(u_ex - lvl.u[-1]) * self.coeff.prefactor

        return None
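# ---------------------------------------------------------------------------
# Usage sketch (assumptions: standard pySDC description layout; the values
# below are placeholders). Since this estimate needs the converged collocation
# solution, it is typically combined with a residual-based stopping criterion:
#
#     description['step_params'] = {'maxiter': 99}
#     description['level_params'] = {'restol': 1e-10, 'dt': 0.1}
#     description['convergence_controllers'] = {
#         EstimateExtrapolationErrorWithinQ: {'recompute_coefficients': False},
#     }
#
# The estimate again ends up in `lvl.status.error_extrapolation_estimate`.
# ---------------------------------------------------------------------------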