Source code for implementations.problem_classes.AllenCahn_MPIFFT

import numpy as np
from mpi4py import MPI

from pySDC.implementations.problem_classes.generic_MPIFFT_Laplacian import IMEX_Laplacian_MPIFFT
from mpi4py_fft import newDistArray


[docs] class allencahn_imex(IMEX_Laplacian_MPIFFT): r""" Example implementing the :math:`2`-dimensional Allen-Cahn equation with periodic boundary conditions :math:`u \in [0, 1]^2` .. math:: \frac{\partial u}{\partial t} = \Delta u - \frac{2}{\varepsilon^2} u (1 - u) (1 - 2u) - 6 d_w u (1 - u) on a spatial domain :math:`[-\frac{L}{2}, \frac{L}{2}]^2`, driving force :math:`d_w`, and :math:`N=2,3`. Different initial conditions can be used, for example, circles of the form .. math:: u({\bf x}, 0) = \tanh\left(\frac{r - \sqrt{(x_i-0.5)^2 + (y_j-0.5)^2}}{\sqrt{2}\varepsilon}\right), for :math:`i, j=0,..,N-1`, where :math:`N` is the number of spatial grid points. For time-stepping, the problem is treated *semi-implicitly*, i.e., the linear part is solved with Fast-Fourier Transform (FFT) and the nonlinear part in the right-hand side will be treated explicitly using ``mpi4py-fft`` [1]_ to solve them. Parameters ---------- nvars : List of int tuples, optional Number of unknowns in the problem, e.g. ``nvars=(128, 128)``. eps : float, optional Scaling parameter :math:`\varepsilon`. radius : float, optional Radius of the circles. spectral : bool, optional Indicates if spectral initial condition is used. dw : float, optional Driving force. L : float, optional Denotes the period of the function to be approximated for the Fourier transform. init_type : str, optional Initialises type of initial state. comm : bool, optional Communicator for parallelization. Attributes ---------- fft : fft object Object for FFT. X : np.ogrid Grid coordinates in real space. K2 : np.1darray Laplace operator in spectral space. dx : float Mesh width in x direction. dy : float Mesh width in y direction. References ---------- .. [1] https://mpi4py-fft.readthedocs.io/en/latest/ """ def __init__( self, eps=0.04, radius=0.25, dw=0.0, init_type='circle', **kwargs, ): kwargs['L'] = kwargs.get('L', 1.0) super().__init__(alpha=1.0, dtype=np.dtype('float'), **kwargs) self._makeAttributeAndRegister('eps', 'radius', 'dw', 'init_type', localVars=locals(), readOnly=True) def _eval_explicit_part(self, u, t, f_expl): f_expl[:] = -2.0 / self.eps**2 * u * (1.0 - u) * (1.0 - 2.0 * u) - 6.0 * self.dw * u * (1.0 - u) return f_expl
[docs] def eval_f(self, u, t): """ Routine to evaluate the right-hand side of the problem. Parameters ---------- u : dtype_u Current values of the numerical solution. t : float Current time of the numerical solution is computed. Returns ------- f : dtype_f The right-hand side of the problem. """ f = self.dtype_f(self.init) f.impl[:] = self._eval_Laplacian(u, f.impl) if self.spectral: f.impl = -self.K2 * u if self.eps > 0: tmp = self.fft.backward(u) tmp[:] = self._eval_explicit_part(tmp, t, tmp) f.expl[:] = self.fft.forward(tmp) else: if self.eps > 0: f.expl[:] = self._eval_explicit_part(u, t, f.expl) self.work_counters['rhs']() return f
[docs] def u_exact(self, t, **kwargs): r""" Routine to compute the exact solution at time :math:`t`. Parameters ---------- t : float Time of the exact solution. Returns ------- me : dtype_u The exact solution. """ assert t == 0, 'ERROR: u_exact only valid for t=0' me = self.dtype_u(self.init, val=0.0) if self.init_type == 'circle': r2 = (self.X[0] - 0.5) ** 2 + (self.X[1] - 0.5) ** 2 if self.spectral: tmp = 0.5 * (1.0 + self.xp.tanh((self.radius - self.xp.sqrt(r2)) / (np.sqrt(2) * self.eps))) me[:] = self.fft.forward(tmp) else: me[:] = 0.5 * (1.0 + self.xp.tanh((self.radius - self.xp.sqrt(r2)) / (np.sqrt(2) * self.eps))) elif self.init_type == 'circle_rand': ndim = len(me.shape) L = int(self.L[0]) # get random radii for circles/spheres self.xp.random.seed(1) lbound = 3.0 * self.eps ubound = 0.5 - self.eps rand_radii = (ubound - lbound) * self.xp.random.random_sample(size=tuple([L] * ndim)) + lbound # distribute circles/spheres tmp = newDistArray(self.fft, False) if ndim == 2: for i in range(0, L): for j in range(0, L): # build radius r2 = (self.X[0] + i - L + 0.5) ** 2 + (self.X[1] + j - L + 0.5) ** 2 # add this blob, shifted by 1 to avoid issues with adding up negative contributions tmp += self.xp.tanh((rand_radii[i, j] - self.xp.sqrt(r2)) / (np.sqrt(2) * self.eps)) + 1 else: raise NotImplementedError # normalize to [0,1] tmp *= 0.5 assert self.xp.all(tmp <= 1.0) if self.spectral: me[:] = self.fft.forward(tmp) else: self.xp.copyto(me, tmp) else: raise NotImplementedError('type of initial value not implemented, got %s' % self.init_type) return me
[docs] class allencahn_imex_timeforcing(allencahn_imex): r""" Example implementing the :math:`N`-dimensional Allen-Cahn equation with periodic boundary conditions :math:`u \in [0, 1]^2` using time-dependent forcing .. math:: \frac{\partial u}{\partial t} = \Delta u - \frac{2}{\varepsilon^2} u (1 - u) (1 - 2u) - 6 d_w u (1 - u) on a spatial domain :math:`[-\frac{L}{2}, \frac{L}{2}]^2`, driving force :math:`d_w`, and :math:`N=2,3`. Different initial conditions can be used, for example, circles of the form .. math:: u({\bf x}, 0) = \tanh\left(\frac{r - \sqrt{(x_i-0.5)^2 + (y_j-0.5)^2}}{\sqrt{2}\varepsilon}\right), for :math:`i, j=0,..,N-1`, where :math:`N` is the number of spatial grid points. For time-stepping, the problem is treated *semi-implicitly*, i.e., the linear part is solved with Fast-Fourier Transform (FFT) using ``mpi4py-fft`` [1]_ and the nonlinear part in the right-hand side will be treated explicitly. """
[docs] def eval_f(self, u, t): """ Routine to evaluate the right-hand side of the problem. Parameters ---------- u : dtype_u Current values of the numerical solution. t : float Current time of the numerical solution is computed. Returns ------- f : dtype_f The right-hand side of the problem. """ f = self.dtype_f(self.init) f.impl[:] = self._eval_Laplacian(u, f.impl) if self.spectral: tmp = newDistArray(self.fft, False) tmp[:] = self.fft.backward(u, tmp) if self.eps > 0: tmpf = -2.0 / self.eps**2 * tmp * (1.0 - tmp) * (1.0 - 2.0 * tmp) else: tmpf = self.dtype_f(self.init, val=0.0) # build sum over RHS without driving force Rt_local = float(self.xp.sum(self.fft.backward(f.impl) + tmpf)) if self.comm is not None: Rt_global = self.comm.allreduce(sendobj=Rt_local, op=MPI.SUM) else: Rt_global = Rt_local # build sum over driving force term Ht_local = float(self.xp.sum(6.0 * tmp * (1.0 - tmp))) if self.comm is not None: Ht_global = self.comm.allreduce(sendobj=Ht_local, op=MPI.SUM) else: Ht_global = Ht_local # add/substract time-dependent driving force if Ht_global != 0.0: dw = Rt_global / Ht_global else: dw = 0.0 tmpf -= 6.0 * dw * tmp * (1.0 - tmp) f.expl[:] = self.fft.forward(tmpf) else: if self.eps > 0: f.expl = -2.0 / self.eps**2 * u * (1.0 - u) * (1.0 - 2.0 * u) # build sum over RHS without driving force Rt_local = float(self.xp.sum(f.impl + f.expl)) if self.comm is not None: Rt_global = self.comm.allreduce(sendobj=Rt_local, op=MPI.SUM) else: Rt_global = Rt_local # build sum over driving force term Ht_local = float(self.xp.sum(6.0 * u * (1.0 - u))) if self.comm is not None: Ht_global = self.comm.allreduce(sendobj=Ht_local, op=MPI.SUM) else: Ht_global = Ht_local # add/substract time-dependent driving force if Ht_global != 0.0: dw = Rt_global / Ht_global else: dw = 0.0 f.expl -= 6.0 * dw * u * (1.0 - u) self.work_counters['rhs']() return f