Source code for implementations.convergence_controller_classes.estimate_polynomial_error

import numpy as np

from qmat.lagrange import LagrangeApproximation
from pySDC.core.convergence_controller import ConvergenceController


[docs] class EstimatePolynomialError(ConvergenceController): """ Estimate the local error by using all but one collocation node in a polynomial interpolation to that node. While the converged collocation problem with M nodes gives a order M approximation to this point, the interpolation gives only an order M-1 approximation. Hence, we have two solutions with different order, and we know their order. That is to say this gives an error estimate that is order M. Keep in mind that the collocation problem should be converged for this and has order up to 2M. Still, the lower order method can be used for time step selection, for instance. If the last node is not the end point, we can interpolate to that node, which is an order M approximation and compare to the order 2M approximation we get from the extrapolation step. By default, we interpolate to the second to last node. """
[docs] def setup(self, controller, params, description, **kwargs): """ Args: controller (pySDC.Controller.controller): The controller params (dict): The params passed for this specific convergence controller description (dict): The description object used to instantiate the controller Returns: (dict): The updated params dictionary """ from pySDC.implementations.hooks.log_embedded_error_estimate import LogEmbeddedErrorEstimate from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence sweeper_params = description['sweeper_params'] num_nodes = sweeper_params['num_nodes'] quad_type = sweeper_params['quad_type'] defaults = { 'control_order': -75, 'estimate_on_node': num_nodes + 1 if quad_type == 'GAUSS' else num_nodes - 1, **super().setup(controller, params, description, **kwargs), } self.comm = description['sweeper_params'].get('comm', None) if self.comm: from mpi4py import MPI self.prepare_MPI_datatypes() self.MPI_SUM = MPI.SUM controller.add_hook(LogEmbeddedErrorEstimate) self.check_convergence = CheckConvergence.check_convergence if quad_type != 'GAUSS' and defaults['estimate_on_node'] > num_nodes: from pySDC.core.errors import ParameterError raise ParameterError( 'You cannot interpolate with lower accuracy to the end point if the end point is a node!' ) self.interpolation_matrix = None return defaults
[docs] def reset_status_variables(self, *args, **kwargs): """ Add variable for embedded error Returns: None """ self.add_status_variable_to_level('error_embedded_estimate') self.add_status_variable_to_level('order_embedded_estimate')
[docs] def matmul(self, A, b, xp=np): """ Matrix vector multiplication, possibly MPI parallel. The parallel implementation performs a reduce operation in every row of the matrix. While communicating the entire vector once could reduce the number of communications, this way we never need to store the entire vector on any specific rank. Args: A (2d np.ndarray): Matrix b (list): Vector xp: Either numpy or cupy Returns: List: Axb """ if self.comm: res = [A[i, 0] * b[0] if b[i] is not None else None for i in range(A.shape[0])] buf = b[0] * 0.0 for i in range(0, A.shape[0]): index = self.comm.rank + (1 if self.comm.rank < self.params.estimate_on_node - 1 else 0) send_buf = ( (A[i, index] * b[index]) if self.comm.rank != self.params.estimate_on_node - 1 else xp.zeros_like(res[0]) ) self.comm.Allreduce(send_buf, buf, op=self.MPI_SUM) res[i] += buf return res else: return A @ xp.asarray(b)
[docs] def post_iteration_processing(self, controller, S, **kwargs): """ Estimate the error Args: controller (pySDC.Controller.controller): The controller S (pySDC.Step.step): The current step Returns: None """ if self.check_convergence(S): L = S.levels[0] coll = L.sweep.coll nodes = np.append(np.append(0, coll.nodes), 1.0) estimate_on_node = self.params.estimate_on_node xp = L.u[0].xp if self.interpolation_matrix is None: interpolator = LagrangeApproximation( points=[nodes[i] for i in range(coll.num_nodes + 1) if i != estimate_on_node] ) self.interpolation_matrix = xp.array(interpolator.getInterpolationMatrix([nodes[estimate_on_node]])) u = [ L.u[i].flatten() if L.u[i] is not None else L.u[i] for i in range(coll.num_nodes + 1) if i != estimate_on_node ] u_inter = self.matmul(self.interpolation_matrix, u, xp=xp)[0].reshape(L.prob.init[0]) # compute end point if needed if estimate_on_node == len(nodes) - 1: if L.uend is None: L.sweep.compute_end_point() high_order_sol = L.uend rank = 0 L.status.order_embedded_estimate = coll.num_nodes + 1 else: high_order_sol = L.u[estimate_on_node] rank = estimate_on_node - 1 L.status.order_embedded_estimate = coll.num_nodes * 1 if self.comm: buf = np.array(abs(u_inter - high_order_sol) if self.comm.rank == rank else 0.0) self.comm.Bcast(buf, root=rank) L.status.error_embedded_estimate = float(buf) else: L.status.error_embedded_estimate = abs(u_inter - high_order_sol) self.debug( f'Obtained error estimate: {L.status.error_embedded_estimate:.2e} of order {L.status.order_embedded_estimate}', S, )
[docs] def check_parameters(self, controller, params, description, **kwargs): """ Check if we allow the scheme to solve the collocation problems to convergence. Args: controller (pySDC.Controller): The controller params (dict): The params passed for this specific convergence controller description (dict): The description object used to instantiate the controller Returns: bool: Whether the parameters are compatible str: The error message """ if description['sweeper_params'].get('num_nodes', 0) < 2: return False, 'Need at least two collocation nodes to interpolate to one!' return True, ""