import numpy as np
from pySDC.core.convergence_controller import ConvergenceController, Status
from pySDC.implementations.convergence_controller_classes.step_size_limiter import (
StepSizeLimiter,
)
class AdaptivityBase(ConvergenceController):
"""
Abstract base class for convergence controllers that implement adaptivity based on arbitrary local error estimates
and update rules.
"""
def setup(self, controller, params, description, **kwargs):
"""
Define default parameters here.
Default parameters are:
- control_order (int): The order relative to other convergence controllers
- beta (float): The safety factor
Args:
controller (pySDC.Controller): The controller
params (dict): The params passed for this specific convergence controller
description (dict): The description object used to instantiate the controller
Returns:
(dict): The updated params dictionary
"""
defaults = {
"control_order": -50,
"beta": 0.9,
}
from pySDC.implementations.hooks.log_step_size import LogStepSize
controller.add_hook(LogStepSize)
from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence
self.communicate_convergence = CheckConvergence.communicate_convergence
return {**defaults, **super().setup(controller, params, description, **kwargs)}
def dependencies(self, controller, description, **kwargs):
"""
Load step size limiters here, if they are desired.
Args:
controller (pySDC.Controller): The controller
description (dict): The description object used to instantiate the controller
Returns:
None
"""
step_limiter_keys = ['dt_min', 'dt_max', 'dt_slope_min', 'dt_slope_max', 'dt_rel_min_slope']
available_keys = [me for me in step_limiter_keys if me in self.params.__dict__.keys()]
if len(available_keys) > 0:
step_limiter_params = {key: self.params.__dict__[key] for key in available_keys}
controller.add_convergence_controller(StepSizeLimiter, params=step_limiter_params, description=description)
if self.params.useMPI:
self.prepare_MPI_logical_operations()
return None
def get_new_step_size(self, controller, S, **kwargs):
"""
Determine a step size for the next step from an estimate of the local error of the current step.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
None
"""
raise NotImplementedError("Please implement a rule for updating the step size!")
def compute_optimal_step_size(self, beta, dt, e_tol, e_est, order):
"""
Compute the optimal step size for the current step based on the order of the scheme.
This function can be called from `get_new_step_size` for various implementations of adaptivity, but notably not
all! It requires the order of the error estimate to be known; for adaptivity based on the residual, for instance,
the order is unknown and this function cannot be used.
Args:
beta (float): Safety factor
dt (float): Current step size
e_tol (float): The desired tolerance
e_est (float): The estimated local error
order (int): The order of the local error estimate
Returns:
float: The optimal step size
"""
return beta * dt * (e_tol / e_est) ** (1.0 / order)
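# Illustrative arithmetic for the update rule above (hypothetical numbers): with beta=0.9, dt=1e-2,
# e_tol=1e-6, e_est=4e-6 and order=4, the optimal step size evaluates to
#     0.9 * 1e-2 * (1e-6 / 4e-6) ** (1.0 / 4) ≈ 6.4e-3,
# i.e. the step size shrinks because the estimated error exceeds the tolerance.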
def get_local_error_estimate(self, controller, S, **kwargs):
"""
Get the local error estimate for updating the step size.
It does not have to be an error estimate, but could be the residual or something else.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
float: The error estimate
"""
raise NotImplementedError("Please implement a way to get the local error")
def determine_restart(self, controller, S, **kwargs):
"""
Check if the step wants to be restarted by comparing the estimate of the local error to a preset tolerance
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
None
"""
if S.status.iter >= S.params.maxiter:
e_est = self.get_local_error_estimate(controller, S)
if e_est >= self.params.e_tol:
# see if we try to avoid restarts
if self.params.get('avoid_restarts'):
more_iter_needed = max([L.status.iter_to_convergence for L in S.levels])
k_final = S.status.iter + more_iter_needed
rho = max([L.status.contraction_factor for L in S.levels])
coll_order = S.levels[0].sweep.coll.order
if rho > 1:
S.status.restart = True
self.log(f"Convergence factor = {rho:.2e} > 1 -> restarting", S)
elif k_final > 2 * S.params.maxiter:
S.status.restart = True
self.log(
f"{more_iter_needed} more iterations needed for convergence -> restart is more efficient", S
)
elif k_final > coll_order:
S.status.restart = True
self.log(
f"{more_iter_needed} more iterations needed for convergence -> restart because collocation problem would be over resolved",
S,
)
else:
S.status.force_continue = True
self.log(f"{more_iter_needed} more iterations needed for convergence -> no restart", S)
else:
S.status.restart = True
self.log(f"Restarting: e={e_est:.2e} >= e_tol={self.params.e_tol:.2e}", S)
return None
class AdaptivityForConvergedCollocationProblems(AdaptivityBase):
"""
Abstract base class for adaptivity implementations that require the collocation problem to be converged
before a new step size is computed. Derived classes must implement `get_convergence`.
"""
def dependencies(self, controller, description, **kwargs):
"""
Load interpolation between restarts.
Args:
controller (pySDC.Controller): The controller
description (dict): The description object used to instantiate the controller
Returns:
None
"""
super().dependencies(controller, description, **kwargs)
if self.params.interpolate_between_restarts:
from pySDC.implementations.convergence_controller_classes.interpolate_between_restarts import (
InterpolateBetweenRestarts,
)
controller.add_convergence_controller(
InterpolateBetweenRestarts,
description=description,
params={},
)
if self.params.interpolate_between_restarts:
self.interpolator = controller.convergence_controllers[-1]
return None
def get_convergence(self, controller, S, **kwargs):
raise NotImplementedError("Please implement a way to check if the collocation problem is converged!")
def setup(self, controller, params, description, **kwargs):
"""
Add a default value for control order to the parameters.
Args:
controller (pySDC.Controller): The controller
params (dict): Parameters for the convergence controller
description (dict): The description object used to instantiate the controller
Returns:
dict: Updated parameters
"""
defaults = {
'restol_rel': None,
'e_tol_rel': None,
'restart_at_maxiter': True,
'restol_min': 1e-12,
'restol_max': 1e-5,
'factor_if_not_converged': 4.0,
'residual_max_tol': 1e9,
'maxiter': description['sweeper_params'].get('maxiter', 99),
'interpolate_between_restarts': True,
'abort_at_growing_residual': True,
**super().setup(controller, params, description, **kwargs),
}
if defaults['restol_rel']:
description['level_params']['restol'] = min(
[max([defaults['restol_rel'] * defaults['e_tol'], defaults['restol_min']]), defaults['restol_max']]
)
elif defaults['e_tol_rel']:
description['level_params']['e_tol'] = min([max([defaults['e_tol_rel'] * defaults['e_tol'], 1e-10]), 1e-5])
if defaults['restart_at_maxiter']:
defaults['maxiter'] = description['step_params'].get('maxiter', 99)
self.res_last_iter = np.inf
return defaults
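# Illustrative arithmetic for the `restol_rel` option above (hypothetical numbers): with restol_rel=1e-2,
# e_tol=1e-6, restol_min=1e-12 and restol_max=1e-5, the residual tolerance of the collocation problem is set to
#     min(max(1e-2 * 1e-6, 1e-12), 1e-5) = 1e-8.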
def determine_restart(self, controller, S, **kwargs):
if self.get_convergence(controller, S, **kwargs):
self.res_last_iter = np.inf
L = S.levels[0]
e_tol_converged = (
L.status.increment < L.params.e_tol if (L.params.get('e_tol') and L.status.get('increment')) else False
)
if (
self.params.restart_at_maxiter
and S.levels[0].status.residual > S.levels[0].params.restol
and not e_tol_converged
):
self.trigger_restart_upon_nonconvergence(S)
elif self.get_local_error_estimate(controller, S, **kwargs) > self.params.e_tol:
S.status.restart = True
elif (
S.status.time_size == 1
and self.res_last_iter < S.levels[0].status.residual
and S.status.iter > 0
and self.params.abort_at_growing_residual
):
self.trigger_restart_upon_nonconvergence(S)
elif S.levels[0].status.residual > self.params.residual_max_tol:
self.trigger_restart_upon_nonconvergence(S)
if self.params.useMPI:
self.communicate_convergence(self, controller, S, **kwargs)
self.res_last_iter = S.levels[0].status.residual * 1.0
def trigger_restart_upon_nonconvergence(self, S):
S.status.restart = True
S.status.force_done = True
for L in S.levels:
L.status.dt_new = L.params.dt / self.params.factor_if_not_converged
self.log(
f'Collocation problem not converged. Reducing step size to {L.status.dt_new:.2e}',
S,
)
if self.params.interpolate_between_restarts:
self.interpolator.status.skip_interpolation = True
class Adaptivity(AdaptivityBase):
"""
Class to compute the time step size adaptively based on an embedded error estimate.
A version exists for non-MPI pipelined SDC, but adaptivity requires knowing the order of the scheme. While the
order is also known for block-Jacobi, that case works differently and only block Gauss-Seidel is implemented so
far.
There is an option to reduce restarts: if, based on an estimate of the contraction factor, continued iterations
could reach convergence in fewer iterations than a restart would require, the step keeps iterating instead.
Since often only one or two more iterations suffice, this can boost the efficiency of adaptivity significantly.
Notice that the computed step size is not affected.
Be aware that this option does not work when Hot Rod is enabled, since Hot Rod requires more detailed knowledge
of the order of the scheme and resets to the second-to-last sweep before moving on, so we cannot continue to
iterate.
Enable the reduced restarts by setting "avoid_restarts" to True in the parameters for this convergence controller.
The behaviour in multi-step SDC is not well studied and it is unclear if anything useful happens there.
"""
def setup(self, controller, params, description, **kwargs):
"""
Define default parameters here.
Default parameters are:
- control_order (int): The order relative to other convergence controllers
- beta (float): The safety factor
Args:
controller (pySDC.Controller): The controller
params (dict): The params passed for this specific convergence controller
description (dict): The description object used to instantiate the controller
Returns:
(dict): The updated params dictionary
"""
defaults = {
"embedded_error_flavor": 'standard',
}
return {**defaults, **super().setup(controller, params, description, **kwargs)}
def dependencies(self, controller, description, **kwargs):
"""
Load the embedded error estimator.
Args:
controller (pySDC.Controller): The controller
description (dict): The description object used to instantiate the controller
Returns:
None
"""
from pySDC.implementations.convergence_controller_classes.estimate_embedded_error import EstimateEmbeddedError
super().dependencies(controller, description, **kwargs)
controller.add_convergence_controller(
EstimateEmbeddedError.get_implementation(self.params.embedded_error_flavor, self.params.useMPI),
description=description,
)
# load contraction factor estimator if necessary
if self.params.get('avoid_restarts'):
from pySDC.implementations.convergence_controller_classes.estimate_contraction_factor import (
EstimateContractionFactor,
)
params = {'e_tol': self.params.e_tol}
controller.add_convergence_controller(EstimateContractionFactor, description=description, params=params)
return None
def check_parameters(self, controller, params, description, **kwargs):
"""
Check whether parameters are compatible with whatever assumptions went into the step size functions etc.
For adaptivity, we need to know the order of the scheme.
Args:
controller (pySDC.Controller): The controller
params (dict): The params passed for this specific convergence controller
description (dict): The description object used to instantiate the controller
Returns:
bool: Whether the parameters are compatible
str: The error message
"""
if description["level_params"].get("restol", -1.0) >= 0:
return (
False,
"Adaptivity needs constant order in time and hence restol in the step parameters has to be \
smaller than 0!",
)
if controller.params.mssdc_jac:
return (
False,
"Adaptivity needs the same order on all steps, please activate Gauss-Seidel multistep mode!",
)
if "e_tol" not in params.keys():
return (
False,
"Adaptivity needs a local tolerance! Please pass `e_tol` to the parameters for this convergence controller!",
)
return True, ""
def get_new_step_size(self, controller, S, **kwargs):
"""
Determine a step size for the next step from an embedded estimate of the local error of the current step.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
None
"""
# check if we performed the desired amount of sweeps
if S.status.iter == S.params.maxiter:
L = S.levels[0]
# compute next step size
order = S.status.iter # embedded error estimate is same order as time marching
e_est = self.get_local_error_estimate(controller, S)
L.status.dt_new = self.compute_optimal_step_size(
self.params.beta, L.params.dt, self.params.e_tol, e_est, order
)
self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
return None
def get_local_error_estimate(self, controller, S, **kwargs):
"""
Get the embedded error estimate of the finest level of the step.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
float: Embedded error estimate
"""
return S.levels[0].status.error_embedded_estimate
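# Minimal usage sketch for `Adaptivity` (assumed setup: the problem, sweeper and remaining description entries
# are omitted, and `controller_nonMPI` is just one possible controller choice):
#
#     from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI
#
#     description['convergence_controllers'] = {Adaptivity: {'e_tol': 1e-7}}
#     # optionally reduce restarts based on the estimated contraction factor:
#     # description['convergence_controllers'][Adaptivity]['avoid_restarts'] = True
#     controller = controller_nonMPI(num_procs=1, controller_params=controller_params, description=description)
#
# Remember that `e_tol` is required and that `restol` in the level parameters must be smaller than zero (or unset).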
class AdaptivityRK(Adaptivity):
"""
Adaptivity for Runge-Kutta methods. Only the order used in the step size update needs to change.
"""
def setup(self, controller, params, description, **kwargs):
defaults = {}
defaults['update_order'] = params.get('update_order', description['sweeper_class'].get_update_order())
return {**defaults, **super().setup(controller, params, description, **kwargs)}
def get_new_step_size(self, controller, S, **kwargs):
"""
Determine a step size for the next step from an embedded estimate of the local error of the current step.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
None
"""
# check if we performed the desired amount of sweeps
if S.status.iter == S.params.maxiter:
L = S.levels[0]
# compute next step size
order = self.params.update_order
e_est = self.get_local_error_estimate(controller, S)
L.status.dt_new = self.compute_optimal_step_size(
self.params.beta, L.params.dt, self.params.e_tol, e_est, order
)
self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
return None
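# Minimal sketch for `AdaptivityRK` (hypothetical sweeper name; any Runge-Kutta sweeper with an embedded error
# estimate and a `get_update_order` method is used the same way):
#
#     description['sweeper_class'] = SomeEmbeddedRKSweeper  # hypothetical placeholder
#     description['convergence_controllers'] = {AdaptivityRK: {'e_tol': 1e-7}}
#
# If `update_order` is not passed explicitly, it is taken from the sweeper's `get_update_order`.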
class AdaptivityResidual(AdaptivityBase):
"""
Do adaptivity based on residual.
Since we don't know a correlation between the residual and the error (for nonlinear problems), we employ a simpler
rule to update the step size. Instead of giving a local tolerance that we try to hit as closely as possible, we set
two thresholds for the residual. When we exceed the upper one, we reduce the step size by a factor of 2 and if the
residual falls below the lower threshold, we double the step size.
Set these thresholds via the parameters "e_tol" and "e_tol_low".
"""
def setup(self, controller, params, description, **kwargs):
"""
Define default parameters here.
Default parameters are:
- control_order (int): The order relative to other convergence controllers
- e_tol_low (float): Lower absolute threshold for the residual
- e_tol (float): Upper absolute threshold for the residual
- use_restol (bool): Reduce the step size if the residual tolerance was not reached
- max_restarts: Override maximum number of restarts
Args:
controller (pySDC.Controller): The controller
params (dict): The params passed for this specific convergence controller
description (dict): The description object used to instantiate the controller
Returns:
(dict): The updated params dictionary
"""
defaults = {
"control_order": -45,
"e_tol_low": 0,
"e_tol": np.inf,
"use_restol": False,
"max_restarts": 99 if "e_tol_low" in params else None,
"allowed_modifications": ['increase', 'decrease'], # what we are allowed to do with the step size
}
return {**defaults, **params}
def setup_status_variables(self, controller, **kwargs):
"""
Change maximum number of allowed restarts here.
Args:
controller (pySDC.Controller): The controller
Returns:
None
"""
from pySDC.implementations.convergence_controller_classes.basic_restarting import BasicRestarting
if self.params.max_restarts is not None:
conv_controllers = controller.convergence_controllers
restart_cont = [me for me in conv_controllers if BasicRestarting in type(me).__bases__]
if len(restart_cont) == 0:
raise NotImplementedError("Please implement override of maximum number of restarts!")
restart_cont[0].params.max_restarts = self.params.max_restarts
return None
def check_parameters(self, controller, params, description, **kwargs):
"""
Check whether parameters are compatible with whatever assumptions went into the step size functions etc.
Args:
controller (pySDC.Controller): The controller
params (dict): The params passed for this specific convergence controller
description (dict): The description object used to instantiate the controller
Returns:
bool: Whether the parameters are compatible
str: The error message
"""
if controller.params.mssdc_jac:
return (
False,
"Adaptivity needs the same order on all steps, please activate Gauss-Seidel multistep mode!",
)
return True, ""
def get_new_step_size(self, controller, S, **kwargs):
"""
Determine a step size for the next step.
If the residual exceeds the upper threshold, the step size is halved; if it falls below the lower threshold,
the step size is doubled.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
None
"""
# check if we performed the desired amount of sweeps
if S.status.iter == S.params.maxiter:
L = S.levels[0]
res = self.get_local_error_estimate(controller, S)
dt_planned = L.status.dt_new if L.status.dt_new is not None else L.params.dt
if (
res > self.params.e_tol or (res > L.params.restol and self.params.use_restol)
) and 'decrease' in self.params.allowed_modifications:
L.status.dt_new = min([dt_planned, L.params.dt / 2.0])
self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
elif res < self.params.e_tol_low and 'increase' in self.params.allowed_modifications:
L.status.dt_new = max([dt_planned, L.params.dt * 2.0])
self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
return None
def get_local_error_estimate(self, controller, S, **kwargs):
"""
Get the residual of the finest level of the step.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
float: Residual of the finest level
"""
return S.levels[0].status.residual
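# Illustrative behaviour of the residual-based rule above (hypothetical thresholds): with e_tol=1e-6 and
# e_tol_low=1e-10, a residual of 5e-6 halves the step size, a residual of 5e-11 doubles it, and anything in
# between leaves the planned step size unchanged.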
class AdaptivityCollocation(AdaptivityForConvergedCollocationProblems):
"""
Control the step size via a collocation based estimate of the local error.
The error estimate works by subtracting two solutions to collocation problems with different order. You can
interpolate between collocation methods as much as you want but the adaptive step size selection will always be
based on the last switch of quadrature.
"""
def setup(self, controller, params, description, **kwargs):
"""
Add a default value for control order to the parameters.
Args:
controller (pySDC.Controller): The controller
params (dict): Parameters for the convergence controller
description (dict): The description object used to instantiate the controller
Returns:
dict: Updated parameters
"""
defaults = {
"adaptive_coll_params": {},
"num_colls": 0,
**super().setup(controller, params, description, **kwargs),
"control_order": 220,
}
for key in defaults['adaptive_coll_params'].keys():
if type(defaults['adaptive_coll_params'][key]) == list:
defaults['num_colls'] = max([defaults['num_colls'], len(defaults['adaptive_coll_params'][key])])
if defaults['restart_at_maxiter']:
defaults['maxiter'] = description['step_params'].get('maxiter', 99) * defaults['num_colls']
return defaults
def setup_status_variables(self, controller, **kwargs):
self.status = Status(['error', 'order'])
self.status.error = []
self.status.order = []
def reset_status_variables(self, controller, **kwargs):
self.setup_status_variables(controller, **kwargs)
def dependencies(self, controller, description, **kwargs):
"""
Load the `EstimateEmbeddedErrorCollocation` convergence controller to estimate the local error by switching
between collocation problems between iterations.
Args:
controller (pySDC.Controller): The controller
description (dict): The description object used to instantiate the controller
"""
from pySDC.implementations.convergence_controller_classes.estimate_embedded_error import (
EstimateEmbeddedErrorCollocation,
)
super().dependencies(controller, description, **kwargs)
params = {'adaptive_coll_params': self.params.adaptive_coll_params}
controller.add_convergence_controller(
EstimateEmbeddedErrorCollocation,
params=params,
description=description,
)
def get_convergence(self, controller, S, **kwargs):
return len(self.status.order) == self.params.num_colls
def get_local_error_estimate(self, controller, S, **kwargs):
"""
Get the collocation based embedded error estimate.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
float: Embedded error estimate
"""
if len(self.status.error) > 1:
return self.status.error[-1][1]
else:
return 0.0
def post_iteration_processing(self, controller, step, **kwargs):
"""
Get the error estimate and its order if available.
Args:
controller (pySDC.Controller.controller): The controller
step (pySDC.Step.step): The current step
"""
if step.status.done:
lvl = step.levels[0]
self.status.error += [lvl.status.error_embedded_estimate_collocation]
self.status.order += [lvl.sweep.coll.order]
def get_new_step_size(self, controller, S, **kwargs):
if len(self.status.order) == self.params.num_colls:
lvl = S.levels[0]
# compute next step size
order = (
min(self.status.order[-2::]) + 1
) # local order of less accurate of the last two collocation problems
e_est = self.get_local_error_estimate(controller, S)
lvl.status.dt_new = self.compute_optimal_step_size(
self.params.beta, lvl.params.dt, self.params.e_tol, e_est, order
)
self.log(f'Adjusting step size from {lvl.params.dt:.2e} to {lvl.status.dt_new:.2e}', S)
def determine_restart(self, controller, S, **kwargs):
"""
Check if the step wants to be restarted by comparing the estimate of the local error to a preset tolerance
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
None
"""
if len(self.status.order) == self.params.num_colls:
e_est = self.get_local_error_estimate(controller, S)
if e_est >= self.params.e_tol:
S.status.restart = True
self.log(f"Restarting: e={e_est:.2e} >= e_tol={self.params.e_tol:.2e}", S)
def check_parameters(self, controller, params, description, **kwargs):
"""
Check whether parameters are compatible with whatever assumptions went into the step size functions etc.
For adaptivity, we need to know the order of the scheme.
Args:
controller (pySDC.Controller): The controller
params (dict): The params passed for this specific convergence controller
description (dict): The description object used to instantiate the controller
Returns:
bool: Whether the parameters are compatible
str: The error message
"""
if "e_tol" not in params.keys():
return (
False,
"Adaptivity needs a local tolerance! Please pass `e_tol` to the parameters for this convergence controller!",
)
return True, ""
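# Minimal sketch for `AdaptivityCollocation` (hypothetical parameter values): passing lists in
# `adaptive_coll_params` makes the controller switch between collocation problems within a step, e.g.
#
#     description['convergence_controllers'] = {
#         AdaptivityCollocation: {'e_tol': 1e-7, 'adaptive_coll_params': {'num_nodes': [2, 3]}},
#     }
#
# The step size update is then based on the difference between the solutions of the last two collocation
# problems and the order of the less accurate one.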
class AdaptivityPolynomialError(AdaptivityForConvergedCollocationProblems):
"""
Class to compute the time step size adaptively based on an error estimate obtained from interpolation within the
quadrature nodes.
This error estimate depends on solving the collocation problem exactly, so make sure you set a sufficient stopping criterion.
"""
def setup(self, controller, params, description, **kwargs):
from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence
defaults = {
'control_order': -50,
**super().setup(controller, params, description, **kwargs),
**params,
}
self.check_convergence = CheckConvergence.check_convergence
return defaults
def get_convergence(self, controller, S, **kwargs):
return self.check_convergence(S)
def dependencies(self, controller, description, **kwargs):
"""
Load the error estimator.
Args:
controller (pySDC.Controller): The controller
description (dict): The description object used to instantiate the controller
Returns:
None
"""
from pySDC.implementations.convergence_controller_classes.estimate_polynomial_error import (
EstimatePolynomialError,
)
super().dependencies(controller, description, **kwargs)
controller.add_convergence_controller(
EstimatePolynomialError,
description=description,
params={},
)
return None
def get_new_step_size(self, controller, S, **kwargs):
"""
Determine a step size for the next step from the error estimate.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
None
"""
if self.get_convergence(controller, S, **kwargs):
L = S.levels[0]
# compute next step size
order = L.status.order_embedded_estimate
e_est = self.get_local_error_estimate(controller, S)
L.status.dt_new = self.compute_optimal_step_size(
self.params.beta, L.params.dt, self.params.e_tol, e_est, order
)
self.log(
f'Error target: {self.params.e_tol:.2e}, error estimate: {e_est:.2e}, update_order: {order}',
S,
level=10,
)
self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
return None
def get_local_error_estimate(self, controller, S, **kwargs):
"""
Get the embedded error estimate of the finest level of the step.
Args:
controller (pySDC.Controller): The controller
S (pySDC.Step): The current step
Returns:
float: Embedded error estimate
"""
return S.levels[0].status.error_embedded_estimate
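# Minimal usage sketch for `AdaptivityPolynomialError` (assumed setup; the remaining description entries are
# omitted):
#
#     description['convergence_controllers'] = {
#         AdaptivityPolynomialError: {'e_tol': 1e-7, 'restol_rel': 1e-2},
#     }
#
# Because the error estimate relies on an accurate collocation solution, the residual tolerance derived from
# `restol_rel` (or an explicit `restol` in the level parameters) should be tight relative to `e_tol`.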