Coverage for pySDC / core / base_transfer.py: 99%
107 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 09:00 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 09:00 +0000
1import logging
2from typing import Any, Dict, Optional, TYPE_CHECKING
4import scipy.sparse as sp
5import numpy as np
7from pySDC.core.errors import UnlockError
8from pySDC.helpers.pysdc_helper import FrozenClass
9from qmat.lagrange import LagrangeApproximation
11if TYPE_CHECKING:
12 from pySDC.core.level import Level
class _Pars(FrozenClass):
    """
    Short helper class exposing the base_transfer parameters as attributes

    Attributes:
        finter (bool): defaults to False unless the user-supplied dictionary provides a value
    """

    def __init__(self, pars: Dict[str, Any]) -> None:
        """
        Initialization routine

        Args:
            pars (dict): user-supplied parameters, each key becomes an attribute of this object
        """
        # default first, so that an entry in the user dictionary can override it
        self.finter: bool = False

        for name, value in pars.items():
            setattr(self, name, value)

        # NOTE(review): _freeze comes from FrozenClass; presumably it blocks adding new attributes - confirm
        self._freeze()
class BaseTransfer(object):
    """
    Standard base_transfer class

    Connects a fine and a coarse level and provides space-time restriction and prolongation between them.

    Attributes:
        logger: custom logger for transfer-related logging
        params (_Pars): parameter object containing the custom parameters passed by the user
        fine (pySDC.Level.level): reference to the fine level
        coarse (pySDC.Level.level): reference to the coarse level
        Pcoll (np.ndarray): weights used by the prolongation routines to map coarse nodes to fine nodes
        Rcoll (np.ndarray): weights used by the restriction routine to map fine nodes to coarse nodes
        space_transfer: instance of the spatial transfer class
    """

    def __init__(
        self,
        fine_level: 'Level',
        coarse_level: 'Level',
        base_transfer_params: Dict[str, Any],
        space_transfer_class: Any,
        space_transfer_params: Dict[str, Any],
    ) -> None:
        """
        Initialization routine

        Args:
            fine_level (pySDC.Level.level): fine level connected with the base_transfer operations
            coarse_level (pySDC.Level.level): coarse level connected with the base_transfer operations
            base_transfer_params (dict): parameters for the base_transfer operations
            space_transfer_class: class to perform spatial transfer
            space_transfer_params (dict): parameters for the space_transfer operations
        """
        self.params: _Pars = _Pars(base_transfer_params)

        # set up logger
        self.logger: logging.Logger = logging.getLogger('transfer')

        self.fine: 'Level' = fine_level
        self.coarse: 'Level' = coarse_level

        nodes_fine = self.fine.sweep.coll.nodes
        nodes_coarse = self.coarse.sweep.coll.nodes

        if len(nodes_fine) != len(nodes_coarse):
            # different number of nodes: interpolate between the two node sets
            self.Pcoll: np.ndarray = self.get_transfer_matrix_Q(nodes_fine, nodes_coarse)
            self.Rcoll: np.ndarray = self.get_transfer_matrix_Q(nodes_coarse, nodes_fine)
        else:
            # same number of nodes: identity in both directions (two separate arrays on purpose)
            num_nodes = len(nodes_fine)
            self.Pcoll = sp.eye(num_nodes).toarray()
            self.Rcoll = sp.eye(num_nodes).toarray()

        # set up spatial transfer
        self.space_transfer: Any = space_transfer_class(
            fine_prob=self.fine.prob, coarse_prob=self.coarse.prob, params=space_transfer_params
        )

    @staticmethod
    def get_transfer_matrix_Q(f_nodes: np.ndarray, c_nodes: np.ndarray) -> np.ndarray:
        """
        Helper routine to quickly define transfer matrices from a coarse set
        to a fine set of nodes (fully Lagrangian)

        Args:
            f_nodes: fine nodes (size nF)
            c_nodes: coarse nodes (size nC)

        Returns:
            matrix containing the interpolation weights (shape (nF, nC))
        """
        return LagrangeApproximation(c_nodes).getInterpolationMatrix(f_nodes)

    def restrict(self) -> None:
        """
        Space-time restriction routine

        The fine values are restricted in space and then combined over the fine nodes with the weights in Rcoll,
        after which f is re-evaluated on the coarse level. The tau correction on the coarse level is the restricted
        integral over the fine values minus the integral over the coarse values. If the fine level carries a tau
        correction itself, it is restricted and added as well. Finally, the coarse u and f are stored in uold and
        fold for later interpolation and the coarse level is unlocked.

        Raises:
            UnlockError: if the fine level is still locked
        """
        # get data for easier access
        fine, coarse = self.fine, self.coarse
        prob_c = coarse.prob
        sweep_f, sweep_c = fine.sweep, coarse.sweep

        # only if the level is unlocked at least by prediction
        if not fine.status.unlocked:
            raise UnlockError('fine level is still locked, cannot use data from there')

        num_f = sweep_f.coll.num_nodes
        num_c = sweep_c.coll.num_nodes

        # restrict fine values in space (index 0 is treated separately below)
        u_space = [self.space_transfer.restrict(fine.u[m]) for m in range(1, num_f + 1)]

        # restrict collocation values; start from the first term so no zero object has to be created
        coarse.u[0] = self.space_transfer.restrict(fine.u[0])
        for row in range(num_c):
            coarse.u[row + 1] = self.Rcoll[row, 0] * u_space[0]
            for col in range(1, num_f):
                coarse.u[row + 1] += self.Rcoll[row, col] * u_space[col]

        # re-evaluate f on coarse level
        coarse.f[0] = prob_c.eval_f(coarse.u[0], coarse.time)
        for m in range(1, num_c + 1):
            coarse.f[m] = prob_c.eval_f(coarse.u[m], coarse.time + coarse.dt * sweep_c.coll.nodes[m - 1])

        # build coarse level tau correction part
        tau_coarse = sweep_c.integrate()

        # build fine level tau correction part
        tau_fine = sweep_f.integrate()

        # restrict fine level tau correction part in space
        tau_space = [self.space_transfer.restrict(tau_fine[m]) for m in range(num_f)]

        # restrict fine level tau correction part in collocation
        tau_fine_restricted = []
        for row in range(num_c):
            acc = self.Rcoll[row, 0] * tau_space[0]
            for col in range(1, num_f):
                acc += self.Rcoll[row, col] * tau_space[col]
            tau_fine_restricted.append(acc)

        # build tau correction
        for m in range(num_c):
            coarse.tau[m] = tau_fine_restricted[m] - tau_coarse[m]

        if fine.tau[0] is not None:
            # restrict possible tau correction from fine in space
            tau_space = [self.space_transfer.restrict(fine.tau[m]) for m in range(num_f)]

            # restrict possible tau correction from fine in collocation
            for row in range(num_c):
                for col in range(num_f):
                    coarse.tau[row] += self.Rcoll[row, col] * tau_space[col]

        # save u and rhs evaluations for interpolation
        for m in range(1, num_c + 1):
            coarse.uold[m] = prob_c.dtype_u(coarse.u[m])
            coarse.fold[m] = prob_c.dtype_f(coarse.f[m])

        # works as a predictor
        coarse.status.unlocked = True

    def prolong(self) -> None:
        """
        Space-time prolongation routine

        This routine applies the spatial prolongation routine to the difference between the computed and the restricted
        values on the coarse level and then adds this difference to the fine values as coarse correction. Afterwards,
        f is re-evaluated on the fine nodes.

        Raises:
            UnlockError: if the coarse level is still locked
        """
        # get data for easier access
        fine, coarse = self.fine, self.coarse
        prob_f = fine.prob
        sweep_f, sweep_c = fine.sweep, coarse.sweep

        # only if the level is unlocked at least by prediction or restriction
        if not coarse.status.unlocked:
            raise UnlockError('coarse level is still locked, cannot use data from there')

        num_f = sweep_f.coll.num_nodes
        num_c = sweep_c.coll.num_nodes

        # build coarse correction: interpolate the differences in space first
        corrections = [self.space_transfer.prolong(coarse.u[m] - coarse.uold[m]) for m in range(1, num_c + 1)]

        # interpolate values in collocation and add them to the fine values
        for n in range(1, num_f + 1):
            for m, delta_u in enumerate(corrections):
                fine.u[n] += self.Pcoll[n - 1, m] * delta_u

        # re-evaluate f on fine level
        for m in range(1, num_f + 1):
            fine.f[m] = prob_f.eval_f(fine.u[m], fine.time + fine.dt * sweep_f.coll.nodes[m - 1])

    def prolong_f(self) -> None:
        """
        Space-time prolongation routine w.r.t. the rhs f

        This routine applies the spatial prolongation routine to the difference between the computed and the restricted
        values on the coarse level and then adds this difference to the fine values as coarse correction. The same is
        done for the right-hand side f, so that f is corrected instead of being re-evaluated.

        Raises:
            UnlockError: if the coarse level is still locked
        """
        # get data for easier access
        fine, coarse = self.fine, self.coarse
        sweep_f, sweep_c = fine.sweep, coarse.sweep

        # only if the level is unlocked at least by prediction or restriction
        if not coarse.status.unlocked:
            raise UnlockError('coarse level is still locked, cannot use data from there')

        num_f = sweep_f.coll.num_nodes
        num_c = sweep_c.coll.num_nodes

        # build coarse correction: interpolate the differences of u and f in space first (u before f per node)
        corrections = [
            (
                self.space_transfer.prolong(coarse.u[m] - coarse.uold[m]),
                self.space_transfer.prolong(coarse.f[m] - coarse.fold[m]),
            )
            for m in range(1, num_c + 1)
        ]

        # interpolate values in collocation and add them to the fine values
        for n in range(1, num_f + 1):
            for m, (delta_u, delta_f) in enumerate(corrections):
                fine.u[n] += self.Pcoll[n - 1, m] * delta_u
                fine.f[n] += self.Pcoll[n - 1, m] * delta_f