Coverage for pySDC/implementations/sweeper_classes/imex_1st_order.py: 99%

73 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-09 14:59 +0000

1import numpy as np 

2 

3from pySDC.core.sweeper import Sweeper 

4 

5 

6class imex_1st_order(Sweeper): 

7 """ 

8 Custom sweeper class, implements Sweeper.py 

9 

10 First-order IMEX sweeper using implicit/explicit Euler as base integrator 

11 

12 Attributes: 

13 QI: implicit Euler integration matrix 

14 QE: explicit Euler integration matrix 

15 """ 

16 

17 def __init__(self, params): 

18 """ 

19 Initialization routine for the custom sweeper 

20 

21 Args: 

22 params: parameters for the sweeper 

23 """ 

24 

25 if 'QI' not in params: 

26 params['QI'] = 'IE' 

27 if 'QE' not in params: 

28 params['QE'] = 'EE' 

29 

30 # call parent's initialization routine 

31 super().__init__(params) 

32 

33 # IMEX integration matrices 

34 self.QI = self.get_Qdelta_implicit(qd_type=self.params.QI) 

35 self.QE = self.get_Qdelta_explicit(qd_type=self.params.QE) 

36 

37 def integrate(self): 

38 """ 

39 Integrates the right-hand side (here impl + expl) 

40 

41 Returns: 

42 list of dtype_u: containing the integral as values 

43 """ 

44 

45 L = self.level 

46 P = L.prob 

47 

48 me = [] 

49 # integrate RHS over all collocation nodes 

50 for m in range(1, self.coll.num_nodes + 1): 

51 me.append(P.dtype_u(P.init, val=0.0)) 

52 for j in range(1, self.coll.num_nodes + 1): 

53 me[m - 1] += L.dt * self.coll.Qmat[m, j] * (L.f[j].impl + L.f[j].expl) 

54 

55 return me 

56 

57 def update_nodes(self): 

58 """ 

59 Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes 

60 

61 Returns: 

62 None 

63 """ 

64 

65 # get current level and problem description 

66 L = self.level 

67 P = L.prob 

68 

69 # only if the level has been touched before 

70 assert L.status.unlocked 

71 

72 # get number of collocation nodes for easier access 

73 M = self.coll.num_nodes 

74 

75 # gather all terms which are known already (e.g. from the previous iteration) 

76 # this corresponds to u0 + QF(u^k) - QIFI(u^k) - QEFE(u^k) + tau 

77 

78 # get QF(u^k) 

79 integral = self.integrate() 

80 for m in range(M): 

81 # subtract QIFI(u^k)_m + QEFE(u^k)_m 

82 for j in range(1, M + 1): 

83 integral[m] -= L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl) 

84 # add initial value 

85 integral[m] += L.u[0] 

86 # add tau if associated 

87 if L.tau[m] is not None: 

88 integral[m] += L.tau[m] 

89 

90 # do the sweep 

91 for m in range(0, M): 

92 # build rhs, consisting of the known values from above and new values from previous nodes (at k+1) 

93 rhs = P.dtype_u(integral[m]) 

94 for j in range(1, m + 1): 

95 rhs += L.dt * (self.QI[m + 1, j] * L.f[j].impl + self.QE[m + 1, j] * L.f[j].expl) 

96 

97 # implicit solve with prefactor stemming from QI 

98 L.u[m + 1] = P.solve_system( 

99 rhs, L.dt * self.QI[m + 1, m + 1], L.u[m + 1], L.time + L.dt * self.coll.nodes[m] 

100 ) 

101 

102 # update function values 

103 L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m]) 

104 

105 # indicate presence of new values at this level 

106 L.status.updated = True 

107 

108 return None 

109 

110 def compute_end_point(self): 

111 """ 

112 Compute u at the right point of the interval 

113 

114 The value uend computed here is a full evaluation of the Picard formulation unless do_full_update==False 

115 

116 Returns: 

117 None 

118 """ 

119 

120 # get current level and problem description 

121 L = self.level 

122 P = L.prob 

123 

124 # check if Mth node is equal to right point and do_coll_update is false, perform a simple copy 

125 if self.coll.right_is_node and not self.params.do_coll_update: 

126 # a copy is sufficient 

127 L.uend = P.dtype_u(L.u[-1]) 

128 else: 

129 # start with u0 and add integral over the full interval (using coll.weights) 

130 L.uend = P.dtype_u(L.u[0]) 

131 for m in range(self.coll.num_nodes): 

132 L.uend += L.dt * self.coll.weights[m] * (L.f[m + 1].impl + L.f[m + 1].expl) 

133 # add up tau correction of the full interval (last entry) 

134 if L.tau[-1] is not None: 

135 L.uend += L.tau[-1] 

136 

137 return None 

138 

139 def get_sweeper_mats(self): 

140 """ 

141 Returns the three matrices Q, QI, QE which define the sweeper. 

142 

143 The first row and column, corresponding to the left starting value, are removed to correspond to the notation 

144 introduced in Ruprecht & Speck, Spectral deferred corrections with fast-wave slow-wave splitting, 2016 

145 """ 

146 QE = self.QE[1:, 1:] 

147 QI = self.QI[1:, 1:] 

148 Q = self.coll.Qmat[1:, 1:] 

149 return QE, QI, Q 

150 

151 def get_scalar_problems_sweeper_mats(self, lambdas=None): 

152 """ 

153 This function returns the corresponding matrices of an IMEX-SDC sweep in matrix formulation. 

154 

155 See Ruprecht & Speck, Spectral deferred corrections with fast-wave slow-wave splitting, 2016 for the derivation. 

156 

157 Args: 

158 lambdas (numpy.ndarray): the first entry in lambdas is lambda_fast, the second is lambda_slow. 

159 """ 

160 QE, QI, Q = self.get_sweeper_mats() 

161 if lambdas is None: 

162 pass 

163 # should use lambdas from attached problem and make sure it is a scalar IMEX 

164 raise NotImplementedError("At the moment, the values for lambda have to be provided") 

165 else: 

166 lambda_fast = lambdas[0] 

167 lambda_slow = lambdas[1] 

168 nnodes = self.coll.num_nodes 

169 dt = self.level.dt 

170 LHS = np.eye(nnodes) - dt * (lambda_fast * QI + lambda_slow * QE) 

171 RHS = dt * ((lambda_fast + lambda_slow) * Q - (lambda_fast * QI + lambda_slow * QE)) 

172 return LHS, RHS 

173 

174 def get_scalar_problems_manysweep_mat(self, nsweeps, lambdas=None): 

175 """ 

176 For a scalar problem, K sweeps of IMEX-SDC can be written in matrix form. 

177 

178 Args: 

179 nsweeps (int): number of sweeps 

180 lambdas (numpy.ndarray): the first entry in lambdas is lambda_fast, the second is lambda_slow. 

181 """ 

182 LHS, RHS = self.get_scalar_problems_sweeper_mats(lambdas=lambdas) 

183 Pinv = np.linalg.inv(LHS) 

184 Mat_sweep = np.linalg.matrix_power(Pinv.dot(RHS), nsweeps) 

185 for k in range(0, nsweeps): 

186 Mat_sweep += np.linalg.matrix_power(Pinv.dot(RHS), k).dot(Pinv) 

187 return Mat_sweep