Coverage for pySDC/core/sweeper.py: 94%
126 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-04 15:08 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-04 15:08 +0000
1import logging
2import numpy as np
3from qmat.qdelta import QDeltaGenerator, QDELTA_GENERATORS
5from pySDC.core.errors import ParameterError
6from pySDC.core.collocation import CollBase
7from pySDC.helpers.pysdc_helper import FrozenClass
10# Organize QDeltaGenerator class in dict[type(QDeltaGenerator),set(str)] to retrieve aliases
11QDELTA_GENERATORS_ALIASES = {v: set() for v in set(QDELTA_GENERATORS.values())}
12for k, v in QDELTA_GENERATORS.items():
13 QDELTA_GENERATORS_ALIASES[v].add(k)
16# short helper class to add params as attributes
17class _Pars(FrozenClass):
18 def __init__(self, pars):
19 self.do_coll_update = False
20 self.initial_guess = 'spread' # default value (see also below)
21 self.skip_residual_computation = () # gain performance at the cost of correct residual output
23 for k, v in pars.items():
24 if k != 'collocation_class':
25 setattr(self, k, v)
27 self._freeze()
30class Sweeper(object):
31 """
32 Base abstract sweeper class, provides two base methods to generate QDelta matrices:
34 - get_Qdelta_implicit(qd_type):
35 Returns a (pySDC-type) QDelta matrix of **implicit type**,
36 *i.e* lower triangular with zeros on the first collumn.
37 - get_Qdelta_explicit(qd_type):
38 Returns a (pySDC-type) QDelta matrix of **explicit type**,
39 *i.e* strictly lower triangular with first node distance to zero on the first collumn.
42 All possible QDelta matrix coefficients are generated with
43 `qmat <https://qmat.readthedocs.io/en/latest/autoapi/qmat/qdelta/index.html>`_,
44 check it out to see all available coefficient types.
46 Attributes:
47 logger: custom logger for sweeper-related logging
48 params (__Pars): parameter object containing the custom parameters passed by the user
49 coll (pySDC.Collocation.CollBase): collocation object
50 """
52 def __init__(self, params, level):
53 """
54 Initialization routine for the base sweeper
56 Args:
57 params (dict): parameter object
58 level (pySDC.Level.level): the level that uses this sweeper
59 """
61 self.logger = logging.getLogger('sweeper')
63 essential_keys = ['num_nodes']
64 for key in essential_keys:
65 if key not in params:
66 msg = 'need %s to instantiate step, only got %s' % (key, str(params.keys()))
67 self.logger.error(msg)
68 raise ParameterError(msg)
70 if 'collocation_class' not in params:
71 params['collocation_class'] = CollBase
73 # prepare random generator for initial guess
74 if params.get('initial_guess', 'spread') == 'random': # default value (see also above)
75 params['random_seed'] = params.get('random_seed', 1984)
76 self.rng = np.random.RandomState(params['random_seed'])
78 self.params = _Pars(params)
80 self.coll: CollBase = params['collocation_class'](**params)
82 if not self.coll.right_is_node and not self.params.do_coll_update:
83 self.logger.warning(
84 'we need to do a collocation update here, since the right end point is not a node. Changing this!'
85 )
86 self.params.do_coll_update = True
88 self.__level = level
89 self.parallelizable = False
90 for name in ["genQI", "genQE"]:
91 if hasattr(self, name):
92 delattr(self, name)
94 def buildGenerator(self, qdType: str) -> QDeltaGenerator:
95 return QDELTA_GENERATORS[qdType](qGen=self.coll.generator, tLeft=self.coll.tleft)
97 def get_Qdelta_implicit(self, qd_type, k=None):
98 QDmat = np.zeros_like(self.coll.Qmat)
99 if not hasattr(self, "genQI") or qd_type not in QDELTA_GENERATORS_ALIASES[type(self.genQI)]:
100 self.genQI: QDeltaGenerator = self.buildGenerator(qd_type)
101 QDmat[1:, 1:] = self.genQI.genCoeffs(k=k)
103 err_msg = 'Lower triangular matrix expected!'
104 np.testing.assert_array_equal(np.triu(QDmat, k=1), np.zeros(QDmat.shape), err_msg=err_msg)
105 if np.allclose(np.diag(np.diag(QDmat)), QDmat):
106 self.parallelizable = True
107 return QDmat
109 def get_Qdelta_explicit(self, qd_type, k=None):
110 coll = self.coll
111 QDmat = np.zeros(coll.Qmat.shape, dtype=float)
112 if not hasattr(self, "genQE") or qd_type not in QDELTA_GENERATORS_ALIASES[type(self.genQE)]:
113 self.genQE: QDeltaGenerator = self.buildGenerator(qd_type)
114 QDmat[1:, 1:], QDmat[1:, 0] = self.genQE.genCoeffs(k=k, dTau=True)
116 err_msg = 'Strictly lower triangular matrix expected!'
117 np.testing.assert_array_equal(np.triu(QDmat, k=0), np.zeros(QDmat.shape), err_msg=err_msg)
118 if np.allclose(np.diag(np.diag(QDmat)), QDmat):
119 self.parallelizable = True # for PIC ;)
120 return QDmat
122 def predict(self):
123 """
124 Predictor to fill values at nodes before first sweep
126 Default prediction for the sweepers, only copies the values to all collocation nodes
127 and evaluates the RHS of the ODE there
128 """
130 # get current level and problem description
131 L = self.level
132 P = L.prob
134 # evaluate RHS at left point
135 L.f[0] = P.eval_f(L.u[0], L.time)
137 for m in range(1, self.coll.num_nodes + 1):
138 # copy u[0] to all collocation nodes, evaluate RHS
139 if self.params.initial_guess == 'spread':
140 L.u[m] = P.dtype_u(L.u[0])
141 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1])
142 # copy u[0] and RHS evaluation to all collocation nodes
143 elif self.params.initial_guess == 'copy':
144 L.u[m] = P.dtype_u(L.u[0])
145 L.f[m] = P.dtype_f(L.f[0])
146 # start with zero everywhere
147 elif self.params.initial_guess == 'zero':
148 L.u[m] = P.dtype_u(init=P.init, val=0.0)
149 L.f[m] = P.dtype_f(init=P.init, val=0.0)
150 # start with random initial guess
151 elif self.params.initial_guess == 'random':
152 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0])
153 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0])
154 else:
155 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented')
157 # indicate that this level is now ready for sweeps
158 L.status.unlocked = True
159 L.status.updated = True
161 def compute_residual(self, stage=''):
162 """
163 Computation of the residual using the collocation matrix Q
165 Args:
166 stage (str): The current stage of the step the level belongs to
167 """
169 # get current level and problem description
170 L = self.level
172 # Check if we want to skip the residual computation to gain performance
173 # Keep in mind that skipping any residual computation is likely to give incorrect outputs of the residual!
174 if stage in self.params.skip_residual_computation:
175 L.status.residual = 0.0 if L.status.residual is None else L.status.residual
176 return None
178 # check if there are new values (e.g. from a sweep)
179 # assert L.status.updated
181 # compute the residual for each node
183 # build QF(u)
184 res_norm = []
185 L.residual = self.integrate()
186 for m in range(self.coll.num_nodes):
187 L.residual[m] += L.u[0] - L.u[m + 1]
188 # add tau if associated
189 if L.tau[m] is not None:
190 L.residual[m] += L.tau[m]
191 # use abs function from data type here
192 res_norm.append(abs(L.residual[m]))
194 # find maximal residual over the nodes
195 if L.params.residual_type == 'full_abs':
196 L.status.residual = max(res_norm)
197 elif L.params.residual_type == 'last_abs':
198 L.status.residual = res_norm[-1]
199 elif L.params.residual_type == 'full_rel':
200 L.status.residual = max(res_norm) / abs(L.u[0])
201 elif L.params.residual_type == 'last_rel':
202 L.status.residual = res_norm[-1] / abs(L.u[0])
203 else:
204 raise ParameterError(
205 f'residual_type = {L.params.residual_type} not implemented, choose '
206 f'full_abs, last_abs, full_rel or last_rel instead'
207 )
209 # indicate that the residual has seen the new values
210 L.status.updated = False
212 return None
214 def compute_end_point(self):
215 """
216 Abstract interface to end-node computation
217 """
218 raise NotImplementedError('ERROR: sweeper has to implement compute_end_point(self)')
220 def integrate(self):
221 """
222 Abstract interface to right-hand side integration
223 """
224 raise NotImplementedError('ERROR: sweeper has to implement integrate(self)')
226 def update_nodes(self):
227 """
228 Abstract interface to node update
229 """
230 raise NotImplementedError('ERROR: sweeper has to implement update_nodes(self)')
232 @property
233 def level(self):
234 """
235 Returns the current level
237 Returns:
238 pySDC.Level.level: the current level
239 """
240 return self.__level
242 @level.setter
243 def level(self, L):
244 """
245 Sets a reference to the current level (done in the initialization of the level)
247 Args:
248 L (pySDC.Level.level): current level
249 """
250 from pySDC.core.level import Level
252 assert isinstance(L, Level)
253 self.__level = L
255 @property
256 def rank(self):
257 return 0
259 def updateVariableCoeffs(self, k):
260 """
261 Potentially update QDelta implicit coefficients if variable ...
263 Parameters
264 ----------
265 k : int
266 Index of the sweep (0 for initial sweep, 1 for the first one, ...).
267 """
268 if hasattr(self, "genQI") and self.genQI.isKDependent():
269 qdType = type(self.genQI).__name__
270 self.QI = self.get_Qdelta_implicit(qdType, k=k)
271 if hasattr(self, "genQE") and self.genQE.isKDependent():
272 qdType = type(self.genQE).__name__
273 self.QE = self.get_Qdelta_explicit(qdType, k=k)