Coverage for pySDC/core/sweeper.py: 98%

120 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-20 17:10 +0000

1import logging 

2import numpy as np 

3from qmat import QDELTA_GENERATORS 

4 

5from pySDC.core.errors import ParameterError 

6from pySDC.core.level import Level 

7from pySDC.core.collocation import CollBase 

8from pySDC.helpers.pysdc_helper import FrozenClass 

9 

10 

11# short helper class to add params as attributes 

12class _Pars(FrozenClass): 

13 def __init__(self, pars): 

14 self.do_coll_update = False 

15 self.initial_guess = 'spread' # default value (see also below) 

16 self.skip_residual_computation = () # gain performance at the cost of correct residual output 

17 

18 for k, v in pars.items(): 

19 if k != 'collocation_class': 

20 setattr(self, k, v) 

21 

22 self._freeze() 

23 

24 

25class Sweeper(object): 

26 """ 

27 Base abstract sweeper class, provides two base methods to generate QDelta matrices: 

28 

29 - get_Qdelta_implicit(qd_type): 

30 Returns a (pySDC-type) QDelta matrix of **implicit type**, 

31 *i.e* lower triangular with zeros on the first collumn. 

32 - get_Qdelta_explicit(qd_type): 

33 Returns a (pySDC-type) QDelta matrix of **explicit type**, 

34 *i.e* strictly lower triangular with first node distance to zero on the first collumn. 

35 

36 

37 All possible QDelta matrix coefficients are generated with 

38 `qmat <https://qmat.readthedocs.io/en/latest/autoapi/qmat/qdelta/index.html>`_, 

39 check it out to see all available coefficient types. 

40 

41 Attributes: 

42 logger: custom logger for sweeper-related logging 

43 params (__Pars): parameter object containing the custom parameters passed by the user 

44 coll (pySDC.Collocation.CollBase): collocation object 

45 """ 

46 

47 def __init__(self, params): 

48 """ 

49 Initialization routine for the base sweeper 

50 

51 Args: 

52 params (dict): parameter object 

53 

54 """ 

55 

56 # set up logger 

57 self.logger = logging.getLogger('sweeper') 

58 

59 essential_keys = ['num_nodes'] 

60 for key in essential_keys: 

61 if key not in params: 

62 msg = 'need %s to instantiate step, only got %s' % (key, str(params.keys())) 

63 self.logger.error(msg) 

64 raise ParameterError(msg) 

65 

66 if 'collocation_class' not in params: 

67 params['collocation_class'] = CollBase 

68 

69 # prepare random generator for initial guess 

70 if params.get('initial_guess', 'spread') == 'random': # default value (see also above) 

71 params['random_seed'] = params.get('random_seed', 1984) 

72 self.rng = np.random.RandomState(params['random_seed']) 

73 

74 self.params = _Pars(params) 

75 

76 self.coll: CollBase = params['collocation_class'](**params) 

77 

78 if not self.coll.right_is_node and not self.params.do_coll_update: 

79 self.logger.warning( 

80 'we need to do a collocation update here, since the right end point is not a node. Changing this!' 

81 ) 

82 self.params.do_coll_update = True 

83 

84 # This will be set as soon as the sweeper is instantiated at the level 

85 self.__level = None 

86 

87 self.parallelizable = False 

88 

89 def setupGenerator(self, qd_type): 

90 coll = self.coll 

91 try: 

92 assert QDELTA_GENERATORS[qd_type] == type(self.generator) 

93 assert self.generator.QDelta.shape[0] == coll.Qmat.shape[0] - 1 

94 except (AssertionError, AttributeError): 

95 self.generator = QDELTA_GENERATORS[qd_type]( 

96 # for algebraic types (LU, ...) 

97 Q=coll.generator.Q, 

98 # for MIN in tables, MIN-SR-S ... 

99 nNodes=coll.num_nodes, 

100 nodeType=coll.node_type, 

101 quadType=coll.quad_type, 

102 # for time-stepping types, MIN-SR-NS 

103 nodes=coll.nodes, 

104 tLeft=coll.tleft, 

105 ) 

106 except Exception as e: 

107 raise ValueError(f"could not generate {qd_type=!r} with qmat, got error : {e}") from e 

108 

109 def get_Qdelta_implicit(self, qd_type, k=None): 

110 QDmat = np.zeros_like(self.coll.Qmat) 

111 self.setupGenerator(qd_type) 

112 QDmat[1:, 1:] = self.generator.genCoeffs(k=k) 

113 

114 err_msg = 'Lower triangular matrix expected!' 

115 np.testing.assert_array_equal(np.triu(QDmat, k=1), np.zeros(QDmat.shape), err_msg=err_msg) 

116 if np.allclose(np.diag(np.diag(QDmat)), QDmat): 

117 self.parallelizable = True 

118 return QDmat 

119 

120 def get_Qdelta_explicit(self, qd_type, k=None): 

121 coll = self.coll 

122 QDmat = np.zeros(coll.Qmat.shape, dtype=float) 

123 self.setupGenerator(qd_type) 

124 QDmat[1:, 1:], QDmat[1:, 0] = self.generator.genCoeffs(k=k, dTau=True) 

125 

126 err_msg = 'Strictly lower triangular matrix expected!' 

127 np.testing.assert_array_equal(np.triu(QDmat, k=0), np.zeros(QDmat.shape), err_msg=err_msg) 

128 if np.allclose(np.diag(np.diag(QDmat)), QDmat): 

129 self.parallelizable = True # for PIC ;) 

130 return QDmat 

131 

132 def predict(self): 

133 """ 

134 Predictor to fill values at nodes before first sweep 

135 

136 Default prediction for the sweepers, only copies the values to all collocation nodes 

137 and evaluates the RHS of the ODE there 

138 """ 

139 

140 # get current level and problem description 

141 L = self.level 

142 P = L.prob 

143 

144 # evaluate RHS at left point 

145 L.f[0] = P.eval_f(L.u[0], L.time) 

146 

147 for m in range(1, self.coll.num_nodes + 1): 

148 # copy u[0] to all collocation nodes, evaluate RHS 

149 if self.params.initial_guess == 'spread': 

150 L.u[m] = P.dtype_u(L.u[0]) 

151 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1]) 

152 # copy u[0] and RHS evaluation to all collocation nodes 

153 elif self.params.initial_guess == 'copy': 

154 L.u[m] = P.dtype_u(L.u[0]) 

155 L.f[m] = P.dtype_f(L.f[0]) 

156 # start with zero everywhere 

157 elif self.params.initial_guess == 'zero': 

158 L.u[m] = P.dtype_u(init=P.init, val=0.0) 

159 L.f[m] = P.dtype_f(init=P.init, val=0.0) 

160 # start with random initial guess 

161 elif self.params.initial_guess == 'random': 

162 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0]) 

163 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0]) 

164 else: 

165 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented') 

166 

167 # indicate that this level is now ready for sweeps 

168 L.status.unlocked = True 

169 L.status.updated = True 

170 

171 def compute_residual(self, stage=''): 

172 """ 

173 Computation of the residual using the collocation matrix Q 

174 

175 Args: 

176 stage (str): The current stage of the step the level belongs to 

177 """ 

178 

179 # get current level and problem description 

180 L = self.level 

181 

182 # Check if we want to skip the residual computation to gain performance 

183 # Keep in mind that skipping any residual computation is likely to give incorrect outputs of the residual! 

184 if stage in self.params.skip_residual_computation: 

185 L.status.residual = 0.0 if L.status.residual is None else L.status.residual 

186 return None 

187 

188 # check if there are new values (e.g. from a sweep) 

189 # assert L.status.updated 

190 

191 # compute the residual for each node 

192 

193 # build QF(u) 

194 res_norm = [] 

195 res = self.integrate() 

196 for m in range(self.coll.num_nodes): 

197 res[m] += L.u[0] - L.u[m + 1] 

198 # add tau if associated 

199 if L.tau[m] is not None: 

200 res[m] += L.tau[m] 

201 # use abs function from data type here 

202 res_norm.append(abs(res[m])) 

203 

204 # find maximal residual over the nodes 

205 if L.params.residual_type == 'full_abs': 

206 L.status.residual = max(res_norm) 

207 elif L.params.residual_type == 'last_abs': 

208 L.status.residual = res_norm[-1] 

209 elif L.params.residual_type == 'full_rel': 

210 L.status.residual = max(res_norm) / abs(L.u[0]) 

211 elif L.params.residual_type == 'last_rel': 

212 L.status.residual = res_norm[-1] / abs(L.u[0]) 

213 else: 

214 raise ParameterError( 

215 f'residual_type = {L.params.residual_type} not implemented, choose ' 

216 f'full_abs, last_abs, full_rel or last_rel instead' 

217 ) 

218 

219 # indicate that the residual has seen the new values 

220 L.status.updated = False 

221 

222 return None 

223 

224 def compute_end_point(self): 

225 """ 

226 Abstract interface to end-node computation 

227 """ 

228 raise NotImplementedError('ERROR: sweeper has to implement compute_end_point(self)') 

229 

230 def integrate(self): 

231 """ 

232 Abstract interface to right-hand side integration 

233 """ 

234 raise NotImplementedError('ERROR: sweeper has to implement integrate(self)') 

235 

236 def update_nodes(self): 

237 """ 

238 Abstract interface to node update 

239 """ 

240 raise NotImplementedError('ERROR: sweeper has to implement update_nodes(self)') 

241 

242 @property 

243 def level(self): 

244 """ 

245 Returns the current level 

246 

247 Returns: 

248 pySDC.Level.level: the current level 

249 """ 

250 return self.__level 

251 

252 @level.setter 

253 def level(self, L): 

254 """ 

255 Sets a reference to the current level (done in the initialization of the level) 

256 

257 Args: 

258 L (pySDC.Level.level): current level 

259 """ 

260 assert isinstance(L, Level) 

261 self.__level = L 

262 

263 @property 

264 def rank(self): 

265 return 0 

266 

267 def updateVariableCoeffs(self, k): 

268 """ 

269 Potentially update QDelta implicit coefficients if variable ... 

270 

271 Parameters 

272 ---------- 

273 k : int 

274 Index of the sweep (0 for initial sweep, 1 for the first one, ...). 

275 """ 

276 if self.params.QI == 'MIN-SR-FLEX': 

277 self.QI = self.get_Qdelta_implicit(qd_type="MIN-SR-FLEX", k=k)