Coverage for pySDC/core/base_transfer.py: 100%
103 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 14:59 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 14:59 +0000
1import logging
3import scipy.sparse as sp
5from pySDC.core.errors import UnlockError
6from pySDC.helpers.pysdc_helper import FrozenClass
7from qmat.lagrange import LagrangeApproximation
10# short helper class to add params as attributes
11class _Pars(FrozenClass):
12 def __init__(self, pars):
13 self.finter = False
14 for k, v in pars.items():
15 setattr(self, k, v)
17 self._freeze()
20class BaseTransfer(object):
21 """
22 Standard base_transfer class
24 Attributes:
25 logger: custom logger for sweeper-related logging
26 params(__Pars): parameter object containing the custom parameters passed by the user
27 fine (pySDC.Level.level): reference to the fine level
28 coarse (pySDC.Level.level): reference to the coarse level
29 """
31 def __init__(self, fine_level, coarse_level, base_transfer_params, space_transfer_class, space_transfer_params):
32 """
33 Initialization routine
35 Args:
36 fine_level (pySDC.Level.level): fine level connected with the base_transfer operations
37 coarse_level (pySDC.Level.level): coarse level connected with the base_transfer operations
38 base_transfer_params (dict): parameters for the base_transfer operations
39 space_transfer_class: class to perform spatial transfer
40 space_transfer_params (dict): parameters for the space_transfer operations
41 """
43 self.params = _Pars(base_transfer_params)
45 # set up logger
46 self.logger = logging.getLogger('transfer')
48 self.fine = fine_level
49 self.coarse = coarse_level
51 fine_grid = self.fine.sweep.coll.nodes
52 coarse_grid = self.coarse.sweep.coll.nodes
54 if len(fine_grid) == len(coarse_grid):
55 self.Pcoll = sp.eye(len(fine_grid)).toarray()
56 self.Rcoll = sp.eye(len(fine_grid)).toarray()
57 else:
58 self.Pcoll = self.get_transfer_matrix_Q(fine_grid, coarse_grid)
59 self.Rcoll = self.get_transfer_matrix_Q(coarse_grid, fine_grid)
61 # set up spatial transfer
62 self.space_transfer = space_transfer_class(
63 fine_prob=self.fine.prob, coarse_prob=self.coarse.prob, params=space_transfer_params
64 )
66 @staticmethod
67 def get_transfer_matrix_Q(f_nodes, c_nodes):
68 """
69 Helper routine to quickly define transfer matrices from a coarse set
70 to a fine set of nodes (fully Lagrangian)
71 Args:
72 f_nodes: fine nodes (size nF)
73 c_nodes: coarse nodes (size nC)
75 Returns:
76 matrix containing the interpolation weights (shape (nF, nC))
77 """
78 approx = LagrangeApproximation(c_nodes)
79 return approx.getInterpolationMatrix(f_nodes)
81 def restrict(self):
82 """
83 Space-time restriction routine
85 The routine applies the spatial restriction operator to the fine values on the fine nodes, then reevaluates f
86 on the coarse level. This is used for the first part of the FAS correction tau via integration. The second part
87 is the integral over the fine values, restricted to the coarse level. Finally, possible tau corrections on the
88 fine level are restricted as well.
89 """
91 # get data for easier access
92 F = self.fine
93 G = self.coarse
95 PG = G.prob
97 SF = F.sweep
98 SG = G.sweep
100 # only if the level is unlocked at least by prediction
101 if not F.status.unlocked:
102 raise UnlockError('fine level is still locked, cannot use data from there')
104 # restrict fine values in space
105 tmp_u = []
106 for m in range(1, SF.coll.num_nodes + 1):
107 tmp_u.append(self.space_transfer.restrict(F.u[m]))
109 # restrict collocation values
110 G.u[0] = self.space_transfer.restrict(F.u[0])
111 for n in range(1, SG.coll.num_nodes + 1):
112 G.u[n] = self.Rcoll[n - 1, 0] * tmp_u[0]
113 for m in range(1, SF.coll.num_nodes):
114 G.u[n] += self.Rcoll[n - 1, m] * tmp_u[m]
116 # re-evaluate f on coarse level
117 G.f[0] = PG.eval_f(G.u[0], G.time)
118 for m in range(1, SG.coll.num_nodes + 1):
119 G.f[m] = PG.eval_f(G.u[m], G.time + G.dt * SG.coll.nodes[m - 1])
121 # build coarse level tau correction part
122 tauG = G.sweep.integrate()
124 # build fine level tau correction part
125 tauF = F.sweep.integrate()
127 # restrict fine level tau correction part in space
128 tmp_tau = []
129 for m in range(SF.coll.num_nodes):
130 tmp_tau.append(self.space_transfer.restrict(tauF[m]))
132 # restrict fine level tau correction part in collocation
133 tauFG = []
134 for n in range(1, SG.coll.num_nodes + 1):
135 tauFG.append(self.Rcoll[n - 1, 0] * tmp_tau[0])
136 for m in range(1, SF.coll.num_nodes):
137 tauFG[-1] += self.Rcoll[n - 1, m] * tmp_tau[m]
139 # build tau correction
140 for m in range(SG.coll.num_nodes):
141 G.tau[m] = tauFG[m] - tauG[m]
143 if F.tau[0] is not None:
144 # restrict possible tau correction from fine in space
145 tmp_tau = []
146 for m in range(SF.coll.num_nodes):
147 tmp_tau.append(self.space_transfer.restrict(F.tau[m]))
149 # restrict possible tau correction from fine in collocation
150 for n in range(SG.coll.num_nodes):
151 for m in range(SF.coll.num_nodes):
152 G.tau[n] += self.Rcoll[n, m] * tmp_tau[m]
153 else:
154 pass
156 # save u and rhs evaluations for interpolation
157 for m in range(1, SG.coll.num_nodes + 1):
158 G.uold[m] = PG.dtype_u(G.u[m])
159 G.fold[m] = PG.dtype_f(G.f[m])
161 # works as a predictor
162 G.status.unlocked = True
164 return None
166 def prolong(self):
167 """
168 Space-time prolongation routine
170 This routine applies the spatial prolongation routine to the difference between the computed and the restricted
171 values on the coarse level and then adds this difference to the fine values as coarse correction.
172 """
174 # get data for easier access
175 F = self.fine
176 G = self.coarse
178 PF = F.prob
180 SF = F.sweep
181 SG = G.sweep
183 # only of the level is unlocked at least by prediction or restriction
184 if not G.status.unlocked:
185 raise UnlockError('coarse level is still locked, cannot use data from there')
187 # build coarse correction
189 # interpolate values in space first
190 tmp_u = []
191 for m in range(1, SG.coll.num_nodes + 1):
192 tmp_u.append(self.space_transfer.prolong(G.u[m] - G.uold[m]))
194 # interpolate values in collocation
195 for n in range(1, SF.coll.num_nodes + 1):
196 for m in range(SG.coll.num_nodes):
197 F.u[n] += self.Pcoll[n - 1, m] * tmp_u[m]
199 # re-evaluate f on fine level
200 for m in range(1, SF.coll.num_nodes + 1):
201 F.f[m] = PF.eval_f(F.u[m], F.time + F.dt * SF.coll.nodes[m - 1])
203 return None
205 def prolong_f(self):
206 """
207 Space-time prolongation routine w.r.t. the rhs f
209 This routine applies the spatial prolongation routine to the difference between the computed and the restricted
210 values on the coarse level and then adds this difference to the fine values as coarse correction.
211 """
213 # get data for easier access
214 F = self.fine
215 G = self.coarse
217 SF = F.sweep
218 SG = G.sweep
220 # only of the level is unlocked at least by prediction or restriction
221 if not G.status.unlocked:
222 raise UnlockError('coarse level is still locked, cannot use data from there')
224 # build coarse correction
226 # interpolate values in space first
227 tmp_u = []
228 tmp_f = []
229 for m in range(1, SG.coll.num_nodes + 1):
230 tmp_u.append(self.space_transfer.prolong(G.u[m] - G.uold[m]))
231 tmp_f.append(self.space_transfer.prolong(G.f[m] - G.fold[m]))
233 # interpolate values in collocation
234 for n in range(1, SF.coll.num_nodes + 1):
235 for m in range(SG.coll.num_nodes):
236 F.u[n] += self.Pcoll[n - 1, m] * tmp_u[m]
237 F.f[n] += self.Pcoll[n - 1, m] * tmp_f[m]
239 return None