Coverage for pySDC/core/sweeper.py: 98%
120 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-20 17:10 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-20 17:10 +0000
1import logging
2import numpy as np
3from qmat import QDELTA_GENERATORS
5from pySDC.core.errors import ParameterError
6from pySDC.core.level import Level
7from pySDC.core.collocation import CollBase
8from pySDC.helpers.pysdc_helper import FrozenClass
11# short helper class to add params as attributes
12class _Pars(FrozenClass):
13 def __init__(self, pars):
14 self.do_coll_update = False
15 self.initial_guess = 'spread' # default value (see also below)
16 self.skip_residual_computation = () # gain performance at the cost of correct residual output
18 for k, v in pars.items():
19 if k != 'collocation_class':
20 setattr(self, k, v)
22 self._freeze()
25class Sweeper(object):
26 """
27 Base abstract sweeper class, provides two base methods to generate QDelta matrices:
29 - get_Qdelta_implicit(qd_type):
30 Returns a (pySDC-type) QDelta matrix of **implicit type**,
31 *i.e* lower triangular with zeros on the first collumn.
32 - get_Qdelta_explicit(qd_type):
33 Returns a (pySDC-type) QDelta matrix of **explicit type**,
34 *i.e* strictly lower triangular with first node distance to zero on the first collumn.
37 All possible QDelta matrix coefficients are generated with
38 `qmat <https://qmat.readthedocs.io/en/latest/autoapi/qmat/qdelta/index.html>`_,
39 check it out to see all available coefficient types.
41 Attributes:
42 logger: custom logger for sweeper-related logging
43 params (__Pars): parameter object containing the custom parameters passed by the user
44 coll (pySDC.Collocation.CollBase): collocation object
45 """
47 def __init__(self, params):
48 """
49 Initialization routine for the base sweeper
51 Args:
52 params (dict): parameter object
54 """
56 # set up logger
57 self.logger = logging.getLogger('sweeper')
59 essential_keys = ['num_nodes']
60 for key in essential_keys:
61 if key not in params:
62 msg = 'need %s to instantiate step, only got %s' % (key, str(params.keys()))
63 self.logger.error(msg)
64 raise ParameterError(msg)
66 if 'collocation_class' not in params:
67 params['collocation_class'] = CollBase
69 # prepare random generator for initial guess
70 if params.get('initial_guess', 'spread') == 'random': # default value (see also above)
71 params['random_seed'] = params.get('random_seed', 1984)
72 self.rng = np.random.RandomState(params['random_seed'])
74 self.params = _Pars(params)
76 self.coll: CollBase = params['collocation_class'](**params)
78 if not self.coll.right_is_node and not self.params.do_coll_update:
79 self.logger.warning(
80 'we need to do a collocation update here, since the right end point is not a node. Changing this!'
81 )
82 self.params.do_coll_update = True
84 # This will be set as soon as the sweeper is instantiated at the level
85 self.__level = None
87 self.parallelizable = False
89 def setupGenerator(self, qd_type):
90 coll = self.coll
91 try:
92 assert QDELTA_GENERATORS[qd_type] == type(self.generator)
93 assert self.generator.QDelta.shape[0] == coll.Qmat.shape[0] - 1
94 except (AssertionError, AttributeError):
95 self.generator = QDELTA_GENERATORS[qd_type](
96 # for algebraic types (LU, ...)
97 Q=coll.generator.Q,
98 # for MIN in tables, MIN-SR-S ...
99 nNodes=coll.num_nodes,
100 nodeType=coll.node_type,
101 quadType=coll.quad_type,
102 # for time-stepping types, MIN-SR-NS
103 nodes=coll.nodes,
104 tLeft=coll.tleft,
105 )
106 except Exception as e:
107 raise ValueError(f"could not generate {qd_type=!r} with qmat, got error : {e}") from e
109 def get_Qdelta_implicit(self, qd_type, k=None):
110 QDmat = np.zeros_like(self.coll.Qmat)
111 self.setupGenerator(qd_type)
112 QDmat[1:, 1:] = self.generator.genCoeffs(k=k)
114 err_msg = 'Lower triangular matrix expected!'
115 np.testing.assert_array_equal(np.triu(QDmat, k=1), np.zeros(QDmat.shape), err_msg=err_msg)
116 if np.allclose(np.diag(np.diag(QDmat)), QDmat):
117 self.parallelizable = True
118 return QDmat
120 def get_Qdelta_explicit(self, qd_type, k=None):
121 coll = self.coll
122 QDmat = np.zeros(coll.Qmat.shape, dtype=float)
123 self.setupGenerator(qd_type)
124 QDmat[1:, 1:], QDmat[1:, 0] = self.generator.genCoeffs(k=k, dTau=True)
126 err_msg = 'Strictly lower triangular matrix expected!'
127 np.testing.assert_array_equal(np.triu(QDmat, k=0), np.zeros(QDmat.shape), err_msg=err_msg)
128 if np.allclose(np.diag(np.diag(QDmat)), QDmat):
129 self.parallelizable = True # for PIC ;)
130 return QDmat
132 def predict(self):
133 """
134 Predictor to fill values at nodes before first sweep
136 Default prediction for the sweepers, only copies the values to all collocation nodes
137 and evaluates the RHS of the ODE there
138 """
140 # get current level and problem description
141 L = self.level
142 P = L.prob
144 # evaluate RHS at left point
145 L.f[0] = P.eval_f(L.u[0], L.time)
147 for m in range(1, self.coll.num_nodes + 1):
148 # copy u[0] to all collocation nodes, evaluate RHS
149 if self.params.initial_guess == 'spread':
150 L.u[m] = P.dtype_u(L.u[0])
151 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1])
152 # copy u[0] and RHS evaluation to all collocation nodes
153 elif self.params.initial_guess == 'copy':
154 L.u[m] = P.dtype_u(L.u[0])
155 L.f[m] = P.dtype_f(L.f[0])
156 # start with zero everywhere
157 elif self.params.initial_guess == 'zero':
158 L.u[m] = P.dtype_u(init=P.init, val=0.0)
159 L.f[m] = P.dtype_f(init=P.init, val=0.0)
160 # start with random initial guess
161 elif self.params.initial_guess == 'random':
162 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0])
163 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0])
164 else:
165 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented')
167 # indicate that this level is now ready for sweeps
168 L.status.unlocked = True
169 L.status.updated = True
171 def compute_residual(self, stage=''):
172 """
173 Computation of the residual using the collocation matrix Q
175 Args:
176 stage (str): The current stage of the step the level belongs to
177 """
179 # get current level and problem description
180 L = self.level
182 # Check if we want to skip the residual computation to gain performance
183 # Keep in mind that skipping any residual computation is likely to give incorrect outputs of the residual!
184 if stage in self.params.skip_residual_computation:
185 L.status.residual = 0.0 if L.status.residual is None else L.status.residual
186 return None
188 # check if there are new values (e.g. from a sweep)
189 # assert L.status.updated
191 # compute the residual for each node
193 # build QF(u)
194 res_norm = []
195 res = self.integrate()
196 for m in range(self.coll.num_nodes):
197 res[m] += L.u[0] - L.u[m + 1]
198 # add tau if associated
199 if L.tau[m] is not None:
200 res[m] += L.tau[m]
201 # use abs function from data type here
202 res_norm.append(abs(res[m]))
204 # find maximal residual over the nodes
205 if L.params.residual_type == 'full_abs':
206 L.status.residual = max(res_norm)
207 elif L.params.residual_type == 'last_abs':
208 L.status.residual = res_norm[-1]
209 elif L.params.residual_type == 'full_rel':
210 L.status.residual = max(res_norm) / abs(L.u[0])
211 elif L.params.residual_type == 'last_rel':
212 L.status.residual = res_norm[-1] / abs(L.u[0])
213 else:
214 raise ParameterError(
215 f'residual_type = {L.params.residual_type} not implemented, choose '
216 f'full_abs, last_abs, full_rel or last_rel instead'
217 )
219 # indicate that the residual has seen the new values
220 L.status.updated = False
222 return None
224 def compute_end_point(self):
225 """
226 Abstract interface to end-node computation
227 """
228 raise NotImplementedError('ERROR: sweeper has to implement compute_end_point(self)')
230 def integrate(self):
231 """
232 Abstract interface to right-hand side integration
233 """
234 raise NotImplementedError('ERROR: sweeper has to implement integrate(self)')
236 def update_nodes(self):
237 """
238 Abstract interface to node update
239 """
240 raise NotImplementedError('ERROR: sweeper has to implement update_nodes(self)')
242 @property
243 def level(self):
244 """
245 Returns the current level
247 Returns:
248 pySDC.Level.level: the current level
249 """
250 return self.__level
252 @level.setter
253 def level(self, L):
254 """
255 Sets a reference to the current level (done in the initialization of the level)
257 Args:
258 L (pySDC.Level.level): current level
259 """
260 assert isinstance(L, Level)
261 self.__level = L
263 @property
264 def rank(self):
265 return 0
267 def updateVariableCoeffs(self, k):
268 """
269 Potentially update QDelta implicit coefficients if variable ...
271 Parameters
272 ----------
273 k : int
274 Index of the sweep (0 for initial sweep, 1 for the first one, ...).
275 """
276 if self.params.QI == 'MIN-SR-FLEX':
277 self.QI = self.get_Qdelta_implicit(qd_type="MIN-SR-FLEX", k=k)