Source code for implementations.convergence_controller_classes.interpolate_between_restarts

import numpy as np
from pySDC.core.convergence_controller import ConvergenceController, Status
from qmat.lagrange import LagrangeApproximation


[docs] class InterpolateBetweenRestarts(ConvergenceController): """ Interpolate the solution and right hand side to the new set of collocation nodes after a restart. The idea is that when you adjust the step size between restarts, you already know what the new quadrature method is going to be and possibly interpolating the current iterate to these results in a better initial guess than spreading the initial conditions or whatever you usually like to do. """
[docs] def setup(self, controller, params, description, **kwargs): """ Store the initial guess used in the sweeper when no restart has happened Args: controller (pySDC.Controller.controller): The controller params (dict): Parameters for the convergence controller description (dict): The description object used to instantiate the controller """ defaults = { 'control_order': 50, } return {**defaults, **super().setup(controller, params, description, **kwargs)}
[docs] def setup_status_variables(self, controller, **kwargs): """ Add variables to the sweeper containing the interpolated solution and right hand side. Args: controller (pySDC.Controller.controller): The controller """ self.status = Status(['u_inter', 'f_inter', 'perform_interpolation', 'skip_interpolation']) self.status.u_inter = [] self.status.f_inter = [] self.status.perform_interpolation = False self.status.skip_interpolation = False
[docs] def post_spread_processing(self, controller, step, **kwargs): """ Spread the interpolated values to the collocation nodes. This overrides whatever the sweeper uses for prediction. Args: controller (pySDC.Controller.controller): The controller step (pySDC.Step.step): The current step """ if self.status.perform_interpolation and not self.status.skip_interpolation: for i in range(len(step.levels)): level = step.levels[i] if level.f[0] is None: level.f[0] = level.prob.dtype_f(level.prob.init) for m in range(len(level.u)): level.u[m][:] = self.status.u_inter[i][m].reshape(level.prob.init[0])[:] level.f[m][:] = self.status.f_inter[i][m].reshape(level.f[m].shape)[:] # reset the status variables self.status.perform_interpolation = False self.status.u_inter = [] self.status.f_inter = [] self.status.skip_interpolation = False
[docs] def post_iteration_processing(self, controller, step, **kwargs): """ Interpolate the solution and right hand sides and store them in the sweeper, where they will be distributed accordingly in the prediction step. This function is called after every iteration instead of just after the step because we might choose to stop iterating as soon as we have decided to restart. If we let the step continue to iterate, this is not the most efficient implementation and you may choose to write a different convergence controller. The interpolation is based on Thibaut's magic. Args: controller (pySDC.Controller): The controller step (pySDC.Step.step): The current step """ if ( step.status.restart and all(level.status.dt_new for level in step.levels) and not self.status.skip_interpolation ): for level in step.levels: nodes_old = level.sweep.coll.nodes.copy() nodes_new = level.sweep.coll.nodes.copy() * level.status.dt_new / level.params.dt if level.f[0] is None: prob = level.prob level.f[0] = prob.eval_f(level.u[0], level.time) interpolator = LagrangeApproximation(points=np.append(0, nodes_old)) interpolation_matrix = interpolator.getInterpolationMatrix(np.append(0, nodes_new)) self.status.u_inter += [(interpolation_matrix @ [me.flatten() for me in level.u][:])[:]] self.status.f_inter += [(interpolation_matrix @ [me.flatten() for me in level.f][:])[:]] self.status.perform_interpolation = True self.log( f'Interpolating before restart from dt={level.params.dt:.2e} to dt={level.status.dt_new:.2e}', step ) else: self.status.perform_interpolation = False