Coverage for pySDC/implementations/sweeper_classes/ParaDiagSweepers.py: 89%
71 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 06:49 +0000
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-21 06:49 +0000
1"""
2These sweepers are made for use with ParaDiag. They can be used to some degree with SDC as well, but unless you know what you are doing, you probably want another sweeper.
3"""
5from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
6from pySDC.implementations.sweeper_classes.imex_1st_order import imex_1st_order
7import numpy as np
8import scipy.sparse as sp
11class QDiagonalization(generic_implicit):
12 """
13 Sweeper solving the collocation problem directly via diagonalization of Q. Mainly made for ParaDiag.
14 Can be reconfigured for use with SDC.
16 Note that the initial conditions for the collocation problem are generally stored in node zero in pySDC. However,
17 this sweeper is intended for ParaDiag, where a node-local residual is needed as a right hand side for this sweeper
18 rather than a step local one. Therefore, this sweeper has an option `ignore_ic`. If true, the value in node zero
19 will only be used in computing the step-local residual, but not in the solves. If false, the values on the nodes
20 will be ignored in the solves and the node-zero value will be used as initial conditions. When using this as a time-
21 parallel algorithm outside ParaDiag, you should set this parameter to false, which is not the default!
23 Similarly, in ParaDiag, the solution is in Fourier space right after the solve. It therefore makes little sense to
24 evaluate the right hand side directly after. By default, this is not done! Set `update_f_evals=True` in the
25 parameters if you want to use this sweeper in SDC.
26 """
28 def __init__(self, params, level):
29 """
30 Initialization routine for the custom sweeper
32 Args:
33 params: parameters for the sweeper
34 level (pySDC.Level.level): the level that uses this sweeper
35 """
36 if 'G_inv' not in params.keys():
37 params['G_inv'] = np.eye(params['num_nodes'])
38 params['update_f_evals'] = params.get('update_f_evals', False)
39 params['ignore_ic'] = params.get('ignore_ic', True)
41 super().__init__(params, level)
43 self.set_G_inv(self.params.G_inv)
45 def set_G_inv(self, G_inv):
46 """
47 In ParaDiag, QG^{-1} is diagonalized. This function stores the G_inv matrix and computes and stores the diagonalization.
48 """
49 self.params.G_inv = G_inv
50 self.w, self.S, self.S_inv = self.computeDiagonalization(A=self.coll.Qmat[1:, 1:] @ self.params.G_inv)
52 @staticmethod
53 def computeDiagonalization(A):
54 """
55 Compute diagonalization of dense matrix A = S diag(w) S^-1
57 Args:
58 A (numpy.ndarray): dense matrix to diagonalize
60 Returns:
61 numpy.array: Diagonal entries of the diagonalized matrix w
62 numpy.ndarray: Matrix of eigenvectors S
63 numpy.ndarray: Inverse of S
64 """
65 w, S = np.linalg.eig(A)
66 S_inv = np.linalg.inv(S)
67 assert np.allclose(S @ np.diag(w) @ S_inv, A)
68 return w, S, S_inv
70 def mat_vec(self, mat, vec):
71 """
72 Compute matrix-vector multiplication. Vector can be list.
74 Args:
75 mat: Matrix
76 vec: Vector
78 Returns:
79 list: mat @ vec
80 """
81 assert mat.shape[1] == len(vec)
82 result = []
83 for m in range(mat.shape[0]):
84 result.append(self.level.prob.u_init)
85 for j in range(mat.shape[1]):
86 result[-1] += mat[m, j] * vec[j]
87 return result
89 def update_nodes(self):
90 """
91 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
93 Returns:
94 None
95 """
97 L = self.level
98 P = L.prob
99 M = self.coll.num_nodes
101 if L.tau[0] is not None:
102 raise NotImplementedError('This sweeper does not work with multi-level SDC')
104 # perform local solves on the collocation nodes, can be parallelized!
105 if self.params.ignore_ic:
106 x1 = self.mat_vec(self.S_inv, [self.level.residual[m] for m in range(M)])
107 else:
108 x1 = self.mat_vec(self.S_inv, [self.level.u[0] for _ in range(M)])
110 # get averaged state over all nodes for constructing the Jacobian
111 u_avg = P.u_init
112 if not any(me is None for me in L.u_avg):
113 for m in range(M):
114 u_avg += L.u_avg[m] / M
116 x2 = []
117 for m in range(M):
118 x2.append(P.solve_jacobian(x1[m], self.w[m] * L.dt, u=u_avg, t=L.time + L.dt * self.coll.nodes[m]))
119 z = self.mat_vec(self.S, x2)
120 y = self.mat_vec(self.params.G_inv, z)
122 # update solution and evaluate right hand side
123 for m in range(M):
124 if self.params.ignore_ic:
125 L.increment[m] = y[m]
126 else:
127 L.u[m + 1] = y[m]
128 if self.params.update_f_evals:
129 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
131 L.status.updated = True
132 return None
134 def eval_f_at_all_nodes(self):
135 L = self.level
136 P = self.level.prob
137 for m in range(self.coll.num_nodes):
138 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
140 def get_residual(self):
141 """
142 This function computes and returns the "spatially extended" residual, not the norm of the residual!
144 Returns:
145 pySDC.datatype: Spatially extended residual
147 """
148 self.eval_f_at_all_nodes()
150 # start with integral dt*Q*F
151 residual = self.integrate()
153 # subtract u and add u0 to arrive at r = dt*Q*F - u + u0
154 for m in range(self.coll.num_nodes):
155 residual[m] -= self.level.u[m + 1]
156 residual[m] += self.level.u[0]
158 return residual
160 def compute_residual(self, *args, **kwargs):
161 self.eval_f_at_all_nodes()
162 return super().compute_residual(*args, **kwargs)
165class QDiagonalizationIMEX(QDiagonalization):
166 """
167 Use as sweeper class for ParaDiag with IMEX splitting. Note that it will not work with SDC.
168 """
170 integrate = imex_1st_order.integrate