Source code for implementations.problem_classes.AllenCahn_Temp_MPIFFT

import numpy as np
from mpi4py_fft import PFFT

from pySDC.core.errors import ProblemError
from pySDC.core.problem import Problem
from pySDC.implementations.datatype_classes.mesh import mesh, imex_mesh

from mpi4py_fft import newDistArray


[docs] class allencahn_temp_imex(Problem): r""" This class implements the :math:`N`-dimensional Allen-Cahn equation with periodic boundary conditions :math:`u \in [0, 1]^2` .. math:: \frac{\partial u}{\partial t} = D \Delta u - \frac{2}{\varepsilon^2} u (1 - u) (1 - 2u) - 6 d_w \frac{u - T_M}{T_M}u (1 - u) on a spatial domain :math:`[-\frac{L}{2}, \frac{L}{2}]^2`, with driving force :math:`d_w`, and :math:`N=2,3`. :math:`D` and :math:`T_M` are fixed parameters. Different initial conditions can be used, for example, circles of the form .. math:: u({\bf x}, 0) = \tanh\left(\frac{r - \sqrt{(x_i-0.5)^2 + (y_j-0.5)^2}}{\sqrt{2}\varepsilon}\right), for :math:`i, j=0,..,N-1`, where :math:`N` is the number of spatial grid points. For time-stepping, the problem is treated *semi-implicitly*, i.e., the nonlinear system is solved by Fast-Fourier Tranform (FFT) and the linear parts in the right-hand side will be treated explicitly using ``mpi4py-fft`` [1]_ to solve them. Attributes ---------- nvars : List of int tuples, optional Number of unknowns in the problem, e.g. ``nvars=[(128, 128), (64, 64)]``. eps : float, optional Scaling parameter :math:`\varepsilon`. radius : float, optional Radius of the circles. spectral : bool, optional Indicates if spectral initial condition is used. TM : float, optional Problem parameter :math:`T_M`. D : float, optional Problem parameter :math:`D`. dw : float, optional Driving force. L : float, optional Denotes the period of the function to be approximated for the Fourier transform. init_type : str, optional Initialises type of initial state. comm : bool, optional Communicator. Attributes ---------- fft : fft object Object for FFT. X : np.ogrid Grid coordinates in real space. K2 : np.ndarray Laplace operator in spectral space. dx : float Mesh width in x direction. dy : float Mesh width in y direction. References ---------- .. [1] https://mpi4py-fft.readthedocs.io/en/latest/ """ dtype_u = mesh dtype_f = imex_mesh def __init__( self, nvars=None, eps=0.04, radius=0.25, spectral=None, TM=1.0, D=10.0, dw=0.0, L=1.0, init_type='circle', comm=None, ): """Initialization routine""" if nvars is None: nvars = [(128, 128)] if not (isinstance(nvars, tuple) and len(nvars) > 1): raise ProblemError('Need at least two dimensions') # creating FFT structure ndim = len(nvars) axes = tuple(range(ndim)) self.fft = PFFT(comm, list(nvars), axes=axes, dtype=np.float64, collapse=True) # get test data to figure out type and dimensions tmp_u = newDistArray(self.fft, spectral) # add two components to contain field and temperature self.ncomp = 2 sizes = tmp_u.shape + (self.ncomp,) # invoke super init, passing the communicator and the local dimensions as init super().__init__(init=(sizes, comm, tmp_u.dtype)) self._makeAttributeAndRegister( 'nvars', 'eps', 'radius', 'spectral', 'TM', 'D', 'dw', 'L', 'init_type', 'comm', localVars=locals(), readOnly=True, ) L = np.array([self.L] * ndim, dtype=float) # get local mesh X = list(np.ogrid[self.fft.local_slice(False)]) N = self.fft.global_shape() for i in range(len(N)): X[i] = X[i] * L[i] / N[i] self.X = [np.broadcast_to(x, self.fft.shape(False)) for x in X] # get local wavenumbers and Laplace operator s = self.fft.local_slice() N = self.fft.global_shape() k = [np.fft.fftfreq(n, 1.0 / n).astype(int) for n in N[:-1]] k.append(np.fft.rfftfreq(N[-1], 1.0 / N[-1]).astype(int)) K = [ki[si] for ki, si in zip(k, s)] Ks = list(np.meshgrid(*K, indexing='ij', sparse=True)) Lp = 2 * np.pi / L for i in range(ndim): Ks[i] = (Ks[i] * Lp[i]).astype(float) K = [np.broadcast_to(k, self.fft.shape(True)) for k in Ks] K = np.array(K).astype(float) self.K2 = np.sum(K * K, 0, dtype=float) # Need this for diagnostics self.dx = self.L / nvars[0] self.dy = self.L / nvars[1]
[docs] def eval_f(self, u, t): """ Routine to evaluate the right-hand side of the problem. Parameters ---------- u : dtype_u Current values of the numerical solution. t : float Current time of the numerical solution is computed. Returns ------- f : dtype_f The right-hand side of the problem. """ f = self.dtype_f(self.init) if self.spectral: f.impl[..., 0] = -self.K2 * u[..., 0] if self.eps > 0: tmp_u = newDistArray(self.fft, False) tmp_T = newDistArray(self.fft, False) tmp_u = self.fft.backward(u[..., 0], tmp_u) tmp_T = self.fft.backward(u[..., 1], tmp_T) tmpf = -2.0 / self.eps**2 * tmp_u * (1.0 - tmp_u) * (1.0 - 2.0 * tmp_u) - 6.0 * self.dw * ( tmp_T - self.TM ) / self.TM * tmp_u * (1.0 - tmp_u) f.expl[..., 0] = self.fft.forward(tmpf) f.impl[..., 1] = -self.D * self.K2 * u[..., 1] f.expl[..., 1] = f.impl[..., 0] + f.expl[..., 0] else: u_hat = self.fft.forward(u[..., 0]) lap_u_hat = -self.K2 * u_hat f.impl[..., 0] = self.fft.backward(lap_u_hat, f.impl[..., 0]) if self.eps > 0: f.expl[..., 0] = -2.0 / self.eps**2 * u[..., 0] * (1.0 - u[..., 0]) * (1.0 - 2.0 * u[..., 0]) f.expl[..., 0] -= 6.0 * self.dw * (u[..., 1] - self.TM) / self.TM * u[..., 0] * (1.0 - u[..., 0]) u_hat = self.fft.forward(u[..., 1]) lap_u_hat = -self.D * self.K2 * u_hat f.impl[..., 1] = self.fft.backward(lap_u_hat, f.impl[..., 1]) f.expl[..., 1] = f.impl[..., 0] + f.expl[..., 0] return f
[docs] def solve_system(self, rhs, factor, u0, t): """ Simple FFT solver for the diffusion part. Parameters ---------- rhs : dtype_f Right-hand side for the linear system. factor : float Abbrev. for the node-to-node stepsize (or any other factor required). u0 : dtype_u Initial guess for the iterative solver (not used here so far). t : float Current time (e.g. for time-dependent BCs). Returns ------- me : dtype_u The solution as mesh. """ if self.spectral: me = self.dtype_u(self.init) me[..., 0] = rhs[..., 0] / (1.0 + factor * self.K2) me[..., 1] = rhs[..., 1] / (1.0 + factor * self.D * self.K2) else: me = self.dtype_u(self.init) rhs_hat = self.fft.forward(rhs[..., 0]) rhs_hat /= 1.0 + factor * self.K2 me[..., 0] = self.fft.backward(rhs_hat) rhs_hat = self.fft.forward(rhs[..., 1]) rhs_hat /= 1.0 + factor * self.D * self.K2 me[..., 1] = self.fft.backward(rhs_hat) return me
[docs] def u_exact(self, t): r""" Routine to compute the exact solution at time :math:`t`. Parameters ---------- t : float Time of the exact solution. Returns ------- me : dtype_u The exact solution. """ def circle(): tmp_me = newDistArray(self.fft, self.spectral) r2 = (self.X[0] - 0.5) ** 2 + (self.X[1] - 0.5) ** 2 if self.spectral: tmp = 0.5 * (1.0 + np.tanh((self.radius - np.sqrt(r2)) / (np.sqrt(2) * self.eps))) tmp_me[:] = self.fft.forward(tmp) else: tmp_me[:] = 0.5 * (1.0 + np.tanh((self.radius - np.sqrt(r2)) / (np.sqrt(2) * self.eps))) return tmp_me def circle_rand(): tmp_me = newDistArray(self.fft, self.spectral) ndim = len(tmp_me.shape) L = int(self.L) # get random radii for circles/spheres np.random.seed(1) lbound = 3.0 * self.eps ubound = 0.5 - self.eps rand_radii = (ubound - lbound) * np.random.random_sample(size=tuple([L] * ndim)) + lbound # distribute circles/spheres tmp = newDistArray(self.fft, False) if ndim == 2: for i in range(0, L): for j in range(0, L): # build radius r2 = (self.X[0] + i - L + 0.5) ** 2 + (self.X[1] + j - L + 0.5) ** 2 # add this blob, shifted by 1 to avoid issues with adding up negative contributions tmp += np.tanh((rand_radii[i, j] - np.sqrt(r2)) / (np.sqrt(2) * self.eps)) + 1 # normalize to [0,1] tmp *= 0.5 assert np.all(tmp <= 1.0) if self.spectral: tmp_me[:] = self.fft.forward(tmp) else: tmp_me[:] = tmp[:] return tmp_me def sine(): tmp_me = newDistArray(self.fft, self.spectral) if self.spectral: tmp = 0.5 * (2.0 + 0.0 * np.sin(2 * np.pi * self.X[0]) * np.sin(2 * np.pi * self.X[1])) tmp_me[:] = self.fft.forward(tmp) else: tmp_me[:] = 0.5 * (2.0 + 0.0 * np.sin(2 * np.pi * self.X[0]) * np.sin(2 * np.pi * self.X[1])) return tmp_me assert t == 0, 'ERROR: u_exact only valid for t=0' me = self.dtype_u(self.init, val=0.0) if self.init_type == 'circle': me[..., 0] = circle() me[..., 1] = sine() elif self.init_type == 'circle_rand': me[..., 0] = circle_rand() me[..., 1] = sine() else: raise NotImplementedError('type of initial value not implemented, got %s' % self.init_type) return me
# class allencahn_temp_imex_timeforcing(allencahn_temp_imex): # """ # Example implementing Allen-Cahn equation in 2-3D using mpi4py-fft for solving linear parts, IMEX time-stepping, # time-dependent forcing # """ # # def eval_f(self, u, t): # """ # Routine to evaluate the RHS # # Args: # u (dtype_u): current values # t (float): current time # # Returns: # dtype_f: the RHS # """ # # f = self.dtype_f(self.init) # # if self.spectral: # # f.impl = -self.K2 * u # # tmp = newDistArray(self.fft, False) # tmp[:] = self.fft.backward(u, tmp) # # if self.eps > 0: # tmpf = -2.0 / self.eps ** 2 * tmp * (1.0 - tmp) * (1.0 - 2.0 * tmp) # else: # tmpf = self.dtype_f(self.init, val=0.0) # # # build sum over RHS without driving force # Rt_local = float(np.sum(self.fft.backward(f.impl) + tmpf)) # if self.comm is not None: # Rt_global = self.comm.allreduce(sendobj=Rt_local, op=MPI.SUM) # else: # Rt_global = Rt_local # # # build sum over driving force term # Ht_local = float(np.sum(6.0 * tmp * (1.0 - tmp))) # if self.comm is not None: # Ht_global = self.comm.allreduce(sendobj=Ht_local, op=MPI.SUM) # else: # Ht_global = Rt_local # # # add/substract time-dependent driving force # if Ht_global != 0.0: # dw = Rt_global / Ht_global # else: # dw = 0.0 # # tmpf -= 6.0 * dw * tmp * (1.0 - tmp) # f.expl[:] = self.fft.forward(tmpf) # # else: # # u_hat = self.fft.forward(u) # lap_u_hat = -self.K2 * u_hat # f.impl[:] = self.fft.backward(lap_u_hat, f.impl) # # if self.eps > 0: # f.expl = -2.0 / self.eps ** 2 * u * (1.0 - u) * (1.0 - 2.0 * u) # # # build sum over RHS without driving force # Rt_local = float(np.sum(f.impl + f.expl)) # if self.comm is not None: # Rt_global = self.comm.allreduce(sendobj=Rt_local, op=MPI.SUM) # else: # Rt_global = Rt_local # # # build sum over driving force term # Ht_local = float(np.sum(6.0 * u * (1.0 - u))) # if self.comm is not None: # Ht_global = self.comm.allreduce(sendobj=Ht_local, op=MPI.SUM) # else: # Ht_global = Rt_local # # # add/substract time-dependent driving force # if Ht_global != 0.0: # dw = Rt_global / Ht_global # else: # dw = 0.0 # # f.expl -= 6.0 * dw * u * (1.0 - u) # # return f