Coverage for pySDC/core/base_transfer.py: 100%

103 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-09 14:59 +0000

1import logging 

2 

3import scipy.sparse as sp 

4 

5from pySDC.core.errors import UnlockError 

6from pySDC.helpers.pysdc_helper import FrozenClass 

7from qmat.lagrange import LagrangeApproximation 

8 

9 

10# short helper class to add params as attributes 

11class _Pars(FrozenClass): 

12 def __init__(self, pars): 

13 self.finter = False 

14 for k, v in pars.items(): 

15 setattr(self, k, v) 

16 

17 self._freeze() 

18 

19 

20class BaseTransfer(object): 

21 """ 

22 Standard base_transfer class 

23 

24 Attributes: 

25 logger: custom logger for sweeper-related logging 

26 params(__Pars): parameter object containing the custom parameters passed by the user 

27 fine (pySDC.Level.level): reference to the fine level 

28 coarse (pySDC.Level.level): reference to the coarse level 

29 """ 

30 

31 def __init__(self, fine_level, coarse_level, base_transfer_params, space_transfer_class, space_transfer_params): 

32 """ 

33 Initialization routine 

34 

35 Args: 

36 fine_level (pySDC.Level.level): fine level connected with the base_transfer operations 

37 coarse_level (pySDC.Level.level): coarse level connected with the base_transfer operations 

38 base_transfer_params (dict): parameters for the base_transfer operations 

39 space_transfer_class: class to perform spatial transfer 

40 space_transfer_params (dict): parameters for the space_transfer operations 

41 """ 

42 

43 self.params = _Pars(base_transfer_params) 

44 

45 # set up logger 

46 self.logger = logging.getLogger('transfer') 

47 

48 self.fine = fine_level 

49 self.coarse = coarse_level 

50 

51 fine_grid = self.fine.sweep.coll.nodes 

52 coarse_grid = self.coarse.sweep.coll.nodes 

53 

54 if len(fine_grid) == len(coarse_grid): 

55 self.Pcoll = sp.eye(len(fine_grid)).toarray() 

56 self.Rcoll = sp.eye(len(fine_grid)).toarray() 

57 else: 

58 self.Pcoll = self.get_transfer_matrix_Q(fine_grid, coarse_grid) 

59 self.Rcoll = self.get_transfer_matrix_Q(coarse_grid, fine_grid) 

60 

61 # set up spatial transfer 

62 self.space_transfer = space_transfer_class( 

63 fine_prob=self.fine.prob, coarse_prob=self.coarse.prob, params=space_transfer_params 

64 ) 

65 

66 @staticmethod 

67 def get_transfer_matrix_Q(f_nodes, c_nodes): 

68 """ 

69 Helper routine to quickly define transfer matrices from a coarse set 

70 to a fine set of nodes (fully Lagrangian) 

71 Args: 

72 f_nodes: fine nodes (size nF) 

73 c_nodes: coarse nodes (size nC) 

74 

75 Returns: 

76 matrix containing the interpolation weights (shape (nF, nC)) 

77 """ 

78 approx = LagrangeApproximation(c_nodes) 

79 return approx.getInterpolationMatrix(f_nodes) 

80 

81 def restrict(self): 

82 """ 

83 Space-time restriction routine 

84 

85 The routine applies the spatial restriction operator to the fine values on the fine nodes, then reevaluates f 

86 on the coarse level. This is used for the first part of the FAS correction tau via integration. The second part 

87 is the integral over the fine values, restricted to the coarse level. Finally, possible tau corrections on the 

88 fine level are restricted as well. 

89 """ 

90 

91 # get data for easier access 

92 F = self.fine 

93 G = self.coarse 

94 

95 PG = G.prob 

96 

97 SF = F.sweep 

98 SG = G.sweep 

99 

100 # only if the level is unlocked at least by prediction 

101 if not F.status.unlocked: 

102 raise UnlockError('fine level is still locked, cannot use data from there') 

103 

104 # restrict fine values in space 

105 tmp_u = [] 

106 for m in range(1, SF.coll.num_nodes + 1): 

107 tmp_u.append(self.space_transfer.restrict(F.u[m])) 

108 

109 # restrict collocation values 

110 G.u[0] = self.space_transfer.restrict(F.u[0]) 

111 for n in range(1, SG.coll.num_nodes + 1): 

112 G.u[n] = self.Rcoll[n - 1, 0] * tmp_u[0] 

113 for m in range(1, SF.coll.num_nodes): 

114 G.u[n] += self.Rcoll[n - 1, m] * tmp_u[m] 

115 

116 # re-evaluate f on coarse level 

117 G.f[0] = PG.eval_f(G.u[0], G.time) 

118 for m in range(1, SG.coll.num_nodes + 1): 

119 G.f[m] = PG.eval_f(G.u[m], G.time + G.dt * SG.coll.nodes[m - 1]) 

120 

121 # build coarse level tau correction part 

122 tauG = G.sweep.integrate() 

123 

124 # build fine level tau correction part 

125 tauF = F.sweep.integrate() 

126 

127 # restrict fine level tau correction part in space 

128 tmp_tau = [] 

129 for m in range(SF.coll.num_nodes): 

130 tmp_tau.append(self.space_transfer.restrict(tauF[m])) 

131 

132 # restrict fine level tau correction part in collocation 

133 tauFG = [] 

134 for n in range(1, SG.coll.num_nodes + 1): 

135 tauFG.append(self.Rcoll[n - 1, 0] * tmp_tau[0]) 

136 for m in range(1, SF.coll.num_nodes): 

137 tauFG[-1] += self.Rcoll[n - 1, m] * tmp_tau[m] 

138 

139 # build tau correction 

140 for m in range(SG.coll.num_nodes): 

141 G.tau[m] = tauFG[m] - tauG[m] 

142 

143 if F.tau[0] is not None: 

144 # restrict possible tau correction from fine in space 

145 tmp_tau = [] 

146 for m in range(SF.coll.num_nodes): 

147 tmp_tau.append(self.space_transfer.restrict(F.tau[m])) 

148 

149 # restrict possible tau correction from fine in collocation 

150 for n in range(SG.coll.num_nodes): 

151 for m in range(SF.coll.num_nodes): 

152 G.tau[n] += self.Rcoll[n, m] * tmp_tau[m] 

153 else: 

154 pass 

155 

156 # save u and rhs evaluations for interpolation 

157 for m in range(1, SG.coll.num_nodes + 1): 

158 G.uold[m] = PG.dtype_u(G.u[m]) 

159 G.fold[m] = PG.dtype_f(G.f[m]) 

160 

161 # works as a predictor 

162 G.status.unlocked = True 

163 

164 return None 

165 

166 def prolong(self): 

167 """ 

168 Space-time prolongation routine 

169 

170 This routine applies the spatial prolongation routine to the difference between the computed and the restricted 

171 values on the coarse level and then adds this difference to the fine values as coarse correction. 

172 """ 

173 

174 # get data for easier access 

175 F = self.fine 

176 G = self.coarse 

177 

178 PF = F.prob 

179 

180 SF = F.sweep 

181 SG = G.sweep 

182 

183 # only of the level is unlocked at least by prediction or restriction 

184 if not G.status.unlocked: 

185 raise UnlockError('coarse level is still locked, cannot use data from there') 

186 

187 # build coarse correction 

188 

189 # interpolate values in space first 

190 tmp_u = [] 

191 for m in range(1, SG.coll.num_nodes + 1): 

192 tmp_u.append(self.space_transfer.prolong(G.u[m] - G.uold[m])) 

193 

194 # interpolate values in collocation 

195 for n in range(1, SF.coll.num_nodes + 1): 

196 for m in range(SG.coll.num_nodes): 

197 F.u[n] += self.Pcoll[n - 1, m] * tmp_u[m] 

198 

199 # re-evaluate f on fine level 

200 for m in range(1, SF.coll.num_nodes + 1): 

201 F.f[m] = PF.eval_f(F.u[m], F.time + F.dt * SF.coll.nodes[m - 1]) 

202 

203 return None 

204 

205 def prolong_f(self): 

206 """ 

207 Space-time prolongation routine w.r.t. the rhs f 

208 

209 This routine applies the spatial prolongation routine to the difference between the computed and the restricted 

210 values on the coarse level and then adds this difference to the fine values as coarse correction. 

211 """ 

212 

213 # get data for easier access 

214 F = self.fine 

215 G = self.coarse 

216 

217 SF = F.sweep 

218 SG = G.sweep 

219 

220 # only of the level is unlocked at least by prediction or restriction 

221 if not G.status.unlocked: 

222 raise UnlockError('coarse level is still locked, cannot use data from there') 

223 

224 # build coarse correction 

225 

226 # interpolate values in space first 

227 tmp_u = [] 

228 tmp_f = [] 

229 for m in range(1, SG.coll.num_nodes + 1): 

230 tmp_u.append(self.space_transfer.prolong(G.u[m] - G.uold[m])) 

231 tmp_f.append(self.space_transfer.prolong(G.f[m] - G.fold[m])) 

232 

233 # interpolate values in collocation 

234 for n in range(1, SF.coll.num_nodes + 1): 

235 for m in range(SG.coll.num_nodes): 

236 F.u[n] += self.Pcoll[n - 1, m] * tmp_u[m] 

237 F.f[n] += self.Pcoll[n - 1, m] * tmp_f[m] 

238 

239 return None