Coverage for pySDC/core/sweeper.py: 94%

126 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-04 15:08 +0000

1import logging 

2import numpy as np 

3from qmat.qdelta import QDeltaGenerator, QDELTA_GENERATORS 

4 

5from pySDC.core.errors import ParameterError 

6from pySDC.core.collocation import CollBase 

7from pySDC.helpers.pysdc_helper import FrozenClass 

8 

9 

10# Organize QDeltaGenerator class in dict[type(QDeltaGenerator),set(str)] to retrieve aliases 

11QDELTA_GENERATORS_ALIASES = {v: set() for v in set(QDELTA_GENERATORS.values())} 

12for k, v in QDELTA_GENERATORS.items(): 

13 QDELTA_GENERATORS_ALIASES[v].add(k) 

14 

15 

16# short helper class to add params as attributes 

17class _Pars(FrozenClass): 

18 def __init__(self, pars): 

19 self.do_coll_update = False 

20 self.initial_guess = 'spread' # default value (see also below) 

21 self.skip_residual_computation = () # gain performance at the cost of correct residual output 

22 

23 for k, v in pars.items(): 

24 if k != 'collocation_class': 

25 setattr(self, k, v) 

26 

27 self._freeze() 

28 

29 

30class Sweeper(object): 

31 """ 

32 Base abstract sweeper class, provides two base methods to generate QDelta matrices: 

33 

34 - get_Qdelta_implicit(qd_type): 

35 Returns a (pySDC-type) QDelta matrix of **implicit type**, 

36 *i.e* lower triangular with zeros on the first collumn. 

37 - get_Qdelta_explicit(qd_type): 

38 Returns a (pySDC-type) QDelta matrix of **explicit type**, 

39 *i.e* strictly lower triangular with first node distance to zero on the first collumn. 

40 

41 

42 All possible QDelta matrix coefficients are generated with 

43 `qmat <https://qmat.readthedocs.io/en/latest/autoapi/qmat/qdelta/index.html>`_, 

44 check it out to see all available coefficient types. 

45 

46 Attributes: 

47 logger: custom logger for sweeper-related logging 

48 params (__Pars): parameter object containing the custom parameters passed by the user 

49 coll (pySDC.Collocation.CollBase): collocation object 

50 """ 

51 

52 def __init__(self, params, level): 

53 """ 

54 Initialization routine for the base sweeper 

55 

56 Args: 

57 params (dict): parameter object 

58 level (pySDC.Level.level): the level that uses this sweeper 

59 """ 

60 

61 self.logger = logging.getLogger('sweeper') 

62 

63 essential_keys = ['num_nodes'] 

64 for key in essential_keys: 

65 if key not in params: 

66 msg = 'need %s to instantiate step, only got %s' % (key, str(params.keys())) 

67 self.logger.error(msg) 

68 raise ParameterError(msg) 

69 

70 if 'collocation_class' not in params: 

71 params['collocation_class'] = CollBase 

72 

73 # prepare random generator for initial guess 

74 if params.get('initial_guess', 'spread') == 'random': # default value (see also above) 

75 params['random_seed'] = params.get('random_seed', 1984) 

76 self.rng = np.random.RandomState(params['random_seed']) 

77 

78 self.params = _Pars(params) 

79 

80 self.coll: CollBase = params['collocation_class'](**params) 

81 

82 if not self.coll.right_is_node and not self.params.do_coll_update: 

83 self.logger.warning( 

84 'we need to do a collocation update here, since the right end point is not a node. Changing this!' 

85 ) 

86 self.params.do_coll_update = True 

87 

88 self.__level = level 

89 self.parallelizable = False 

90 for name in ["genQI", "genQE"]: 

91 if hasattr(self, name): 

92 delattr(self, name) 

93 

94 def buildGenerator(self, qdType: str) -> QDeltaGenerator: 

95 return QDELTA_GENERATORS[qdType](qGen=self.coll.generator, tLeft=self.coll.tleft) 

96 

97 def get_Qdelta_implicit(self, qd_type, k=None): 

98 QDmat = np.zeros_like(self.coll.Qmat) 

99 if not hasattr(self, "genQI") or qd_type not in QDELTA_GENERATORS_ALIASES[type(self.genQI)]: 

100 self.genQI: QDeltaGenerator = self.buildGenerator(qd_type) 

101 QDmat[1:, 1:] = self.genQI.genCoeffs(k=k) 

102 

103 err_msg = 'Lower triangular matrix expected!' 

104 np.testing.assert_array_equal(np.triu(QDmat, k=1), np.zeros(QDmat.shape), err_msg=err_msg) 

105 if np.allclose(np.diag(np.diag(QDmat)), QDmat): 

106 self.parallelizable = True 

107 return QDmat 

108 

109 def get_Qdelta_explicit(self, qd_type, k=None): 

110 coll = self.coll 

111 QDmat = np.zeros(coll.Qmat.shape, dtype=float) 

112 if not hasattr(self, "genQE") or qd_type not in QDELTA_GENERATORS_ALIASES[type(self.genQE)]: 

113 self.genQE: QDeltaGenerator = self.buildGenerator(qd_type) 

114 QDmat[1:, 1:], QDmat[1:, 0] = self.genQE.genCoeffs(k=k, dTau=True) 

115 

116 err_msg = 'Strictly lower triangular matrix expected!' 

117 np.testing.assert_array_equal(np.triu(QDmat, k=0), np.zeros(QDmat.shape), err_msg=err_msg) 

118 if np.allclose(np.diag(np.diag(QDmat)), QDmat): 

119 self.parallelizable = True # for PIC ;) 

120 return QDmat 

121 

122 def predict(self): 

123 """ 

124 Predictor to fill values at nodes before first sweep 

125 

126 Default prediction for the sweepers, only copies the values to all collocation nodes 

127 and evaluates the RHS of the ODE there 

128 """ 

129 

130 # get current level and problem description 

131 L = self.level 

132 P = L.prob 

133 

134 # evaluate RHS at left point 

135 L.f[0] = P.eval_f(L.u[0], L.time) 

136 

137 for m in range(1, self.coll.num_nodes + 1): 

138 # copy u[0] to all collocation nodes, evaluate RHS 

139 if self.params.initial_guess == 'spread': 

140 L.u[m] = P.dtype_u(L.u[0]) 

141 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1]) 

142 # copy u[0] and RHS evaluation to all collocation nodes 

143 elif self.params.initial_guess == 'copy': 

144 L.u[m] = P.dtype_u(L.u[0]) 

145 L.f[m] = P.dtype_f(L.f[0]) 

146 # start with zero everywhere 

147 elif self.params.initial_guess == 'zero': 

148 L.u[m] = P.dtype_u(init=P.init, val=0.0) 

149 L.f[m] = P.dtype_f(init=P.init, val=0.0) 

150 # start with random initial guess 

151 elif self.params.initial_guess == 'random': 

152 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0]) 

153 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0]) 

154 else: 

155 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented') 

156 

157 # indicate that this level is now ready for sweeps 

158 L.status.unlocked = True 

159 L.status.updated = True 

160 

161 def compute_residual(self, stage=''): 

162 """ 

163 Computation of the residual using the collocation matrix Q 

164 

165 Args: 

166 stage (str): The current stage of the step the level belongs to 

167 """ 

168 

169 # get current level and problem description 

170 L = self.level 

171 

172 # Check if we want to skip the residual computation to gain performance 

173 # Keep in mind that skipping any residual computation is likely to give incorrect outputs of the residual! 

174 if stage in self.params.skip_residual_computation: 

175 L.status.residual = 0.0 if L.status.residual is None else L.status.residual 

176 return None 

177 

178 # check if there are new values (e.g. from a sweep) 

179 # assert L.status.updated 

180 

181 # compute the residual for each node 

182 

183 # build QF(u) 

184 res_norm = [] 

185 L.residual = self.integrate() 

186 for m in range(self.coll.num_nodes): 

187 L.residual[m] += L.u[0] - L.u[m + 1] 

188 # add tau if associated 

189 if L.tau[m] is not None: 

190 L.residual[m] += L.tau[m] 

191 # use abs function from data type here 

192 res_norm.append(abs(L.residual[m])) 

193 

194 # find maximal residual over the nodes 

195 if L.params.residual_type == 'full_abs': 

196 L.status.residual = max(res_norm) 

197 elif L.params.residual_type == 'last_abs': 

198 L.status.residual = res_norm[-1] 

199 elif L.params.residual_type == 'full_rel': 

200 L.status.residual = max(res_norm) / abs(L.u[0]) 

201 elif L.params.residual_type == 'last_rel': 

202 L.status.residual = res_norm[-1] / abs(L.u[0]) 

203 else: 

204 raise ParameterError( 

205 f'residual_type = {L.params.residual_type} not implemented, choose ' 

206 f'full_abs, last_abs, full_rel or last_rel instead' 

207 ) 

208 

209 # indicate that the residual has seen the new values 

210 L.status.updated = False 

211 

212 return None 

213 

214 def compute_end_point(self): 

215 """ 

216 Abstract interface to end-node computation 

217 """ 

218 raise NotImplementedError('ERROR: sweeper has to implement compute_end_point(self)') 

219 

220 def integrate(self): 

221 """ 

222 Abstract interface to right-hand side integration 

223 """ 

224 raise NotImplementedError('ERROR: sweeper has to implement integrate(self)') 

225 

226 def update_nodes(self): 

227 """ 

228 Abstract interface to node update 

229 """ 

230 raise NotImplementedError('ERROR: sweeper has to implement update_nodes(self)') 

231 

232 @property 

233 def level(self): 

234 """ 

235 Returns the current level 

236 

237 Returns: 

238 pySDC.Level.level: the current level 

239 """ 

240 return self.__level 

241 

242 @level.setter 

243 def level(self, L): 

244 """ 

245 Sets a reference to the current level (done in the initialization of the level) 

246 

247 Args: 

248 L (pySDC.Level.level): current level 

249 """ 

250 from pySDC.core.level import Level 

251 

252 assert isinstance(L, Level) 

253 self.__level = L 

254 

255 @property 

256 def rank(self): 

257 return 0 

258 

259 def updateVariableCoeffs(self, k): 

260 """ 

261 Potentially update QDelta implicit coefficients if variable ... 

262 

263 Parameters 

264 ---------- 

265 k : int 

266 Index of the sweep (0 for initial sweep, 1 for the first one, ...). 

267 """ 

268 if hasattr(self, "genQI") and self.genQI.isKDependent(): 

269 qdType = type(self.genQI).__name__ 

270 self.QI = self.get_Qdelta_implicit(qdType, k=k) 

271 if hasattr(self, "genQE") and self.genQE.isKDependent(): 

272 qdType = type(self.genQE).__name__ 

273 self.QE = self.get_Qdelta_explicit(qdType, k=k)