1from pySDC.core.problem import Problem, WorkCounter
2from pySDC.implementations.datatype_classes.firedrake_mesh import firedrake_mesh, IMEX_firedrake_mesh
3import firedrake as fd
4import numpy as np
5from mpi4py import MPI
8class Heat1DForcedFiredrake(Problem):
9 r"""
10 Example implementing the forced one-dimensional heat equation with Dirichlet boundary conditions
12 .. math::
13 \frac{d u}{d t} = \nu \frac{d^2 u}{d x^2} + f
15 for :math:`x \in \Omega:=[0,1]`, where the forcing term :math:`f` is defined by
17 .. math::
18 f(x, t) = -\sin(\pi x) (\sin(t) - \nu \pi^2 \cos(t)).
20 For initial conditions with constant c
22 .. math::
23 u(x, 0) = \sin(\pi x) + c,
25 the exact solution is given by
27 .. math::
28 u(x, t) = \sin(\pi x)\cos(t) + c.
30 Here, the problem is discretized with finite elements using firedrake. Hence, the problem
31 is reformulated to the *weak formulation*
33 .. math:
34 \int_\Omega u_t v\,dx = - \nu \int_\Omega \nabla u \nabla v\,dx + \int_\Omega f v\,dx.
36 We invert the Laplacian implicitly and treat the forcing term explicitly.
37 The solvers for the arising variational problems are cached for multiple collocation nodes and step sizes.
39 Parameters
40 ----------
41 nvars : int, optional
42 Spatial resolution, i.e., numbers of degrees of freedom in space.
43 nu : float, optional
44 Diffusion coefficient :math:`\nu`.
45 c: float, optional
46 Constant for the Dirichlet boundary condition :math: `c`
47 LHS_cache_size : int, optional
48 Cache size for variational problem solvers
49 comm : MPI communicator, optional
50 Supply an MPI communicator for spatial parallelism
51 """
53 dtype_u = firedrake_mesh
54 dtype_f = IMEX_firedrake_mesh
56 def __init__(self, n=30, nu=0.1, c=0.0, order=4, LHS_cache_size=12, mesh=None, comm=None):
57 """
58 Initialization
60 Args:
61 n (int): Number of degrees of freedom
62 nu (float): Diffusion parameter
63 c (float): Boundary condition constant
64 order (int): Order of finite elements
65 LHS_cache_size (int): Size of the cache for solvers
66 mesh (Firedrake mesh, optional): Give a custom mesh, for instance from a hierarchy
67 comm (mpi4pi.Intracomm, optional): MPI communicator for spatial parallelism
68 """
69 comm = MPI.COMM_WORLD if comm is None else comm
71 # prepare Firedrake mesh and function space
72 self.mesh = fd.UnitIntervalMesh(n, comm=comm) if mesh is None else mesh
73 self.V = fd.FunctionSpace(self.mesh, "CG", order)
75 # prepare pySDC problem class infrastructure by passing the function space to super init
76 super().__init__(self.V)
77 self._makeAttributeAndRegister(
78 'n', 'nu', 'c', 'order', 'LHS_cache_size', 'comm', localVars=locals(), readOnly=True
79 )
81 # prepare caches and IO variables for solvers
82 self.solvers = {}
83 self.tmp_in = fd.Function(self.V)
84 self.tmp_out = fd.Function(self.V)
86 # prepare work counters
87 self.work_counters['solver_setup'] = WorkCounter()
88 self.work_counters['solves'] = WorkCounter()
89 self.work_counters['rhs'] = WorkCounter()
91 def eval_f(self, u, t):
92 """
93 Evaluate the right hand side.
94 The forcing term is simply interpolated to the grid.
95 The Laplacian is evaluated via a variational problem, where the mass matrix is inverted and homogeneous boundary conditions are applied.
96 Note that we cache the solver to obtain much better performance.
98 Parameters
99 ----------
100 u : dtype_u
101 Solution at which to evaluate
102 t : float
103 Time at which to evaluate
105 Returns
106 -------
107 f : dtype_f
108 The evaluated right hand side
109 """
110 # construct and cache a solver for the implicit part of the right hand side evaluation
111 if not hasattr(self, '__solv_eval_f_implicit'):
112 v = fd.TestFunction(self.V)
113 u_trial = fd.TrialFunction(self.V)
115 a = u_trial * v * fd.dx
116 L_impl = -fd.inner( * fd.nabla_grad(self.tmp_in), fd.nabla_grad(v)) * fd.dx
118 bcs = [fd.bcs.DirichletBC(self.V, fd.Constant(0), area) for area in [1, 2]]
120 prob = fd.LinearVariationalProblem(a, L_impl, self.tmp_out, bcs=bcs)
121 self.__solv_eval_f_implicit = fd.LinearVariationalSolver(prob)
123 # copy the solution we want to evaluate at into the input buffer
124 self.tmp_in.assign(u.functionspace)
126 # perform the solve using the cached solver
127 self.__solv_eval_f_implicit.solve()
129 me = self.dtype_f(self.init)
131 # copy the result of the solver from the output buffer to the variable this function returns
132 me.impl.assign(self.tmp_out)
134 # evaluate explicit part.
135 # Because it does not depend on the current solution, we can simply interpolate the expression
136 x = fd.SpatialCoordinate(self.mesh)
137 me.expl.interpolate(-(np.sin(t) - * np.pi**2 * np.cos(t)) * fd.sin(np.pi * x[0]))
139 self.work_counters['rhs']()
141 return me
143 def solve_system(self, rhs, factor, *args, **kwargs):
144 r"""
145 Linear solver for :math:`(M - factor nu * Lap) u = rhs`.
147 Parameters
148 ----------
149 rhs : dtype_f
150 Right-hand side for the nonlinear system.
151 factor : float
152 Abbrev. for the node-to-node stepsize (or any other factor required).
153 u0 : dtype_u
154 Initial guess for the iterative solver (not used here so far).
155 t : float
156 Current time.
158 Returns
159 -------
160 u : dtype_u
161 Solution.
162 """
164 # construct and cache a solver for the current factor (preconditioner entry times step size)
165 if factor not in self.solvers.keys():
167 # check if we need to evict something from the cache
168 if len(self.solvers) >= self.LHS_cache_size:
169 self.solvers.pop(list(self.solvers.keys())[0])
171 u = fd.TrialFunction(self.V)
172 v = fd.TestFunction(self.V)
174 a = u * v * fd.dx + fd.Constant(factor) * fd.inner( * fd.nabla_grad(u), fd.nabla_grad(v)) * fd.dx
175 L = fd.inner(self.tmp_in, v) * fd.dx
177 bcs = [fd.bcs.DirichletBC(self.V, fd.Constant(self.c), area) for area in [1, 2]]
179 prob = fd.LinearVariationalProblem(a, L, self.tmp_out, bcs=bcs)
180 self.solvers[factor] = fd.LinearVariationalSolver(prob)
182 self.work_counters['solver_setup']()
184 # copy solver rhs to the input buffer. Copying also to the output buffer uses it as initial guess
185 self.tmp_in.assign(rhs.functionspace)
186 self.tmp_out.assign(rhs.functionspace)
188 # call the cached solver
189 self.solvers[factor].solve()
191 # copy from output buffer to return variable
192 me = self.dtype_u(self.init)
193 me.assign(self.tmp_out)
195 self.work_counters['solves']()
196 return me
198 @fd.utils.cached_property
199 def x(self):
200 return fd.SpatialCoordinate(self.mesh)
202 def u_exact(self, t):
203 r"""
204 Routine to compute the exact solution at time :math:`t`.
206 Parameters
207 ----------
208 t : float
209 Time of the exact solution.
211 Returns
212 -------
213 me : dtype_u
214 Exact solution.
215 """
216 me = self.u_init
217 me.interpolate(np.cos(t) * fd.sin(np.pi * self.x[0]) + self.c)
218 return me