Coverage for pySDC/projects/Resilience/sweepers.py: 91%

103 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-09 14:59 +0000

1import numpy as np 

2from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit 

3from pySDC.implementations.sweeper_classes.imex_1st_order import imex_1st_order 

4from pySDC.core.errors import ParameterError 

5 

6 

7class efficient_sweeper: 

8 """ 

9 Replace the predict function by something that does not excessively evaluate superfluous right hand sides. 

10 """ 

11 

12 def predict(self): 

13 """ 

14 Predictor to fill values at nodes before first sweep. Skip evaluation of RHS on initial conditions. 

15 

16 Default prediction for the sweepers, only copies the values to all collocation nodes 

17 and evaluates the RHS of the ODE there 

18 """ 

19 

20 # get current level and problem description 

21 L = self.level 

22 P = L.prob 

23 

24 for m in range(1, self.coll.num_nodes + 1): 

25 # copy u[0] to all collocation nodes, evaluate RHS 

26 if self.params.initial_guess == 'spread': 

27 L.u[m] = P.dtype_u(L.u[0]) 

28 L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1]) 

29 # start with zero everywhere 

30 elif self.params.initial_guess == 'zero': 

31 L.u[m] = P.dtype_u(init=P.init, val=0.0) 

32 L.f[m] = P.dtype_f(init=P.init, val=0.0) 

33 # start with random initial guess 

34 elif self.params.initial_guess == 'random': 

35 L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0]) 

36 L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0]) 

37 else: 

38 raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented') 

39 

40 # indicate that this level is now ready for sweeps 

41 L.status.unlocked = True 

42 L.status.updated = True 

43 

44 

45class generic_implicit_efficient(efficient_sweeper, generic_implicit): 

46 """ 

47 This sweeper has the same functionality of the `generic_implicit` sweeper, but saves a few operations at the expense 

48 of readability. 

49 """ 

50 

51 def integrate(self, Q=None): 

52 """ 

53 Integrates the right-hand side. Depending on `Q`, this may or may not be consistent with an integral 

54 approximation. 

55 

56 Args: 

57 Q (numpy.ndarray): Some sort of quadrature rule 

58 

59 Returns: 

60 list of dtype_u: containing the integral as values 

61 """ 

62 

63 # get current level and problem description 

64 L = self.level 

65 P = L.prob 

66 

67 Q = self.coll.Qmat if Q is None else Q 

68 

69 me = [] 

70 

71 # integrate RHS over all collocation nodes 

72 for m in range(1, self.coll.num_nodes + 1): 

73 # new instance of dtype_u, initialize values with 0 

74 me.append(P.dtype_u(P.init, val=0.0)) 

75 for j in range(1, self.coll.num_nodes + 1): 

76 me[-1] += L.dt * Q[m, j] * L.f[j] 

77 

78 return me 

79 

80 def update_nodes(self): 

81 """ 

82 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes 

83 

84 Returns: 

85 None 

86 """ 

87 

88 # get current level and problem description 

89 L = self.level 

90 P = L.prob 

91 

92 # only if the level has been touched before 

93 assert L.status.unlocked 

94 

95 # get number of collocation nodes for easier access 

96 M = self.coll.num_nodes 

97 

98 # gather all terms which are known already (e.g. from the previous iteration) 

99 # this corresponds to u0 + QF(u^k) - QdF(u^k) + tau 

100 

101 # get QF(u^k) 

102 integral = self.integrate(Q=self.coll.Qmat - self.QI) 

103 for m in range(M): 

104 # add initial value 

105 integral[m] += L.u[0] 

106 # add tau if associated 

107 if L.tau[m] is not None: 

108 integral[m] += L.tau[m] 

109 

110 # do the sweep 

111 for m in range(0, M): 

112 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1) 

113 rhs = P.dtype_u(integral[m]) 

114 for j in range(1, m + 1): 

115 rhs += L.dt * self.QI[m + 1, j] * L.f[j] 

116 

117 # implicit solve with prefactor stemming from the diagonal of Qd 

118 L.u[m + 1] = P.solve_system( 

119 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m] 

120 ) 

121 # update function values 

122 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m]) 

123 

124 # indicate presence of new values at this level 

125 L.status.updated = True 

126 

127 return None 

128 

129 def compute_residual(self, stage=''): 

130 lvl = self.level 

131 

132 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation: 

133 super().compute_residual(stage=stage) 

134 return None 

135 

136 res = lvl.u[0] - lvl.u[-1] 

137 for m in range(1, self.coll.num_nodes + 1): 

138 res += lvl.dt * self.coll.Qmat[-1, m] * lvl.f[m] 

139 

140 if lvl.params.residual_type[-3:] == 'abs': 

141 lvl.status.residual = abs(res) 

142 else: 

143 lvl.status.residual = abs(res) / abs(lvl.u[0]) 

144 

145 lvl.status.updated = False 

146 

147 

148class imex_1st_order_efficient(efficient_sweeper, imex_1st_order): 

149 """ 

150 Duplicate of `imex_1st_order` sweeper which is slightly more efficient at the cost of code readability. 

151 """ 

152 

153 def integrate(self, Q=None, QI=None, QE=None): 

154 """ 

155 Integrates the right-hand side (here impl + expl) 

156 

157 Args: 

158 Q (numpy.ndarray): Full quadrature rule 

159 QI (numpy.ndarray): Implicit preconditioner 

160 QE (numpy.ndarray): Explicit preconditioner 

161 

162 Returns: 

163 list of dtype_u: containing the integral as values 

164 """ 

165 

166 Q = self.coll.Qmat if Q is None else Q 

167 QI = np.zeros_like(Q) if QI is None else QI 

168 QE = np.zeros_like(Q) if QE is None else QE 

169 

170 # get current level and problem description 

171 L = self.level 

172 

173 me = [] 

174 

175 # integrate RHS over all collocation nodes 

176 for m in range(1, self.coll.num_nodes + 1): 

177 me.append(L.dt * ((Q - QI)[m, 1] * L.f[1].impl + (Q - QE)[m, 1] * L.f[1].expl)) 

178 # new instance of dtype_u, initialize values with 0 

179 for j in range(2, self.coll.num_nodes + 1): 

180 me[m - 1] += L.dt * ((Q - QI)[m, j] * L.f[j].impl + (Q - QE)[m, j] * L.f[j].expl) 

181 

182 return me 

183 

184 def update_nodes(self): 

185 """ 

186 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes 

187 

188 Returns: 

189 None 

190 """ 

191 

192 # get current level and problem description 

193 L = self.level 

194 P = L.prob 

195 

196 # only if the level has been touched before 

197 assert L.status.unlocked 

198 

199 # get number of collocation nodes for easier access 

200 M = self.coll.num_nodes 

201 

202 # gather all terms which are known already (e.g. from the previous iteration) 

203 # this corresponds to u0 + QF(u^k) - QIFI(u^k) - QEFE(u^k) + tau 

204 

205 # get QF(u^k) 

206 integral = self.integrate(Q=self.coll.Qmat, QI=self.QI, QE=self.QE) 

207 for m in range(M): 

208 # add initial value 

209 integral[m] += L.u[0] 

210 # add tau if associated 

211 if L.tau[m] is not None: 

212 integral[m] += L.tau[m] 

213 

214 # do the sweep 

215 for m in range(0, M): 

216 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1) 

217 rhs = P.dtype_u(integral[m]) 

218 for j in range(1, m + 1): 

219 rhs += L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl) 

220 

221 # implicit solve with prefactor stemming from QI 

222 L.u[m + 1] = P.solve_system( 

223 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m] 

224 ) 

225 

226 # update function values 

227 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m]) 

228 

229 # indicate presence of new values at this level 

230 L.status.updated = True 

231 

232 return None 

233 

234 def compute_residual(self, stage=''): 

235 lvl = self.level 

236 

237 if lvl.params.residual_type[:4] == 'full' or stage in self.params.skip_residual_computation: 

238 super().compute_residual(stage=stage) 

239 return None 

240 

241 res = lvl.u[0] - lvl.u[-1] 

242 for m in range(1, self.coll.num_nodes + 1): 

243 res += lvl.dt * self.coll.Qmat[-1, m] * (lvl.f[m].impl + lvl.f[m].expl) 

244 

245 if lvl.params.residual_type[-3:] == 'abs': 

246 lvl.status.residual = abs(res) 

247 else: 

248 lvl.status.residual = abs(res) / abs(lvl.u[0]) 

249 

250 lvl.status.updated = False