Coverage for pySDC/projects/Resilience/sweepers.py: 92%

107 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-04 12:37 +0000

1import numpy as np 

2from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit 

3from pySDC.implementations.sweeper_classes.imex_1st_order import imex_1st_order 

4from pySDC.core.errors import ParameterError 

5 

6 

7class efficient_sweeper: 

8 """ 

9 Replace the predict function by something that does not excessively evaluate superfluous right hand sides. 

10 """ 

11 

12 def predict(self): 

13 """ 

14 Predictor to fill values at nodes before first sweep. Skip evaluation of RHS on initial conditions. 

15 

16 Default prediction for the sweepers, only copies the values to all collocation nodes 

17 and evaluates the RHS of the ODE there 

18 """ 

19 

20 # get current level and problem description 

21 L = self.level 

22 P = L.prob 

23 

24 for m in range(1, self.coll.num_nodes + 1): 

25 # copy u[0] to all collocation nodes, evaluate RHS 

26 if self.params.initial_guess == 'spread': 

27 L.u[m] = P.dtype_u(L.u[0]) 

28 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1]) 

29 elif self.params.initial_guess == 'copy': 

30 L.f[0] = P.eval_f(L.u[0], L.time) 

31 L.u[m] = P.dtype_u(L.u[0]) 

32 L.f[m] = P.dtype_f(L.f[0]) 

33 # start with zero everywhere 

34 elif self.params.initial_guess == 'zero': 

35 L.u[m] = P.dtype_u(init=P.init, val=0.0) 

36 L.f[m] = P.dtype_f(init=P.init, val=0.0) 

37 # start with random initial guess 

38 elif self.params.initial_guess == 'random': 

39 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0]) 

40 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0]) 

41 else: 

42 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented') 

43 

44 # indicate that this level is now ready for sweeps 

45 L.status.unlocked = True 

46 L.status.updated = True 

47 

48 

49class generic_implicit_efficient(efficient_sweeper, generic_implicit): 

50 """ 

51 This sweeper has the same functionality of the `generic_implicit` sweeper, but saves a few operations at the expense 

52 of readability. 

53 """ 

54 

55 def integrate(self, Q=None): 

56 """ 

57 Integrates the right-hand side. Depending on `Q`, this may or may not be consistent with an integral 

58 approximation. 

59 

60 Args: 

61 Q (numpy.ndarray): Some sort of quadrature rule 

62 

63 Returns: 

64 list of dtype_u: containing the integral as values 

65 """ 

66 

67 # get current level and problem description 

68 L = self.level 

69 P = L.prob 

70 

71 Q = self.coll.Qmat if Q is None else Q 

72 

73 me = [] 

74 

75 # integrate RHS over all collocation nodes 

76 for m in range(1, self.coll.num_nodes + 1): 

77 # new instance of dtype_u, initialize values with 0 

78 me.append(P.dtype_u(P.init, val=0.0)) 

79 for j in range(1, self.coll.num_nodes + 1): 

80 me[-1] += L.dt * Q[m, j] * L.f[j] 

81 

82 return me 

83 

84 def update_nodes(self): 

85 """ 

86 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes 

87 

88 Returns: 

89 None 

90 """ 

91 

92 # get current level and problem description 

93 L = self.level 

94 P = L.prob 

95 

96 # only if the level has been touched before 

97 assert L.status.unlocked 

98 

99 # get number of collocation nodes for easier access 

100 M = self.coll.num_nodes 

101 

102 # gather all terms which are known already (e.g. from the previous iteration) 

103 # this corresponds to u0 + QF(u^k) - QdF(u^k) + tau 

104 

105 # get QF(u^k) 

106 integral = self.integrate(Q=self.coll.Qmat - self.QI) 

107 for m in range(M): 

108 # add initial value 

109 integral[m] += L.u[0] 

110 # add tau if associated 

111 if L.tau[m] is not None: 

112 integral[m] += L.tau[m] 

113 

114 # do the sweep 

115 for m in range(0, M): 

116 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1) 

117 rhs = P.dtype_u(integral[m]) 

118 for j in range(1, m + 1): 

119 rhs += L.dt * self.QI[m + 1, j] * L.f[j] 

120 

121 # implicit solve with prefactor stemming from the diagonal of Qd 

122 L.u[m + 1] = P.solve_system( 

123 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m] 

124 ) 

125 # update function values 

126 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m]) 

127 

128 # indicate presence of new values at this level 

129 L.status.updated = True 

130 

131 return None 

132 

133 def compute_residual(self, stage=''): 

134 lvl = self.level 

135 

136 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation: 

137 super().compute_residual(stage=stage) 

138 return None 

139 

140 res = lvl.u[0] - lvl.u[-1] 

141 for m in range(1, self.coll.num_nodes + 1): 

142 res += lvl.dt * self.coll.Qmat[-1, m] * lvl.f[m] 

143 

144 if lvl.params.residual_type[-3:] == 'abs': 

145 lvl.status.residual = abs(res) 

146 else: 

147 lvl.status.residual = abs(res) / abs(lvl.u[0]) 

148 

149 lvl.status.updated = False 

150 

151 

152class imex_1st_order_efficient(efficient_sweeper, imex_1st_order): 

153 """ 

154 Duplicate of `imex_1st_order` sweeper which is slightly more efficient at the cost of code readability. 

155 """ 

156 

157 def integrate(self, Q=None, QI=None, QE=None): 

158 """ 

159 Integrates the right-hand side (here impl + expl) 

160 

161 Args: 

162 Q (numpy.ndarray): Full quadrature rule 

163 QI (numpy.ndarray): Implicit preconditioner 

164 QE (numpy.ndarray): Explicit preconditioner 

165 

166 Returns: 

167 list of dtype_u: containing the integral as values 

168 """ 

169 

170 Q = self.coll.Qmat if Q is None else Q 

171 QI = np.zeros_like(Q) if QI is None else QI 

172 QE = np.zeros_like(Q) if QE is None else QE 

173 

174 # get current level and problem description 

175 L = self.level 

176 

177 me = [] 

178 

179 # integrate RHS over all collocation nodes 

180 for m in range(1, self.coll.num_nodes + 1): 

181 me.append(L.dt * ((Q - QI)[m, 1] * L.f[1].impl + (Q - QE)[m, 1] * L.f[1].expl)) 

182 # new instance of dtype_u, initialize values with 0 

183 for j in range(2, self.coll.num_nodes + 1): 

184 me[m - 1] += L.dt * ((Q - QI)[m, j] * L.f[j].impl + (Q - QE)[m, j] * L.f[j].expl) 

185 

186 return me 

187 

188 def update_nodes(self): 

189 """ 

190 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes 

191 

192 Returns: 

193 None 

194 """ 

195 

196 # get current level and problem description 

197 L = self.level 

198 P = L.prob 

199 

200 # only if the level has been touched before 

201 assert L.status.unlocked 

202 

203 # get number of collocation nodes for easier access 

204 M = self.coll.num_nodes 

205 

206 # gather all terms which are known already (e.g. from the previous iteration) 

207 # this corresponds to u0 + QF(u^k) - QIFI(u^k) - QEFE(u^k) + tau 

208 

209 # get QF(u^k) 

210 integral = self.integrate(Q=self.coll.Qmat, QI=self.QI, QE=self.QE) 

211 for m in range(M): 

212 # add initial value 

213 integral[m] += L.u[0] 

214 # add tau if associated 

215 if L.tau[m] is not None: 

216 integral[m] += L.tau[m] 

217 

218 # do the sweep 

219 for m in range(0, M): 

220 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1) 

221 rhs = P.dtype_u(integral[m]) 

222 for j in range(1, m + 1): 

223 rhs += L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl) 

224 

225 # implicit solve with prefactor stemming from QI 

226 L.u[m + 1] = P.solve_system( 

227 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m] 

228 ) 

229 

230 # update function values 

231 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m]) 

232 

233 # indicate presence of new values at this level 

234 L.status.updated = True 

235 

236 return None 

237 

238 def compute_residual(self, stage=''): 

239 lvl = self.level 

240 

241 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation: 

242 super().compute_residual(stage=stage) 

243 return None 

244 

245 res = lvl.u[0] - lvl.u[-1] 

246 for m in range(1, self.coll.num_nodes + 1): 

247 res += lvl.dt * self.coll.Qmat[-1, m] * (lvl.f[m].impl + lvl.f[m].expl) 

248 

249 if lvl.params.residual_type[-3:] == 'abs': 

250 lvl.status.residual = abs(res) 

251 else: 

252 lvl.status.residual = abs(res) / abs(lvl.u[0]) 

253 

254 lvl.status.updated = False