Coverage for pySDC / core / problem.py: 96%
45 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 09:00 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 09:00 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Description
5-----------
7Module containing the base Problem class for pySDC
8"""
10import logging
11from typing import Any, Dict, Optional, Type, Callable
13from pySDC.core.common import RegisterParams
16class WorkCounter(object):
17 """
18 Utility class for counting iterations.
20 Contains one attribute `niter` initialized to zero during
21 instantiation, which can be incremented by calling object as
22 a function, e.g
24 >>> count = WorkCounter() # => niter = 0
25 >>> count() # => niter = 1
26 >>> count() # => niter = 2
27 """
29 def __init__(self) -> None:
30 self.niter: int = 0
32 def __call__(self, *args: Any, **kwargs: Any) -> None:
33 # *args and **kwargs are necessary for gmres
34 self.niter += 1
36 def decrement(self) -> None:
37 self.niter -= 1
39 def __str__(self) -> str:
40 return f'{self.niter}'
43class Problem(RegisterParams):
44 """
45 Prototype class for problems, just defines the attributes essential to get started.
47 Parameters
48 ----------
49 init : list of args
50 Argument(s) used to initialize data types.
51 dtype_u : type
52 Variable data type. Should generate a data variable using dtype_u(init).
53 dtype_f : type
54 RHS data type. Should generate a data variable using dtype_f(init).
56 Attributes
57 ----------
58 logger: logging.Logger
59 custom logger for problem-related logging.
60 """
62 logger: logging.Logger = logging.getLogger('problem')
63 dtype_u: Optional[Type[Any]] = None
64 dtype_f: Optional[Type[Any]] = None
66 def __init__(self, init: Any) -> None:
67 self.work_counters: Dict[str, WorkCounter] = {} # Dictionary to store WorkCounter objects
68 self.init: Any = init # Initialization parameter to instantiate data types
70 @property
71 def u_init(self) -> Any:
72 """Generate a data variable for u"""
73 return self.dtype_u(self.init)
75 @property
76 def f_init(self) -> Any:
77 """Generate a data variable for RHS"""
78 return self.dtype_f(self.init)
80 @classmethod
81 def get_default_sweeper_class(cls) -> Type[Any]:
82 raise NotImplementedError(f'No default sweeper class implemented for {cls} problem!')
84 def setUpFieldsIO(self) -> None:
85 """
86 Set up FieldsIO for MPI with the space decomposition of this problem
87 """
88 pass
90 def getOutputFile(self, fileName: str) -> Any:
91 raise NotImplementedError(f'No output implemented file for {type(self).__name__}')
93 def processSolutionForOutput(self, u: Any) -> Any:
94 return u
96 def eval_f(self, u: Any, t: float) -> Any:
97 """
98 Abstract interface to RHS computation of the ODE
100 Parameters
101 ----------
102 u : dtype_u
103 Current values.
104 t : float
105 Current time.
107 Returns
108 -------
109 f : dtype_f
110 The RHS values.
111 """
112 raise NotImplementedError('ERROR: problem has to implement eval_f(self, u, t)')
114 def apply_mass_matrix(self, u: Any) -> Any: # pragma: no cover
115 """Default mass matrix : identity"""
116 return u
118 def generate_scipy_reference_solution(
119 self, eval_rhs: Callable, t: float, u_init: Optional[Any] = None, t_init: Optional[float] = None, **kwargs: Any
120 ) -> Any:
121 """
122 Compute a reference solution using `scipy.solve_ivp` with very small tolerances.
123 Keep in mind that scipy needs the solution to be a one dimensional array. If you are solving something higher
124 dimensional, you need to make sure the function `eval_rhs` takes a flattened one-dimensional version as an input
125 and output, but reshapes to whatever the problem needs for evaluation.
127 The keyword arguments will be passed to `scipy.solve_ivp`. You should consider passing `method='BDF'` for stiff
128 problems and to accelerate that you can pass a function that evaluates the Jacobian with arguments `jac(t, u)`
129 as `jac=jac`.
131 Args:
132 eval_rhs (function): Function evaluate the full right hand side. Must have signature `eval_rhs(float: t, numpy.1darray: u)`
133 t (float): current time
134 u_init (pySDC.implementations.problem_classes.Lorenz.dtype_u): initial conditions for getting the exact solution
135 t_init (float): the starting time
137 Returns:
138 numpy.ndarray: Reference solution
139 """
140 import numpy as np
141 from scipy.integrate import solve_ivp
143 kwargs = {
144 'atol': 100 * np.finfo(float).eps,
145 'rtol': 100 * np.finfo(float).eps,
146 **kwargs,
147 }
148 u_init = self.u_exact(t=0) if u_init is None else u_init * 1.0
149 t_init = 0 if t_init is None else t_init
151 u_shape = u_init.shape
152 return solve_ivp(eval_rhs, (t_init, t), u_init.flatten(), **kwargs).y[:, -1].reshape(u_shape)
154 def get_fig(self) -> Any:
155 """
156 Get a figure suitable to plot the solution of this problem
158 Returns
159 -------
160 self.fig : matplotlib.pyplot.figure.Figure
161 """
162 raise NotImplementedError
164 def plot(self, u: Any, t: Optional[float] = None, fig: Optional[Any] = None) -> None:
165 r"""
166 Plot the solution. Please supply a figure with the same structure as returned by ``self.get_fig``.
168 Parameters
169 ----------
170 u : dtype_u
171 Solution to be plotted
172 t : float
173 Time to display at the top of the figure
174 fig : matplotlib.pyplot.figure.Figure
175 Figure with the correct structure
177 Returns
178 -------
179 None
180 """
181 raise NotImplementedError
183 def solve_system(self, rhs: Any, dt: float, u0: Any, t: float) -> Any:
184 """
185 Perform an Euler step.
187 Args:
188 rhs: Right hand side for the Euler step
189 dt (float): Step size for the Euler step
190 u0: Initial guess
191 t (float): Current time
193 Returns:
194 solution to the Euler step
195 """
196 raise NotImplementedError
198 def solve_jacobian(
199 self, rhs: Any, dt: float, u: Optional[Any] = None, u0: Optional[Any] = None, t: float = 0, **kwargs: Any
200 ) -> Any:
201 """
202 Solve the Jacobian for an Euler step, linearized around u.
203 This defaults to an Euler step to accommodate linear problems.
205 Args:
206 rhs: Right hand side for the Euler step
207 dt (float): Step size for the Euler step
208 u: Solution to linearize around
209 u0: Initial guess
210 t (float): Current time
212 Returns:
213 Solution
214 """
215 return self.solve_system(rhs, dt, u0=u, t=t, **kwargs)