Coverage for pySDC / core / problem.py: 96%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 09:00 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4Description 

5----------- 

6 

7Module containing the base Problem class for pySDC 

8""" 

9 

10import logging 

11from typing import Any, Dict, Optional, Type, Callable 

12 

13from pySDC.core.common import RegisterParams 

14 

15 

16class WorkCounter(object): 

17 """ 

18 Utility class for counting iterations. 

19 

20 Contains one attribute `niter` initialized to zero during 

21 instantiation, which can be incremented by calling object as 

22 a function, e.g 

23 

24 >>> count = WorkCounter() # => niter = 0 

25 >>> count() # => niter = 1 

26 >>> count() # => niter = 2 

27 """ 

28 

29 def __init__(self) -> None: 

30 self.niter: int = 0 

31 

32 def __call__(self, *args: Any, **kwargs: Any) -> None: 

33 # *args and **kwargs are necessary for gmres 

34 self.niter += 1 

35 

36 def decrement(self) -> None: 

37 self.niter -= 1 

38 

39 def __str__(self) -> str: 

40 return f'{self.niter}' 

41 

42 

43class Problem(RegisterParams): 

44 """ 

45 Prototype class for problems, just defines the attributes essential to get started. 

46 

47 Parameters 

48 ---------- 

49 init : list of args 

50 Argument(s) used to initialize data types. 

51 dtype_u : type 

52 Variable data type. Should generate a data variable using dtype_u(init). 

53 dtype_f : type 

54 RHS data type. Should generate a data variable using dtype_f(init). 

55 

56 Attributes 

57 ---------- 

58 logger: logging.Logger 

59 custom logger for problem-related logging. 

60 """ 

61 

62 logger: logging.Logger = logging.getLogger('problem') 

63 dtype_u: Optional[Type[Any]] = None 

64 dtype_f: Optional[Type[Any]] = None 

65 

66 def __init__(self, init: Any) -> None: 

67 self.work_counters: Dict[str, WorkCounter] = {} # Dictionary to store WorkCounter objects 

68 self.init: Any = init # Initialization parameter to instantiate data types 

69 

70 @property 

71 def u_init(self) -> Any: 

72 """Generate a data variable for u""" 

73 return self.dtype_u(self.init) 

74 

75 @property 

76 def f_init(self) -> Any: 

77 """Generate a data variable for RHS""" 

78 return self.dtype_f(self.init) 

79 

80 @classmethod 

81 def get_default_sweeper_class(cls) -> Type[Any]: 

82 raise NotImplementedError(f'No default sweeper class implemented for {cls} problem!') 

83 

84 def setUpFieldsIO(self) -> None: 

85 """ 

86 Set up FieldsIO for MPI with the space decomposition of this problem 

87 """ 

88 pass 

89 

90 def getOutputFile(self, fileName: str) -> Any: 

91 raise NotImplementedError(f'No output implemented file for {type(self).__name__}') 

92 

93 def processSolutionForOutput(self, u: Any) -> Any: 

94 return u 

95 

96 def eval_f(self, u: Any, t: float) -> Any: 

97 """ 

98 Abstract interface to RHS computation of the ODE 

99 

100 Parameters 

101 ---------- 

102 u : dtype_u 

103 Current values. 

104 t : float 

105 Current time. 

106 

107 Returns 

108 ------- 

109 f : dtype_f 

110 The RHS values. 

111 """ 

112 raise NotImplementedError('ERROR: problem has to implement eval_f(self, u, t)') 

113 

114 def apply_mass_matrix(self, u: Any) -> Any: # pragma: no cover 

115 """Default mass matrix : identity""" 

116 return u 

117 

118 def generate_scipy_reference_solution( 

119 self, eval_rhs: Callable, t: float, u_init: Optional[Any] = None, t_init: Optional[float] = None, **kwargs: Any 

120 ) -> Any: 

121 """ 

122 Compute a reference solution using `scipy.solve_ivp` with very small tolerances. 

123 Keep in mind that scipy needs the solution to be a one dimensional array. If you are solving something higher 

124 dimensional, you need to make sure the function `eval_rhs` takes a flattened one-dimensional version as an input 

125 and output, but reshapes to whatever the problem needs for evaluation. 

126 

127 The keyword arguments will be passed to `scipy.solve_ivp`. You should consider passing `method='BDF'` for stiff 

128 problems and to accelerate that you can pass a function that evaluates the Jacobian with arguments `jac(t, u)` 

129 as `jac=jac`. 

130 

131 Args: 

132 eval_rhs (function): Function evaluate the full right hand side. Must have signature `eval_rhs(float: t, numpy.1darray: u)` 

133 t (float): current time 

134 u_init (pySDC.implementations.problem_classes.Lorenz.dtype_u): initial conditions for getting the exact solution 

135 t_init (float): the starting time 

136 

137 Returns: 

138 numpy.ndarray: Reference solution 

139 """ 

140 import numpy as np 

141 from scipy.integrate import solve_ivp 

142 

143 kwargs = { 

144 'atol': 100 * np.finfo(float).eps, 

145 'rtol': 100 * np.finfo(float).eps, 

146 **kwargs, 

147 } 

148 u_init = self.u_exact(t=0) if u_init is None else u_init * 1.0 

149 t_init = 0 if t_init is None else t_init 

150 

151 u_shape = u_init.shape 

152 return solve_ivp(eval_rhs, (t_init, t), u_init.flatten(), **kwargs).y[:, -1].reshape(u_shape) 

153 

154 def get_fig(self) -> Any: 

155 """ 

156 Get a figure suitable to plot the solution of this problem 

157 

158 Returns 

159 ------- 

160 self.fig : matplotlib.pyplot.figure.Figure 

161 """ 

162 raise NotImplementedError 

163 

164 def plot(self, u: Any, t: Optional[float] = None, fig: Optional[Any] = None) -> None: 

165 r""" 

166 Plot the solution. Please supply a figure with the same structure as returned by ``self.get_fig``. 

167 

168 Parameters 

169 ---------- 

170 u : dtype_u 

171 Solution to be plotted 

172 t : float 

173 Time to display at the top of the figure 

174 fig : matplotlib.pyplot.figure.Figure 

175 Figure with the correct structure 

176 

177 Returns 

178 ------- 

179 None 

180 """ 

181 raise NotImplementedError 

182 

183 def solve_system(self, rhs: Any, dt: float, u0: Any, t: float) -> Any: 

184 """ 

185 Perform an Euler step. 

186 

187 Args: 

188 rhs: Right hand side for the Euler step 

189 dt (float): Step size for the Euler step 

190 u0: Initial guess 

191 t (float): Current time 

192 

193 Returns: 

194 solution to the Euler step 

195 """ 

196 raise NotImplementedError 

197 

198 def solve_jacobian( 

199 self, rhs: Any, dt: float, u: Optional[Any] = None, u0: Optional[Any] = None, t: float = 0, **kwargs: Any 

200 ) -> Any: 

201 """ 

202 Solve the Jacobian for an Euler step, linearized around u. 

203 This defaults to an Euler step to accommodate linear problems. 

204 

205 Args: 

206 rhs: Right hand side for the Euler step 

207 dt (float): Step size for the Euler step 

208 u: Solution to linearize around 

209 u0: Initial guess 

210 t (float): Current time 

211 

212 Returns: 

213 Solution 

214 """ 

215 return self.solve_system(rhs, dt, u0=u, t=t, **kwargs)