#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Description
-----------
Module containing the base Problem class for pySDC
"""
import logging
from typing import Any, Dict, Optional, Type, Callable
from pySDC.core.common import RegisterParams
[docs]
class WorkCounter(object):
"""
Utility class for counting iterations.
Contains one attribute `niter` initialized to zero during
instantiation, which can be incremented by calling object as
a function, e.g
>>> count = WorkCounter() # => niter = 0
>>> count() # => niter = 1
>>> count() # => niter = 2
"""
def __init__(self) -> None:
self.niter: int = 0
def __call__(self, *args: Any, **kwargs: Any) -> None:
# *args and **kwargs are necessary for gmres
self.niter += 1
[docs]
def decrement(self) -> None:
self.niter -= 1
def __str__(self) -> str:
return f'{self.niter}'
[docs]
class Problem(RegisterParams):
"""
Prototype class for problems, just defines the attributes essential to get started.
Parameters
----------
init : list of args
Argument(s) used to initialize data types.
dtype_u : type
Variable data type. Should generate a data variable using dtype_u(init).
dtype_f : type
RHS data type. Should generate a data variable using dtype_f(init).
Attributes
----------
logger: logging.Logger
custom logger for problem-related logging.
"""
logger: logging.Logger = logging.getLogger('problem')
dtype_u: Optional[Type[Any]] = None
dtype_f: Optional[Type[Any]] = None
def __init__(self, init: Any) -> None:
self.work_counters: Dict[str, WorkCounter] = {} # Dictionary to store WorkCounter objects
self.init: Any = init # Initialization parameter to instantiate data types
@property
def u_init(self) -> Any:
"""Generate a data variable for u"""
return self.dtype_u(self.init)
@property
def f_init(self) -> Any:
"""Generate a data variable for RHS"""
return self.dtype_f(self.init)
[docs]
@classmethod
def get_default_sweeper_class(cls) -> Type[Any]:
raise NotImplementedError(f'No default sweeper class implemented for {cls} problem!')
[docs]
def setUpFieldsIO(self) -> None:
"""
Set up FieldsIO for MPI with the space decomposition of this problem
"""
pass
[docs]
def getOutputFile(self, fileName: str) -> Any:
raise NotImplementedError(f'No output implemented file for {type(self).__name__}')
[docs]
def processSolutionForOutput(self, u: Any) -> Any:
return u
[docs]
def eval_f(self, u: Any, t: float) -> Any:
"""
Abstract interface to RHS computation of the ODE
Parameters
----------
u : dtype_u
Current values.
t : float
Current time.
Returns
-------
f : dtype_f
The RHS values.
"""
raise NotImplementedError('ERROR: problem has to implement eval_f(self, u, t)')
[docs]
def apply_mass_matrix(self, u: Any) -> Any: # pragma: no cover
"""Default mass matrix : identity"""
return u
[docs]
def generate_scipy_reference_solution(
self, eval_rhs: Callable, t: float, u_init: Optional[Any] = None, t_init: Optional[float] = None, **kwargs: Any
) -> Any:
"""
Compute a reference solution using `scipy.solve_ivp` with very small tolerances.
Keep in mind that scipy needs the solution to be a one dimensional array. If you are solving something higher
dimensional, you need to make sure the function `eval_rhs` takes a flattened one-dimensional version as an input
and output, but reshapes to whatever the problem needs for evaluation.
The keyword arguments will be passed to `scipy.solve_ivp`. You should consider passing `method='BDF'` for stiff
problems and to accelerate that you can pass a function that evaluates the Jacobian with arguments `jac(t, u)`
as `jac=jac`.
Args:
eval_rhs (function): Function evaluate the full right hand side. Must have signature `eval_rhs(float: t, numpy.1darray: u)`
t (float): current time
u_init (pySDC.implementations.problem_classes.Lorenz.dtype_u): initial conditions for getting the exact solution
t_init (float): the starting time
Returns:
numpy.ndarray: Reference solution
"""
import numpy as np
from scipy.integrate import solve_ivp
kwargs = {
'atol': 100 * np.finfo(float).eps,
'rtol': 100 * np.finfo(float).eps,
**kwargs,
}
u_init = self.u_exact(t=0) if u_init is None else u_init * 1.0
t_init = 0 if t_init is None else t_init
u_shape = u_init.shape
return solve_ivp(eval_rhs, (t_init, t), u_init.flatten(), **kwargs).y[:, -1].reshape(u_shape)
[docs]
def get_fig(self) -> Any:
"""
Get a figure suitable to plot the solution of this problem
Returns
-------
self.fig : matplotlib.pyplot.figure.Figure
"""
raise NotImplementedError
[docs]
def plot(self, u: Any, t: Optional[float] = None, fig: Optional[Any] = None) -> None:
r"""
Plot the solution. Please supply a figure with the same structure as returned by ``self.get_fig``.
Parameters
----------
u : dtype_u
Solution to be plotted
t : float
Time to display at the top of the figure
fig : matplotlib.pyplot.figure.Figure
Figure with the correct structure
Returns
-------
None
"""
raise NotImplementedError
[docs]
def solve_system(self, rhs: Any, dt: float, u0: Any, t: float) -> Any:
"""
Perform an Euler step.
Args:
rhs: Right hand side for the Euler step
dt (float): Step size for the Euler step
u0: Initial guess
t (float): Current time
Returns:
solution to the Euler step
"""
raise NotImplementedError
[docs]
def solve_jacobian(
self, rhs: Any, dt: float, u: Optional[Any] = None, u0: Optional[Any] = None, t: float = 0, **kwargs: Any
) -> Any:
"""
Solve the Jacobian for an Euler step, linearized around u.
This defaults to an Euler step to accommodate linear problems.
Args:
rhs: Right hand side for the Euler step
dt (float): Step size for the Euler step
u: Solution to linearize around
u0: Initial guess
t (float): Current time
Returns:
Solution
"""
return self.solve_system(rhs, dt, u0=u, t=t, **kwargs)