Coverage for pySDC / projects / StroemungsRaum / sweepers / generic_implicit_mass.py: 87%
53 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 15:19 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 15:19 +0000
1from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
4class generic_implicit_mass(generic_implicit):
6 def update_nodes(self):
7 """
8 This sweeper extends the generic_implicit sweeper from the implementations
9 package for a monolithic discretization of the incompressible Navier-Stokes
10 equations with a mass matrix. It updates the solution and right-hand side
11 values at all collocation nodes during a single sweep.
13 Returns:
14 None
15 """
17 L = self.level
18 P = L.prob
20 # only if the level has been touched before
21 assert L.status.unlocked
23 # get number of collocation nodes for easier access
24 M = self.coll.num_nodes
26 # update the MIN-SR-FLEX preconditioner
27 if self.params.QI == 'MIN-SR-FLEX':
28 self.QI = self.get_Qdelta_implicit(qd_type="MIN-SR-FLEX", k=L.status.sweep)
30 # gather all terms which are known already (e.g. from the previous iteration)
31 # this corresponds to u0 + QF(u^k) - QdF(u^k) + tau
33 # get QF(u^k)
34 integral = self.integrate()
36 # This is somewhat ugly, but we have to apply the mass matrix on u0 only on the finest level
37 if L.level_index == 0:
38 u0 = P.apply_mass_matrix(L.u[0])
39 else:
40 u0 = L.u[0]
42 for m in range(M):
43 # get -QdF(u^k)_m
44 for j in range(1, M + 1):
45 integral[m] -= L.dt * self.QI[m + 1, j] * L.f[j]
47 # add initial value
48 integral[m] += u0
49 # add tau if associated
50 if L.tau[m] is not None:
51 integral[m] += L.tau[m]
53 # do the sweep
54 for m in range(0, M):
55 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
56 rhs = P.dtype_u(integral[m])
57 for j in range(1, m + 1):
58 rhs += L.dt * self.QI[m + 1, j] * L.f[j]
60 # implicit solve with prefactor stemming from the diagonal of Qd
61 L.u[m + 1] = P.solve_system(
62 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m]
63 )
65 # update function values
66 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
68 # indicate presence of new values at this level
69 L.status.updated = True
71 return None
73 def compute_end_point(self):
74 """
75 Compute u at the right point of the interval
77 The value uend computed here is a full evaluation of the Picard formulation unless do_full_update==False
79 Returns:
80 None
81 """
83 # get current level and problem description
84 L = self.level
85 P = L.prob
87 # check if Mth node is equal to right point and do_coll_update is false, perform a simple copy
88 if self.coll.right_is_node and not self.params.do_coll_update:
89 # a copy is sufficient
90 L.uend = P.dtype_u(L.u[-1])
91 else:
92 raise NotImplementedError('Mass matrix sweeper expect u_M = u_end')
94 return None
96 def compute_residual(self, stage=None):
97 """
98 Computation of the residual using the collocation matrix Q
100 Args:
101 stage (str): The current stage of the step the level belongs to
102 """
104 # get current level and problem description
105 L = self.level
106 P = L.prob
108 # Check if we want to skip the residual computation to gain performance
109 # Keep in mind that skipping any residual computation is likely to give incorrect outputs of the residual!
110 if stage in self.params.skip_residual_computation:
111 L.status.residual = 0.0 if L.status.residual is None else L.status.residual
112 return None
114 # check if there are new values (e.g. from a sweep)
115 # assert L.status.updated
117 # compute the residual for each node
119 # build QF(u)
120 res_norm = []
121 res = self.integrate()
122 for m in range(self.coll.num_nodes):
124 # This is somewhat ugly, but we have to apply the mass matrix on u0 only on the finest level
126 if L.level_index == 0:
127 res[m] += P.apply_mass_matrix(L.u[0] - L.u[m + 1])
128 else:
129 res[m] += L.u[0] - P.apply_mass_matrix(L.u[m + 1])
130 # add tau if associated
131 if L.tau[m] is not None:
132 res[m] += L.tau[m]
134 # Due to different boundary conditions we might have to fix the residual
135 if L.prob.fix_bc_for_residual:
136 L.prob.fix_residual(res[m])
137 # use abs function from data type here
138 res_norm.append(abs(res[m]))
140 # find maximal residual over the nodes
141 L.status.residual = max(res_norm)
143 # indicate that the residual has seen the new values
144 L.status.updated = False
146 return None