Coverage for pySDC/implementations/convergence_controller_classes/estimate_polynomial_error.py: 78%

94 statements  

coverage.py v7.8.0, created at 2025-04-01 13:12 +0000

import numpy as np

from qmat.lagrange import LagrangeApproximation
from pySDC.core.convergence_controller import ConvergenceController


class EstimatePolynomialError(ConvergenceController):
    """
    Estimate the local error by using all but one collocation node in a polynomial interpolation to that node.
    While the converged collocation problem with M nodes gives an order M approximation at this point, the
    interpolation gives only an order M-1 approximation. Hence, we have two solutions of different order, and we know
    both orders. That is to say, this gives an error estimate of order M. Keep in mind that the collocation problem
    should be converged for this and has order up to 2M. Still, the lower order method can be used for time step
    selection, for instance.
    If the last node is not the end point, we can instead interpolate to the end point, which is an order M
    approximation, and compare it to the order 2M approximation we get from the extrapolation step.
    By default, we interpolate to the second to last node.
    """

    def setup(self, controller, params, description, **kwargs):
        """
        Args:
            controller (pySDC.Controller.controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            (dict): The updated params dictionary
        """
        from pySDC.implementations.hooks.log_embedded_error_estimate import LogEmbeddedErrorEstimate
        from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence

        sweeper_params = description['sweeper_params']
        num_nodes = sweeper_params['num_nodes']
        quad_type = sweeper_params['quad_type']

        defaults = {
            'control_order': -75,
            'estimate_on_node': num_nodes + 1 if quad_type == 'GAUSS' else num_nodes - 1,
            'rel_error': False,
            **super().setup(controller, params, description, **kwargs),
        }
        self.comm = sweeper_params.get('comm', None)

        if self.comm:
            from mpi4py import MPI

            self.prepare_MPI_datatypes()
            self.MPI_SUM = MPI.SUM

        controller.add_hook(LogEmbeddedErrorEstimate)
        self.check_convergence = CheckConvergence.check_convergence

        if quad_type != 'GAUSS' and defaults['estimate_on_node'] > num_nodes:
            from pySDC.core.errors import ParameterError

            raise ParameterError(
                'You cannot interpolate with lower accuracy to the end point if the end point is a node!'
            )

        self.interpolation_matrix = None

        return defaults
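
    # Added commentary (not part of the original module): with M = num_nodes, 'estimate_on_node' indexes the
    # point set [0, *coll.nodes, 1.0] used in post_iteration_processing. For 'GAUSS' rules the right end point
    # is not a collocation node, so the default M + 1 interpolates to tau = 1 and compares against the
    # extrapolated end point. For rules that include the end point (e.g. 'RADAU-RIGHT'), the default M - 1
    # targets the second to last collocation node instead, as stated in the class docstring.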

    def reset_status_variables(self, *args, **kwargs):
        """
        Add variables for the embedded error estimate and its order

        Returns:
            None
        """
        self.add_status_variable_to_level('error_embedded_estimate')
        self.add_status_variable_to_level('order_embedded_estimate')

    def matmul(self, A, b, xp=np):
        """
        Matrix vector multiplication, possibly MPI parallel.
        The parallel implementation performs a reduce operation for every row of the matrix. While communicating the
        entire vector once could reduce the number of communications, this way we never need to store the entire
        vector on any specific rank.

        Args:
            A (2d np.ndarray): Matrix
            b (list): Vector
            xp: Either numpy or cupy

        Returns:
            list: The product A @ b
        """

        if self.comm:
            res = [A[i, 0] * b[0] if b[i] is not None else None for i in range(A.shape[0])]
            buf = b[0] * 0.0
            for i in range(0, A.shape[0]):
                index = self.comm.rank + (1 if self.comm.rank < self.params.estimate_on_node - 1 else 0)
                send_buf = (
                    (A[i, index] * b[index])
                    if self.comm.rank != self.params.estimate_on_node - 1
                    else xp.zeros_like(res[0])
                )
                self.comm.Allreduce(send_buf, buf, op=self.MPI_SUM)
                res[i] += buf
            return res
        else:
            return A @ xp.asarray(b)
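
    # Added commentary: `self.interpolation_matrix` has a single row (one target point), so `res` has length one
    # and `res[0]` is the interpolated value. In the MPI branch, each rank is assumed to hold the solution at one
    # collocation node, as in pySDC's MPI sweepers; the rank owning the excluded node contributes zeros, and each
    # row is accumulated with a single Allreduce.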

    def get_interpolated_solution(self, L, xp):
        """
        Get the interpolated solution for numpy or cupy data types

        Args:
            L (pySDC.Level.level): The level
            xp: Either numpy or cupy
        """
        coll = L.sweep.coll

        u = [
            L.u[i].flatten() if L.u[i] is not None else L.u[i]
            for i in range(coll.num_nodes + 1)
            if i != self.params.estimate_on_node
        ]
        return self.matmul(self.interpolation_matrix, u, xp=xp)[0].reshape(L.prob.init[0])

    def post_iteration_processing(self, controller, S, **kwargs):
        """
        Estimate the error

        Args:
            controller (pySDC.Controller.controller): The controller
            S (pySDC.Step.step): The current step

        Returns:
            None
        """

        if self.check_convergence(S):
            L = S.levels[0]
            coll = L.sweep.coll
            nodes = np.append(np.append(0, coll.nodes), 1.0)
            estimate_on_node = self.params.estimate_on_node

            if hasattr(L.u[0], 'xp'):
                xp = L.u[0].xp
            else:
                xp = np

            if self.interpolation_matrix is None:
                interpolator = LagrangeApproximation(
                    points=[nodes[i] for i in range(coll.num_nodes + 1) if i != estimate_on_node]
                )
                self.interpolation_matrix = xp.array(interpolator.getInterpolationMatrix([nodes[estimate_on_node]]))

            u_inter = self.get_interpolated_solution(L, xp)

            # compute end point if needed
            if estimate_on_node == len(nodes) - 1:
                if L.uend is None:
                    L.sweep.compute_end_point()
                high_order_sol = L.uend
                rank = 0
                L.status.order_embedded_estimate = coll.num_nodes + 1
            else:
                high_order_sol = L.u[estimate_on_node]
                rank = estimate_on_node - 1
                L.status.order_embedded_estimate = coll.num_nodes * 1

            rescale = float(abs(u_inter)) if self.params.rel_error else 1

            if self.comm:
                buf = np.array(abs(u_inter - high_order_sol) / rescale if self.comm.rank == rank else 0.0)
                self.comm.Bcast(buf, root=rank)
                L.status.error_embedded_estimate = float(buf)
            else:
                L.status.error_embedded_estimate = abs(u_inter - high_order_sol) / rescale

            self.debug(
                f'Obtained error estimate: {L.status.error_embedded_estimate:.2e} of order {L.status.order_embedded_estimate}',
                S,
            )
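
    # Added commentary: in short, the estimate set above is
    #     error_embedded_estimate = abs(u_interpolated - u_high_order) / rescale,
    # with rescale = abs(u_interpolated) if `rel_error` is set and 1 otherwise; with a space communicator it is
    # computed on the rank that owns the comparison node and then broadcast to all ranks.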

    def check_parameters(self, controller, params, description, **kwargs):
        """
        Check if we allow the scheme to solve the collocation problems to convergence.

        Args:
            controller (pySDC.Controller): The controller
            params (dict): The params passed for this specific convergence controller
            description (dict): The description object used to instantiate the controller

        Returns:
            bool: Whether the parameters are compatible
            str: The error message
        """
        if description['sweeper_params'].get('num_nodes', 0) < 2:
            return False, 'Need at least two collocation nodes to interpolate to one!'

        return True, ""


class EstimatePolynomialErrorFiredrake(EstimatePolynomialError):
    def matmul(self, A, b):
        """
        Matrix vector multiplication, possibly MPI parallel.
        The parallel implementation performs a reduce operation for every row of the matrix. While communicating the
        entire vector once could reduce the number of communications, this way we never need to store the entire
        vector on any specific rank.

        Args:
            A (2d np.ndarray): Matrix
            b (list): Vector

        Returns:
            list: The product A @ b
        """

        if self.comm:
            res = [A[i, 0] * b[0] if b[i] is not None else None for i in range(A.shape[0])]
            buf = 0 * b[0]
            for i in range(0, A.shape[0]):
                index = self.comm.rank + (1 if self.comm.rank < self.params.estimate_on_node - 1 else 0)
                send_buf = (
                    (A[i, index] * b[index]) if self.comm.rank != self.params.estimate_on_node - 1 else 0 * res[0]
                )
                self.comm.Allreduce(send_buf, buf, op=self.MPI_SUM)
                res[i] += buf
            return res
        else:
            res = []
            for i in range(A.shape[0]):
                res.append(A[i, 0] * b[0])
                for j in range(1, A.shape[1]):
                    res[-1] += A[i, j] * b[j]

            return res
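
    # Added commentary: the serial branch above accumulates the result row by row rather than calling
    # `A @ xp.asarray(b)` as in the parent class, presumably because the entries of `b` are Firedrake
    # functions, which support scaling and addition but cannot simply be packed into a NumPy array.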

    def get_interpolated_solution(self, L):
        """
        Get the interpolated solution for Firedrake data types.
        We are not 100% sure that you don't need to invert the mass matrix here, but it should be fine.

        Args:
            L (pySDC.Level.level): The level
        """
        coll = L.sweep.coll

        u = [L.u[i] for i in range(coll.num_nodes + 1) if i != self.params.estimate_on_node]
        return L.prob.dtype_u(self.matmul(self.interpolation_matrix, u)[0])
        # return L.prob.invert_mass_matrix(self.matmul(self.interpolation_matrix, u)[0])
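

# Usage sketch (added commentary, not part of the original module; settings below are placeholders): the
# estimator would typically be activated through pySDC's convergence controller mechanism, roughly like this:
#
#     description['convergence_controllers'] = {EstimatePolynomialError: {'rel_error': False}}
#
#     from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI
#     controller = controller_nonMPI(num_procs=1, controller_params={'logger_level': 30}, description=description)
#
# After a converged iteration, the estimate is then available as `level.status.error_embedded_estimate` and is
# logged by the `LogEmbeddedErrorEstimate` hook registered in `setup`.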