Coverage for pySDC / core / base_transfer.py: 99%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 09:00 +0000

1import logging 

2from typing import Any, Dict, Optional, TYPE_CHECKING 

3 

4import scipy.sparse as sp 

5import numpy as np 

6 

7from pySDC.core.errors import UnlockError 

8from pySDC.helpers.pysdc_helper import FrozenClass 

9from qmat.lagrange import LagrangeApproximation 

10 

11if TYPE_CHECKING: 

12 from pySDC.core.level import Level 

13 

14 

# short helper class to add params as attributes
class _Pars(FrozenClass):
    """
    Parameter container exposing the entries of a dictionary as attributes

    Attributes:
        finter (bool): defaults to False and is overwritten if the dictionary provides an entry of that name
    """

    def __init__(self, pars: Dict[str, Any]) -> None:
        """
        Initialization routine

        Args:
            pars (dict): user-supplied parameters; every key becomes an attribute of this object
        """
        # the default is set first so that a user-supplied entry with the same name takes precedence
        self.finter: bool = False

        # mirror every dictionary entry onto the instance
        for name, value in pars.items():
            setattr(self, name, value)

        # hand over to FrozenClass; presumably this blocks the creation of further attributes
        # (confirm against pySDC.helpers.pysdc_helper.FrozenClass)
        self._freeze()

23 

24 

class BaseTransfer(object):
    """
    Standard base_transfer class

    Connects a fine and a coarse level and moves data between them: values are transferred in space through the
    user-supplied space transfer object and between the two sets of collocation nodes through the matrices
    ``Pcoll`` and ``Rcoll``.

    Attributes:
        logger: custom logger for transfer-related logging
        params (_Pars): parameter object containing the custom parameters passed by the user
        fine (pySDC.Level.level): reference to the fine level
        coarse (pySDC.Level.level): reference to the coarse level
        Pcoll (np.ndarray): weights used to bring values from the coarse nodes to the fine nodes
        Rcoll (np.ndarray): weights used to bring values from the fine nodes to the coarse nodes
        space_transfer: instance of the spatial transfer class
    """

    def __init__(
        self,
        fine_level: 'Level',
        coarse_level: 'Level',
        base_transfer_params: Dict[str, Any],
        space_transfer_class: Any,
        space_transfer_params: Dict[str, Any],
    ) -> None:
        """
        Initialization routine

        Args:
            fine_level (pySDC.Level.level): fine level connected with the base_transfer operations
            coarse_level (pySDC.Level.level): coarse level connected with the base_transfer operations
            base_transfer_params (dict): parameters for the base_transfer operations
            space_transfer_class: class to perform spatial transfer
            space_transfer_params (dict): parameters for the space_transfer operations
        """

        self.params: _Pars = _Pars(base_transfer_params)

        # set up logger
        self.logger: logging.Logger = logging.getLogger('transfer')

        self.fine: 'Level' = fine_level
        self.coarse: 'Level' = coarse_level

        fine_grid = self.fine.sweep.coll.nodes
        coarse_grid = self.coarse.sweep.coll.nodes

        if len(fine_grid) == len(coarse_grid):
            # same number of nodes on both levels: the transfer between the node sets is the identity
            # NOTE(review): only the node counts are compared, not the node positions - confirm that
            # levels with equally many nodes always share the same nodes
            self.Pcoll: np.ndarray = np.eye(len(fine_grid))
            self.Rcoll: np.ndarray = np.eye(len(fine_grid))
        else:
            self.Pcoll = self.get_transfer_matrix_Q(fine_grid, coarse_grid)
            self.Rcoll = self.get_transfer_matrix_Q(coarse_grid, fine_grid)

        # set up spatial transfer
        self.space_transfer: Any = space_transfer_class(
            fine_prob=self.fine.prob, coarse_prob=self.coarse.prob, params=space_transfer_params
        )

    @staticmethod
    def get_transfer_matrix_Q(f_nodes: np.ndarray, c_nodes: np.ndarray) -> np.ndarray:
        """
        Helper routine to quickly define transfer matrices from a coarse set
        to a fine set of nodes (fully Lagrangian)

        Args:
            f_nodes: fine nodes (size nF)
            c_nodes: coarse nodes (size nC)

        Returns:
            matrix containing the interpolation weights (shape (nF, nC))
        """
        approx = LagrangeApproximation(c_nodes)
        return approx.getInterpolationMatrix(f_nodes)

    @staticmethod
    def _weighted_sum(weights: Any, values: list) -> Any:
        """
        Helper routine to compute sum_m weights[m] * values[m]

        The first product creates a fresh object, the remaining products are added to it in place and in order
        of increasing m, so none of the entries of values is modified.

        Args:
            weights: one row of a collocation transfer matrix
            values (list): data at the source nodes, must not be empty

        Returns:
            the weighted sum, of whatever type the product of a weight and a value yields
        """
        acc = weights[0] * values[0]
        for weight, value in zip(weights[1:], values[1:]):
            acc += weight * value
        return acc

    def _add_coarse_correction(self, target: list, corrections: list) -> None:
        """
        Helper routine to add corrections from the coarse nodes to data on the fine nodes

        For each fine node n = 1, ..., num_nodes the entry target[n] is updated in place by adding
        Pcoll[n - 1, m] * corrections[m] for all coarse nodes m. The entry target[0] is left untouched.

        Args:
            target (list): data on the fine level, indexed like the u or f list of a level
            corrections (list): corrections on the coarse nodes, already prolonged in space
        """
        for n in range(1, self.fine.sweep.coll.num_nodes + 1):
            for weight, correction in zip(self.Pcoll[n - 1], corrections):
                target[n] += weight * correction

    def restrict(self) -> None:
        """
        Space-time restriction routine

        The routine applies the spatial restriction operator to the fine values on the fine nodes, then reevaluates f
        on the coarse level. This is used for the first part of the FAS correction tau via integration. The second part
        is the integral over the fine values, restricted to the coarse level. Finally, possible tau corrections on the
        fine level are restricted as well.

        Raises:
            UnlockError: if the fine level is still locked
        """

        # get data for easier access
        F = self.fine
        G = self.coarse

        PG = G.prob

        SF = F.sweep
        SG = G.sweep

        # only if the level is unlocked at least by prediction
        if not F.status.unlocked:
            raise UnlockError('fine level is still locked, cannot use data from there')

        # restrict fine values in space
        tmp_u = [self.space_transfer.restrict(F.u[m]) for m in range(1, SF.coll.num_nodes + 1)]

        # restrict collocation values
        G.u[0] = self.space_transfer.restrict(F.u[0])
        for n in range(1, SG.coll.num_nodes + 1):
            G.u[n] = self._weighted_sum(self.Rcoll[n - 1], tmp_u)

        # re-evaluate f on coarse level
        G.f[0] = PG.eval_f(G.u[0], G.time)
        for m in range(1, SG.coll.num_nodes + 1):
            G.f[m] = PG.eval_f(G.u[m], G.time + G.dt * SG.coll.nodes[m - 1])

        # build coarse level tau correction part (needs the freshly evaluated coarse f, so it comes after the loop)
        tauG = SG.integrate()

        # build fine level tau correction part
        tauF = SF.integrate()

        # restrict fine level tau correction part in space
        tmp_tau = [self.space_transfer.restrict(tauF[m]) for m in range(SF.coll.num_nodes)]

        # restrict fine level tau correction part in collocation
        tauFG = [self._weighted_sum(self.Rcoll[n], tmp_tau) for n in range(SG.coll.num_nodes)]

        # build tau correction
        for m in range(SG.coll.num_nodes):
            G.tau[m] = tauFG[m] - tauG[m]

        if F.tau[0] is not None:
            # restrict possible tau correction from fine in space
            tmp_tau = [self.space_transfer.restrict(F.tau[m]) for m in range(SF.coll.num_nodes)]

            # restrict possible tau correction from fine in collocation, added term by term to the coarse tau
            for n in range(SG.coll.num_nodes):
                for m in range(SF.coll.num_nodes):
                    G.tau[n] += self.Rcoll[n, m] * tmp_tau[m]

        # save u and rhs evaluations for interpolation
        for m in range(1, SG.coll.num_nodes + 1):
            G.uold[m] = PG.dtype_u(G.u[m])
            G.fold[m] = PG.dtype_f(G.f[m])

        # works as a predictor
        G.status.unlocked = True

    def prolong(self) -> None:
        """
        Space-time prolongation routine

        This routine applies the spatial prolongation routine to the difference between the computed and the restricted
        values on the coarse level and then adds this difference to the fine values as coarse correction. Afterwards,
        f is re-evaluated on the fine nodes.

        Raises:
            UnlockError: if the coarse level is still locked
        """

        # get data for easier access
        F = self.fine
        G = self.coarse

        PF = F.prob

        SF = F.sweep
        SG = G.sweep

        # only if the level is unlocked at least by prediction or restriction
        if not G.status.unlocked:
            raise UnlockError('coarse level is still locked, cannot use data from there')

        # build coarse correction

        # interpolate values in space first
        tmp_u = [self.space_transfer.prolong(G.u[m] - G.uold[m]) for m in range(1, SG.coll.num_nodes + 1)]

        # interpolate values in collocation
        self._add_coarse_correction(F.u, tmp_u)

        # re-evaluate f on fine level
        for m in range(1, SF.coll.num_nodes + 1):
            F.f[m] = PF.eval_f(F.u[m], F.time + F.dt * SF.coll.nodes[m - 1])

    def prolong_f(self) -> None:
        """
        Space-time prolongation routine w.r.t. the rhs f

        This routine applies the spatial prolongation routine to the difference between the computed and the restricted
        values on the coarse level and then adds this difference to the fine values as coarse correction. In contrast
        to prolong, f is not re-evaluated on the fine level: the difference between the current and the stored
        coarse f is prolonged and added to the fine f in the same way.

        Raises:
            UnlockError: if the coarse level is still locked
        """

        # get data for easier access
        F = self.fine
        G = self.coarse

        SG = G.sweep

        # only if the level is unlocked at least by prediction or restriction
        if not G.status.unlocked:
            raise UnlockError('coarse level is still locked, cannot use data from there')

        # build coarse correction

        # interpolate values in space first (u and f alternate, node by node)
        tmp_u = []
        tmp_f = []
        for m in range(1, SG.coll.num_nodes + 1):
            tmp_u.append(self.space_transfer.prolong(G.u[m] - G.uold[m]))
            tmp_f.append(self.space_transfer.prolong(G.f[m] - G.fold[m]))

        # interpolate values in collocation
        self._add_coarse_correction(F.u, tmp_u)
        self._add_coarse_correction(F.f, tmp_f)