Coverage for pySDC/implementations/convergence_controller_classes/estimate_polynomial_error.py: 78%
94 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-01 13:12 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-01 13:12 +0000
1import numpy as np
3from qmat.lagrange import LagrangeApproximation
4from pySDC.core.convergence_controller import ConvergenceController
7class EstimatePolynomialError(ConvergenceController):
8 """
9 Estimate the local error by using all but one collocation node in a polynomial interpolation to that node.
10 While the converged collocation problem with M nodes gives a order M approximation to this point, the interpolation
11 gives only an order M-1 approximation. Hence, we have two solutions with different order, and we know their order.
12 That is to say this gives an error estimate that is order M. Keep in mind that the collocation problem should be
13 converged for this and has order up to 2M. Still, the lower order method can be used for time step selection, for
14 instance.
15 If the last node is not the end point, we can interpolate to that node, which is an order M approximation and compare
16 to the order 2M approximation we get from the extrapolation step.
17 By default, we interpolate to the second to last node.
18 """
20 def setup(self, controller, params, description, **kwargs):
21 """
22 Args:
23 controller (pySDC.Controller.controller): The controller
24 params (dict): The params passed for this specific convergence controller
25 description (dict): The description object used to instantiate the controller
27 Returns:
28 (dict): The updated params dictionary
29 """
30 from pySDC.implementations.hooks.log_embedded_error_estimate import LogEmbeddedErrorEstimate
31 from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence
33 sweeper_params = description['sweeper_params']
34 num_nodes = sweeper_params['num_nodes']
35 quad_type = sweeper_params['quad_type']
37 defaults = {
38 'control_order': -75,
39 'estimate_on_node': num_nodes + 1 if quad_type == 'GAUSS' else num_nodes - 1,
40 'rel_error': False,
41 **super().setup(controller, params, description, **kwargs),
42 }
43 self.comm = description['sweeper_params'].get('comm', None)
45 if self.comm:
46 from mpi4py import MPI
48 self.prepare_MPI_datatypes()
49 self.MPI_SUM = MPI.SUM
51 controller.add_hook(LogEmbeddedErrorEstimate)
52 self.check_convergence = CheckConvergence.check_convergence
54 if quad_type != 'GAUSS' and defaults['estimate_on_node'] > num_nodes:
55 from pySDC.core.errors import ParameterError
57 raise ParameterError(
58 'You cannot interpolate with lower accuracy to the end point if the end point is a node!'
59 )
61 self.interpolation_matrix = None
63 return defaults
65 def reset_status_variables(self, *args, **kwargs):
66 """
67 Add variable for embedded error
69 Returns:
70 None
71 """
72 self.add_status_variable_to_level('error_embedded_estimate')
73 self.add_status_variable_to_level('order_embedded_estimate')
75 def matmul(self, A, b, xp=np):
76 """
77 Matrix vector multiplication, possibly MPI parallel.
78 The parallel implementation performs a reduce operation in every row of the matrix. While communicating the
79 entire vector once could reduce the number of communications, this way we never need to store the entire vector
80 on any specific rank.
82 Args:
83 A (2d np.ndarray): Matrix
84 b (list): Vector
85 xp: Either numpy or cupy
87 Returns:
88 List: Axb
89 """
91 if self.comm:
92 res = [A[i, 0] * b[0] if b[i] is not None else None for i in range(A.shape[0])]
93 buf = b[0] * 0.0
94 for i in range(0, A.shape[0]):
95 index = self.comm.rank + (1 if self.comm.rank < self.params.estimate_on_node - 1 else 0)
96 send_buf = (
97 (A[i, index] * b[index])
98 if self.comm.rank != self.params.estimate_on_node - 1
99 else xp.zeros_like(res[0])
100 )
101 self.comm.Allreduce(send_buf, buf, op=self.MPI_SUM)
102 res[i] += buf
103 return res
104 else:
105 return A @ xp.asarray(b)
107 def get_interpolated_solution(self, L, xp):
108 """
109 Get the interpolated solution for numpy or cupy data types
111 Args:
112 u_vec (array): Vector of solutions
113 prob (pySDC.problem): Problem
114 """
115 coll = L.sweep.coll
117 u = [
118 L.u[i].flatten() if L.u[i] is not None else L.u[i]
119 for i in range(coll.num_nodes + 1)
120 if i != self.params.estimate_on_node
121 ]
122 return self.matmul(self.interpolation_matrix, u, xp=xp)[0].reshape(L.prob.init[0])
124 def post_iteration_processing(self, controller, S, **kwargs):
125 """
126 Estimate the error
128 Args:
129 controller (pySDC.Controller.controller): The controller
130 S (pySDC.Step.step): The current step
132 Returns:
133 None
134 """
136 if self.check_convergence(S):
137 L = S.levels[0]
138 coll = L.sweep.coll
139 nodes = np.append(np.append(0, coll.nodes), 1.0)
140 estimate_on_node = self.params.estimate_on_node
142 if hasattr(L.u[0], 'xp'):
143 xp = L.u[0].xp
144 else:
145 xp = np
147 if self.interpolation_matrix is None:
148 interpolator = LagrangeApproximation(
149 points=[nodes[i] for i in range(coll.num_nodes + 1) if i != estimate_on_node]
150 )
151 self.interpolation_matrix = xp.array(interpolator.getInterpolationMatrix([nodes[estimate_on_node]]))
153 u_inter = self.get_interpolated_solution(L, xp)
155 # compute end point if needed
156 if estimate_on_node == len(nodes) - 1:
157 if L.uend is None:
158 L.sweep.compute_end_point()
159 high_order_sol = L.uend
160 rank = 0
161 L.status.order_embedded_estimate = coll.num_nodes + 1
162 else:
163 high_order_sol = L.u[estimate_on_node]
164 rank = estimate_on_node - 1
165 L.status.order_embedded_estimate = coll.num_nodes * 1
167 rescale = float(abs(u_inter)) if self.params.rel_error else 1
169 if self.comm:
170 buf = np.array(abs(u_inter - high_order_sol) / rescale if self.comm.rank == rank else 0.0)
171 self.comm.Bcast(buf, root=rank)
172 L.status.error_embedded_estimate = float(buf)
173 else:
174 L.status.error_embedded_estimate = abs(u_inter - high_order_sol) / rescale
176 self.debug(
177 f'Obtained error estimate: {L.status.error_embedded_estimate:.2e} of order {L.status.order_embedded_estimate}',
178 S,
179 )
181 def check_parameters(self, controller, params, description, **kwargs):
182 """
183 Check if we allow the scheme to solve the collocation problems to convergence.
185 Args:
186 controller (pySDC.Controller): The controller
187 params (dict): The params passed for this specific convergence controller
188 description (dict): The description object used to instantiate the controller
190 Returns:
191 bool: Whether the parameters are compatible
192 str: The error message
193 """
194 if description['sweeper_params'].get('num_nodes', 0) < 2:
195 return False, 'Need at least two collocation nodes to interpolate to one!'
197 return True, ""
200class EstimatePolynomialErrorFiredrake(EstimatePolynomialError):
201 def matmul(self, A, b):
202 """
203 Matrix vector multiplication, possibly MPI parallel.
204 The parallel implementation performs a reduce operation in every row of the matrix. While communicating the
205 entire vector once could reduce the number of communications, this way we never need to store the entire vector
206 on any specific rank.
208 Args:
209 A (2d np.ndarray): Matrix
210 b (list): Vector
212 Returns:
213 List: Axb
214 """
216 if self.comm:
217 res = [A[i, 0] * b[0] if b[i] is not None else None for i in range(A.shape[0])]
218 buf = 0 * b[0]
219 for i in range(0, A.shape[0]):
220 index = self.comm.rank + (1 if self.comm.rank < self.params.estimate_on_node - 1 else 0)
221 send_buf = (
222 (A[i, index] * b[index]) if self.comm.rank != self.params.estimate_on_node - 1 else 0 * res[0]
223 )
224 self.comm.Allreduce(send_buf, buf, op=self.MPI_SUM)
225 res[i] += buf
226 return res
227 else:
228 res = []
229 for i in range(A.shape[0]):
230 res.append(A[i, 0] * b[0])
231 for j in range(1, A.shape[1]):
232 res[-1] += A[i, j] * b[j]
234 return res
236 def get_interpolated_solution(self, L):
237 """
238 Get the interpolated solution for Firedrake data types
239 We are not 100% sure that you don't need to invert the mass matrix here, but should be fine.
241 Args:
242 u_vec (array): Vector of solutions
243 prob (pySDC.problem): Problem
244 """
245 coll = L.sweep.coll
247 u = [
248 L.u[i] if L.u[i] is not None else L.u[i]
249 for i in range(coll.num_nodes + 1)
250 if i != self.params.estimate_on_node
251 ]
252 return L.prob.dtype_u(self.matmul(self.interpolation_matrix, u)[0])
253 # return L.prob.invert_mass_matrix(self.matmul(self.interpolation_matrix, u)[0])