Coverage for pySDC/implementations/convergence_controller_classes/adaptivity.py: 92%
262 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-01 13:12 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-01 13:12 +0000
1import numpy as np
2from pySDC.core.convergence_controller import ConvergenceController, Status
3from pySDC.implementations.convergence_controller_classes.step_size_limiter import (
4 StepSizeLimiter,
5)
8class AdaptivityBase(ConvergenceController):
9 """
10 Abstract base class for convergence controllers that implement adaptivity based on arbitrary local error estimates
11 and update rules.
12 """
14 def setup(self, controller, params, description, **kwargs):
15 """
16 Define default parameters here.
18 Default parameters are:
19 - control_order (int): The order relative to other convergence controllers
20 - beta (float): The safety factor
22 Args:
23 controller (pySDC.Controller): The controller
24 params (dict): The params passed for this specific convergence controller
25 description (dict): The description object used to instantiate the controller
27 Returns:
28 (dict): The updated params dictionary
29 """
30 defaults = {
31 "control_order": -50,
32 "beta": 0.9,
33 }
34 from pySDC.implementations.hooks.log_step_size import LogStepSize
36 controller.add_hook(LogStepSize)
38 from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence
40 self.communicate_convergence = CheckConvergence.communicate_convergence
42 return {**defaults, **super().setup(controller, params, description, **kwargs)}
44 def dependencies(self, controller, description, **kwargs):
45 """
46 Load step size limiters here, if they are desired.
48 Args:
49 controller (pySDC.Controller): The controller
50 description (dict): The description object used to instantiate the controller
52 Returns:
53 None
54 """
55 step_limiter_keys = ['dt_min', 'dt_max', 'dt_slope_min', 'dt_slope_max', 'dt_rel_min_slope']
56 available_keys = [me for me in step_limiter_keys if me in self.params.__dict__.keys()]
58 if len(available_keys) > 0:
59 step_limiter_params = {key: self.params.__dict__[key] for key in available_keys}
60 controller.add_convergence_controller(StepSizeLimiter, params=step_limiter_params, description=description)
62 if self.params.useMPI:
63 self.prepare_MPI_logical_operations()
65 return None
67 def get_new_step_size(self, controller, S, **kwargs):
68 """
69 Determine a step size for the next step from an estimate of the local error of the current step.
71 Args:
72 controller (pySDC.Controller): The controller
73 S (pySDC.Step): The current step
75 Returns:
76 None
77 """
78 raise NotImplementedError("Please implement a rule for updating the step size!")
80 def compute_optimal_step_size(self, beta, dt, e_tol, e_est, order):
81 """
82 Compute the optimal step size for the current step based on the order of the scheme.
83 This function can be called from `get_new_step_size` for various implementations of adaptivity, but notably not
84 all! We require to know the order of the error estimate and if we do adaptivity based on the residual, for
85 instance, we do not know that and we can't use this function.
87 Args:
88 beta (float): Safety factor
89 dt (float): Current step size
90 e_tol (float): The desired tolerance
91 e_est (float): The estimated local error
92 order (int): The order of the local error estimate
94 Returns:
95 float: The optimal step size
96 """
97 return beta * dt * (e_tol / e_est) ** (1.0 / order)
99 def get_local_error_estimate(self, controller, S, **kwargs):
100 """
101 Get the local error estimate for updating the step size.
102 It does not have to be an error estimate, but could be the residual or something else.
104 Args:
105 controller (pySDC.Controller): The controller
106 S (pySDC.Step): The current step
108 Returns:
109 float: The error estimate
110 """
111 raise NotImplementedError("Please implement a way to get the local error")
113 def determine_restart(self, controller, S, **kwargs):
114 """
115 Check if the step wants to be restarted by comparing the estimate of the local error to a preset tolerance
117 Args:
118 controller (pySDC.Controller): The controller
119 S (pySDC.Step): The current step
121 Returns:
122 None
123 """
124 if S.status.iter >= S.params.maxiter:
125 e_est = self.get_local_error_estimate(controller, S)
126 if e_est >= self.params.e_tol:
127 # see if we try to avoid restarts
128 if self.params.get('avoid_restarts'):
129 more_iter_needed = max([L.status.iter_to_convergence for L in S.levels])
130 k_final = S.status.iter + more_iter_needed
131 rho = max([L.status.contraction_factor for L in S.levels])
132 coll_order = S.levels[0].sweep.coll.order
134 if rho > 1:
135 S.status.restart = True
136 self.log(f"Convergence factor = {rho:.2e} > 1 -> restarting", S)
137 elif k_final > 2 * S.params.maxiter:
138 S.status.restart = True
139 self.log(
140 f"{more_iter_needed} more iterations needed for convergence -> restart is more efficient", S
141 )
142 elif k_final > coll_order:
143 S.status.restart = True
144 self.log(
145 f"{more_iter_needed} more iterations needed for convergence -> restart because collocation problem would be over resolved",
146 S,
147 )
148 else:
149 S.status.force_continue = True
150 self.log(f"{more_iter_needed} more iterations needed for convergence -> no restart", S)
151 else:
152 S.status.restart = True
153 self.log(f"Restarting: e={e_est:.2e} >= e_tol={self.params.e_tol:.2e}", S)
155 return None
158class AdaptivityForConvergedCollocationProblems(AdaptivityBase):
159 def dependencies(self, controller, description, **kwargs):
160 """
161 Load interpolation between restarts.
163 Args:
164 controller (pySDC.Controller): The controller
165 description (dict): The description object used to instantiate the controller
167 Returns:
168 None
169 """
170 super().dependencies(controller, description, **kwargs)
172 if self.params.interpolate_between_restarts:
173 from pySDC.implementations.convergence_controller_classes.interpolate_between_restarts import (
174 InterpolateBetweenRestarts,
175 )
177 controller.add_convergence_controller(
178 InterpolateBetweenRestarts,
179 description=description,
180 params={},
181 )
182 if self.params.interpolate_between_restarts:
183 self.interpolator = controller.convergence_controllers[-1]
184 return None
186 def get_convergence(self, controller, S, **kwargs):
187 raise NotImplementedError("Please implement a way to check if the collocation problem is converged!")
189 def setup(self, controller, params, description, **kwargs):
190 """
191 Add a default value for control order to the parameters.
193 Args:
194 controller (pySDC.Controller): The controller
195 params (dict): Parameters for the convergence controller
196 description (dict): The description object used to instantiate the controller
198 Returns:
199 dict: Updated parameters
200 """
201 defaults = {
202 'restol_rel': None,
203 'e_tol_rel': None,
204 'restart_at_maxiter': True,
205 'restol_min': 1e-12,
206 'restol_max': 1e-5,
207 'factor_if_not_converged': 4.0,
208 'residual_max_tol': 1e9,
209 'maxiter': description['sweeper_params'].get('maxiter', 99),
210 'interpolate_between_restarts': True,
211 'abort_at_growing_residual': True,
212 **super().setup(controller, params, description, **kwargs),
213 }
214 if defaults['restol_rel']:
215 description['level_params']['restol'] = min(
216 [max([defaults['restol_rel'] * defaults['e_tol'], defaults['restol_min']]), defaults['restol_max']]
217 )
218 elif defaults['e_tol_rel']:
219 description['level_params']['e_tol'] = min([max([defaults['e_tol_rel'] * defaults['e_tol'], 1e-10]), 1e-5])
221 if defaults['restart_at_maxiter']:
222 defaults['maxiter'] = description['step_params'].get('maxiter', 99)
224 self.res_last_iter = np.inf
226 return defaults
228 def determine_restart(self, controller, S, **kwargs):
229 if self.get_convergence(controller, S, **kwargs):
230 self.res_last_iter = np.inf
232 L = S.levels[0]
233 e_tol_converged = (
234 L.status.increment < L.params.e_tol if (L.params.get('e_tol') and L.status.get('increment')) else False
235 )
237 if (
238 self.params.restart_at_maxiter
239 and S.levels[0].status.residual > S.levels[0].params.restol
240 and not e_tol_converged
241 ):
242 self.trigger_restart_upon_nonconvergence(S)
243 elif self.get_local_error_estimate(controller, S, **kwargs) > self.params.e_tol:
244 S.status.restart = True
245 elif (
246 S.status.time_size == 1
247 and self.res_last_iter < S.levels[0].status.residual
248 and S.status.iter > 0
249 and self.params.abort_at_growing_residual
250 ):
251 self.trigger_restart_upon_nonconvergence(S)
252 elif S.levels[0].status.residual > self.params.residual_max_tol:
253 self.trigger_restart_upon_nonconvergence(S)
255 if self.params.useMPI:
256 self.communicate_convergence(self, controller, S, **kwargs)
258 self.res_last_iter = S.levels[0].status.residual * 1.0
260 def trigger_restart_upon_nonconvergence(self, S):
261 S.status.restart = True
262 S.status.force_done = True
263 for L in S.levels:
264 L.status.dt_new = L.params.dt / self.params.factor_if_not_converged
265 self.log(
266 f'Collocation problem not converged. Reducing step size to {L.status.dt_new:.2e}',
267 S,
268 )
269 if self.params.interpolate_between_restarts:
270 self.interpolator.status.skip_interpolation = True
273class Adaptivity(AdaptivityBase):
274 """
275 Class to compute time step size adaptively based on embedded error estimate.
277 We have a version working in non-MPI pipelined SDC, but Adaptivity requires you to know the order of the scheme,
278 which you can also know for block-Jacobi, but it works differently and it is only implemented for block
279 Gauss-Seidel so far.
281 There is an option to reduce restarts if continued iterations could yield convergence in fewer iterations than
282 restarting based on an estimate of the contraction factor.
283 Since often only one or two more iterations suffice, this can boost efficiency of adaptivity significantly.
284 Notice that the computed step size is not effected.
285 Be aware that this does not work when Hot Rod is enabled, since that requires us to know the order of the scheme in
286 more detail. Since we reset to the second to last sweep before moving on, we cannot continue to iterate.
287 Set the reduced restart up by setting a boolean value for "avoid_restarts" in the parameters for the convergence
288 controller.
289 The behaviour in multi-step SDC is not well studied and it is unclear if anything useful happens there.
290 """
292 def setup(self, controller, params, description, **kwargs):
293 """
294 Define default parameters here.
296 Default parameters are:
297 - control_order (int): The order relative to other convergence controllers
298 - beta (float): The safety factor
300 Args:
301 controller (pySDC.Controller): The controller
302 params (dict): The params passed for this specific convergence controller
303 description (dict): The description object used to instantiate the controller
305 Returns:
306 (dict): The updated params dictionary
307 """
308 defaults = {
309 "embedded_error_flavor": 'standard',
310 "rel_error": False,
311 }
312 return {**defaults, **super().setup(controller, params, description, **kwargs)}
314 def dependencies(self, controller, description, **kwargs):
315 """
316 Load the embedded error estimator.
318 Args:
319 controller (pySDC.Controller): The controller
320 description (dict): The description object used to instantiate the controller
322 Returns:
323 None
324 """
325 from pySDC.implementations.convergence_controller_classes.estimate_embedded_error import EstimateEmbeddedError
327 super().dependencies(controller, description, **kwargs)
329 controller.add_convergence_controller(
330 EstimateEmbeddedError.get_implementation(self.params.embedded_error_flavor, self.params.useMPI),
331 description=description,
332 params={
333 'rel_error': self.params.rel_error,
334 },
335 )
337 # load contraction factor estimator if necessary
338 if self.params.get('avoid_restarts'):
339 from pySDC.implementations.convergence_controller_classes.estimate_contraction_factor import (
340 EstimateContractionFactor,
341 )
343 params = {'e_tol': self.params.e_tol}
344 controller.add_convergence_controller(EstimateContractionFactor, description=description, params=params)
345 return None
347 def check_parameters(self, controller, params, description, **kwargs):
348 """
349 Check whether parameters are compatible with whatever assumptions went into the step size functions etc.
350 For adaptivity, we need to know the order of the scheme.
352 Args:
353 controller (pySDC.Controller): The controller
354 params (dict): The params passed for this specific convergence controller
355 description (dict): The description object used to instantiate the controller
357 Returns:
358 bool: Whether the parameters are compatible
359 str: The error message
360 """
361 if description["level_params"].get("restol", -1.0) >= 0:
362 return (
363 False,
364 "Adaptivity needs constant order in time and hence restol in the step parameters has to be \
365smaller than 0!",
366 )
368 if controller.params.mssdc_jac:
369 return (
370 False,
371 "Adaptivity needs the same order on all steps, please activate Gauss-Seidel multistep mode!",
372 )
374 if "e_tol" not in params.keys():
375 return (
376 False,
377 "Adaptivity needs a local tolerance! Please pass `e_tol` to the parameters for this convergence controller!",
378 )
380 return True, ""
382 def get_new_step_size(self, controller, S, **kwargs):
383 """
384 Determine a step size for the next step from an embedded estimate of the local error of the current step.
386 Args:
387 controller (pySDC.Controller): The controller
388 S (pySDC.Step): The current step
390 Returns:
391 None
392 """
393 # check if we performed the desired amount of sweeps
394 if S.status.iter == S.params.maxiter:
395 L = S.levels[0]
397 # compute next step size
398 order = S.status.iter # embedded error estimate is same order as time marching
400 e_est = self.get_local_error_estimate(controller, S)
401 L.status.dt_new = self.compute_optimal_step_size(
402 self.params.beta, L.params.dt, self.params.e_tol, e_est, order
403 )
404 self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
406 return None
408 def get_local_error_estimate(self, controller, S, **kwargs):
409 """
410 Get the embedded error estimate of the finest level of the step.
412 Args:
413 controller (pySDC.Controller): The controller
414 S (pySDC.Step): The current step
416 Returns:
417 float: Embedded error estimate
418 """
419 return S.levels[0].status.error_embedded_estimate
422class AdaptivityRK(Adaptivity):
423 """
424 Adaptivity for Runge-Kutta methods. Basically, we need to change the order in the step size update
425 """
427 def setup(self, controller, params, description, **kwargs):
428 defaults = {}
429 defaults['update_order'] = params.get('update_order', description['sweeper_class'].get_update_order())
430 return {**defaults, **super().setup(controller, params, description, **kwargs)}
432 def get_new_step_size(self, controller, S, **kwargs):
433 """
434 Determine a step size for the next step from an embedded estimate of the local error of the current step.
436 Args:
437 controller (pySDC.Controller): The controller
438 S (pySDC.Step): The current step
440 Returns:
441 None
442 """
443 # check if we performed the desired amount of sweeps
444 if S.status.iter == S.params.maxiter:
445 L = S.levels[0]
447 # compute next step size
448 order = self.params.update_order
449 e_est = self.get_local_error_estimate(controller, S)
450 L.status.dt_new = self.compute_optimal_step_size(
451 self.params.beta, L.params.dt, self.params.e_tol, e_est, order
452 )
453 self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
455 return None
458class AdaptivityResidual(AdaptivityBase):
459 """
460 Do adaptivity based on residual.
462 Since we don't know a correlation between the residual and the error (for nonlinear problems), we employ a simpler
463 rule to update the step size. Instead of giving a local tolerance that we try to hit as closely as possible, we set
464 two thresholds for the residual. When we exceed the upper one, we reduce the step size by a factor of 2 and if the
465 residual falls below the lower threshold, we double the step size.
466 Please setup these parameters as "e_tol" and "e_tol_low".
467 """
469 def setup(self, controller, params, description, **kwargs):
470 """
471 Define default parameters here.
473 Default parameters are:
474 - control_order (int): The order relative to other convergence controllers
475 - e_tol_low (float): Lower absolute threshold for the residual
476 - e_tol (float): Upper absolute threshold for the residual
477 - use_restol (bool): Restart if the residual tolerance was not reached
478 - max_restarts: Override maximum number of restarts
480 Args:
481 controller (pySDC.Controller): The controller
482 params (dict): The params passed for this specific convergence controller
483 description (dict): The description object used to instantiate the controller
485 Returns:
486 (dict): The updated params dictionary
487 """
488 defaults = {
489 "control_order": -45,
490 "e_tol_low": 0,
491 "e_tol": np.inf,
492 "use_restol": False,
493 "max_restarts": 99 if "e_tol_low" in params else None,
494 "allowed_modifications": ['increase', 'decrease'], # what we are allowed to do with the step size
495 }
496 return {**defaults, **params}
498 def setup_status_variables(self, controller, **kwargs):
499 """
500 Change maximum number of allowed restarts here.
502 Args:
503 controller (pySDC.Controller): The controller
505 Reutrns:
506 None
507 """
508 from pySDC.implementations.convergence_controller_classes.basic_restarting import BasicRestarting
510 if self.params.max_restarts is not None:
511 conv_controllers = controller.convergence_controllers
512 restart_cont = [me for me in conv_controllers if BasicRestarting in type(me).__bases__]
514 if len(restart_cont) == 0:
515 raise NotImplementedError("Please implement override of maximum number of restarts!")
517 restart_cont[0].params.max_restarts = self.params.max_restarts
518 return None
520 def check_parameters(self, controller, params, description, **kwargs):
521 """
522 Check whether parameters are compatible with whatever assumptions went into the step size functions etc.
524 Args:
525 controller (pySDC.Controller): The controller
526 params (dict): The params passed for this specific convergence controller
527 description (dict): The description object used to instantiate the controller
529 Returns:
530 bool: Whether the parameters are compatible
531 str: The error message
532 """
533 if controller.params.mssdc_jac:
534 return (
535 False,
536 "Adaptivity needs the same order on all steps, please activate Gauss-Seidel multistep mode!",
537 )
539 return True, ""
541 def get_new_step_size(self, controller, S, **kwargs):
542 """
543 Determine a step size for the next step.
544 If we exceed the absolute tolerance of the residual in either direction, we either double or halve the step
545 size.
547 Args:
548 controller (pySDC.Controller): The controller
549 S (pySDC.Step): The current step
551 Returns:
552 None
553 """
554 # check if we performed the desired amount of sweeps
555 if S.status.iter == S.params.maxiter:
556 L = S.levels[0]
558 res = self.get_local_error_estimate(controller, S)
560 dt_planned = L.status.dt_new if L.status.dt_new is not None else L.params.dt
562 if (
563 res > self.params.e_tol or (res > L.params.restol and self.params.use_restol)
564 ) and 'decrease' in self.params.allowed_modifications:
565 L.status.dt_new = min([dt_planned, L.params.dt / 2.0])
566 self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
567 elif res < self.params.e_tol_low and 'increase' in self.params.allowed_modifications:
568 L.status.dt_new = max([dt_planned, L.params.dt * 2.0])
569 self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
571 return None
573 def get_local_error_estimate(self, controller, S, **kwargs):
574 """
575 Get the residual of the finest level of the step.
577 Args:
578 controller (pySDC.Controller): The controller
579 S (pySDC.Step): The current step
581 Returns:
582 float: Embedded error estimate
583 """
584 return S.levels[0].status.residual
587class AdaptivityCollocation(AdaptivityForConvergedCollocationProblems):
588 """
589 Control the step size via a collocation based estimate of the local error.
590 The error estimate works by subtracting two solutions to collocation problems with different order. You can
591 interpolate between collocation methods as much as you want but the adaptive step size selection will always be
592 based on the last switch of quadrature.
593 """
595 def setup(self, controller, params, description, **kwargs):
596 """
597 Add a default value for control order to the parameters.
599 Args:
600 controller (pySDC.Controller): The controller
601 params (dict): Parameters for the convergence controller
602 description (dict): The description object used to instantiate the controller
604 Returns:
605 dict: Updated parameters
606 """
607 defaults = {
608 "adaptive_coll_params": {},
609 "num_colls": 0,
610 **super().setup(controller, params, description, **kwargs),
611 "control_order": 220,
612 }
614 for key in defaults['adaptive_coll_params'].keys():
615 if type(defaults['adaptive_coll_params'][key]) == list:
616 defaults['num_colls'] = max([defaults['num_colls'], len(defaults['adaptive_coll_params'][key])])
618 if defaults['restart_at_maxiter']:
619 defaults['maxiter'] = description['step_params'].get('maxiter', 99) * defaults['num_colls']
621 return defaults
623 def setup_status_variables(self, controller, **kwargs):
624 self.status = Status(['error', 'order'])
625 self.status.error = []
626 self.status.order = []
628 def reset_status_variables(self, controller, **kwargs):
629 self.setup_status_variables(controller, **kwargs)
631 def dependencies(self, controller, description, **kwargs):
632 """
633 Load the `EstimateEmbeddedErrorCollocation` convergence controller to estimate the local error by switching
634 between collocation problems between iterations.
636 Args:
637 controller (pySDC.Controller): The controller
638 description (dict): The description object used to instantiate the controller
639 """
640 from pySDC.implementations.convergence_controller_classes.estimate_embedded_error import (
641 EstimateEmbeddedErrorCollocation,
642 )
644 super().dependencies(controller, description, **kwargs)
646 params = {'adaptive_coll_params': self.params.adaptive_coll_params}
647 controller.add_convergence_controller(
648 EstimateEmbeddedErrorCollocation,
649 params=params,
650 description=description,
651 )
653 def get_convergence(self, controller, S, **kwargs):
654 return len(self.status.order) == self.params.num_colls
656 def get_local_error_estimate(self, controller, S, **kwargs):
657 """
658 Get the collocation based embedded error estimate.
660 Args:
661 controller (pySDC.Controller): The controller
662 S (pySDC.Step): The current step
664 Returns:
665 float: Embedded error estimate
666 """
667 if len(self.status.error) > 1:
668 return self.status.error[-1][1]
669 else:
670 return 0.0
672 def post_iteration_processing(self, controller, step, **kwargs):
673 """
674 Get the error estimate and its order if available.
676 Args:
677 controller (pySDC.Controller.controller): The controller
678 step (pySDC.Step.step): The current step
679 """
680 if step.status.done:
681 lvl = step.levels[0]
682 self.status.error += [lvl.status.error_embedded_estimate_collocation]
683 self.status.order += [lvl.sweep.coll.order]
685 def get_new_step_size(self, controller, S, **kwargs):
686 if len(self.status.order) == self.params.num_colls:
687 lvl = S.levels[0]
689 # compute next step size
690 order = (
691 min(self.status.order[-2::]) + 1
692 ) # local order of less accurate of the last two collocation problems
693 e_est = self.get_local_error_estimate(controller, S)
695 lvl.status.dt_new = self.compute_optimal_step_size(
696 self.params.beta, lvl.params.dt, self.params.e_tol, e_est, order
697 )
698 self.log(f'Adjusting step size from {lvl.params.dt:.2e} to {lvl.status.dt_new:.2e}', S)
700 def determine_restart(self, controller, S, **kwargs):
701 """
702 Check if the step wants to be restarted by comparing the estimate of the local error to a preset tolerance
704 Args:
705 controller (pySDC.Controller): The controller
706 S (pySDC.Step): The current step
708 Returns:
709 None
710 """
711 if len(self.status.order) == self.params.num_colls:
712 e_est = self.get_local_error_estimate(controller, S)
713 if e_est >= self.params.e_tol:
714 S.status.restart = True
715 self.log(f"Restarting: e={e_est:.2e} >= e_tol={self.params.e_tol:.2e}", S)
717 def check_parameters(self, controller, params, description, **kwargs):
718 """
719 Check whether parameters are compatible with whatever assumptions went into the step size functions etc.
720 For adaptivity, we need to know the order of the scheme.
722 Args:
723 controller (pySDC.Controller): The controller
724 params (dict): The params passed for this specific convergence controller
725 description (dict): The description object used to instantiate the controller
727 Returns:
728 bool: Whether the parameters are compatible
729 str: The error message
730 """
731 if "e_tol" not in params.keys():
732 return (
733 False,
734 "Adaptivity needs a local tolerance! Please pass `e_tol` to the parameters for this convergence controller!",
735 )
737 return True, ""
740class AdaptivityExtrapolationWithinQ(AdaptivityForConvergedCollocationProblems):
741 """
742 Class to compute time step size adaptively based on error estimate obtained from extrapolation within the quadrature
743 nodes.
745 This error estimate depends on solving the collocation problem exactly, so make sure you set a sufficient stopping criterion.
746 """
748 def setup(self, controller, params, description, **kwargs):
749 from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence
751 defaults = {
752 'high_Taylor_order': False,
753 **params,
754 }
756 self.check_convergence = CheckConvergence.check_convergence
757 return {**defaults, **super().setup(controller, params, description, **kwargs)}
759 def get_convergence(self, controller, S, **kwargs):
760 return self.check_convergence(S)
762 def dependencies(self, controller, description, **kwargs):
763 """
764 Load the error estimator.
766 Args:
767 controller (pySDC.Controller): The controller
768 description (dict): The description object used to instantiate the controller
770 Returns:
771 None
772 """
773 from pySDC.implementations.convergence_controller_classes.estimate_extrapolation_error import (
774 EstimateExtrapolationErrorWithinQ,
775 )
777 super().dependencies(controller, description, **kwargs)
779 controller.add_convergence_controller(
780 EstimateExtrapolationErrorWithinQ,
781 description=description,
782 params={'high_Taylor_order': self.params.high_Taylor_order},
783 )
784 return None
786 def get_new_step_size(self, controller, S, **kwargs):
787 """
788 Determine a step size for the next step from the error estimate.
790 Args:
791 controller (pySDC.Controller): The controller
792 S (pySDC.Step): The current step
794 Returns:
795 None
796 """
797 if self.get_convergence(controller, S, **kwargs):
798 L = S.levels[0]
800 # compute next step size
801 order = L.sweep.coll.num_nodes + 1 if self.params.high_Taylor_order else L.sweep.coll.num_nodes
803 e_est = self.get_local_error_estimate(controller, S)
804 L.status.dt_new = self.compute_optimal_step_size(
805 self.params.beta, L.params.dt, self.params.e_tol, e_est, order
806 )
808 self.log(
809 f'Error target: {self.params.e_tol:.2e}, error estimate: {e_est:.2e}, update_order: {order}',
810 S,
811 level=10,
812 )
813 self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
815 return None
817 def get_local_error_estimate(self, controller, S, **kwargs):
818 """
819 Get the embedded error estimate of the finest level of the step.
821 Args:
822 controller (pySDC.Controller): The controller
823 S (pySDC.Step): The current step
825 Returns:
826 float: Embedded error estimate
827 """
828 return S.levels[0].status.error_extrapolation_estimate
831class AdaptivityPolynomialError(AdaptivityForConvergedCollocationProblems):
832 """
833 Class to compute time step size adaptively based on error estimate obtained from interpolation within the quadrature
834 nodes.
836 This error estimate depends on solving the collocation problem exactly, so make sure you set a sufficient stopping criterion.
837 """
839 def setup(self, controller, params, description, **kwargs):
840 from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence
842 defaults = {
843 'control_order': -50,
844 'problem_mesh_type': 'numpyesque',
845 'rel_error': False,
846 **super().setup(controller, params, description, **kwargs),
847 **params,
848 }
850 self.check_convergence = CheckConvergence.check_convergence
851 return defaults
853 def get_convergence(self, controller, S, **kwargs):
854 return self.check_convergence(S)
856 def dependencies(self, controller, description, **kwargs):
857 """
858 Load the error estimator.
860 Args:
861 controller (pySDC.Controller): The controller
862 description (dict): The description object used to instantiate the controller
864 Returns:
865 None
866 """
867 if self.params.problem_mesh_type.lower() == 'numpyesque':
868 from pySDC.implementations.convergence_controller_classes.estimate_polynomial_error import (
869 EstimatePolynomialError as error_estimation_cls,
870 )
871 elif self.params.problem_mesh_type.lower() == 'firedrake':
872 from pySDC.implementations.convergence_controller_classes.estimate_polynomial_error import (
873 EstimatePolynomialErrorFiredrake as error_estimation_cls,
874 )
875 else:
876 raise NotImplementedError(
877 f'Don\'t know what error estimation class to use for problems with mesh type {self.params.problem_mesh_type}'
878 )
880 super().dependencies(controller, description, **kwargs)
882 controller.add_convergence_controller(
883 error_estimation_cls,
884 description=description,
885 params={
886 'rel_error': self.params.rel_error,
887 },
888 )
889 return None
891 def get_new_step_size(self, controller, S, **kwargs):
892 """
893 Determine a step size for the next step from the error estimate.
895 Args:
896 controller (pySDC.Controller): The controller
897 S (pySDC.Step): The current step
899 Returns:
900 None
901 """
902 if self.get_convergence(controller, S, **kwargs):
903 L = S.levels[0]
905 # compute next step size
906 order = L.status.order_embedded_estimate
908 e_est = self.get_local_error_estimate(controller, S)
909 L.status.dt_new = self.compute_optimal_step_size(
910 self.params.beta, L.params.dt, self.params.e_tol, e_est, order
911 )
913 self.log(
914 f'Error target: {self.params.e_tol:.2e}, error estimate: {e_est:.2e}, update_order: {order}',
915 S,
916 level=10,
917 )
918 self.log(f'Adjusting step size from {L.params.dt:.2e} to {L.status.dt_new:.2e}', S)
920 return None
922 def get_local_error_estimate(self, controller, S, **kwargs):
923 """
924 Get the embedded error estimate of the finest level of the step.
926 Args:
927 controller (pySDC.Controller): The controller
928 S (pySDC.Step): The current step
930 Returns:
931 float: Embedded error estimate
932 """
933 return S.levels[0].status.error_embedded_estimate