from mpi4py import MPI
from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
from pySDC.core.sweeper import Sweeper, ParameterError
import logging
class SweeperMPI(Sweeper):
MPI based sweeper where each rank administers one collocation node. Adapt sweepers to MPI by use of multiple inheritance.
See for example the `generic_implicit_MPI` sweeper, which has a class definition:
class generic_implicit_MPI(SweeperMPI, generic_implicit):
this means in inherits both from `SweeperMPI` and `generic_implicit`. The hierarchy works such that functions are first
called from `SweeperMPI` and then from `generic_implicit`. For instance, in the `__init__` function, the `SweeperMPI`
class adds a communicator and nothing else. The `generic_implicit` implicit class adds a preconditioner and so on.
It's a bit confusing because `self.params` is overwritten in the second call to the `__init__` of the core `sweeper`
class, but the `SweeperMPI` class adds parameters to the `params` dictionary, which will again be added in
def __init__(self, params):
self.logger = logging.getLogger('sweeper')
if 'comm' not in params.keys():
params['comm'] = MPI.COMM_WORLD
self.logger.debug('Using MPI.COMM_WORLD for the communicator because none was supplied in the params.')
if self.params.comm.size != self.coll.num_nodes:
raise NotImplementedError(
f'The communicator in the {type(self).__name__} sweeper needs to have one rank for each node as of now! That means we need {self.coll.num_nodes} nodes, but got {self.params.comm.size} processes.'
def comm(self):
return self.params.comm
def rank(self):
return self.comm.rank
def compute_end_point(self):
Compute u at the right point of the interval
The value uend computed here is a full evaluation of the Picard formulation unless do_full_update==False
L = self.level
P = L.prob
# check if Mth node is equal to right point and do_coll_update is false, perform a simple copy
if self.coll.right_is_node and not self.params.do_coll_update:
# a copy is sufficient
root = self.comm.Get_size() - 1
if self.comm.rank == root:
L.uend = P.dtype_u(L.u[-1])
L.uend = P.dtype_u(L.u[0])
self.comm.Bcast(L.uend, root=root)
raise NotImplementedError('require last node to be identical with right interval boundary')
return None
def compute_residual(self, stage=None):
Computation of the residual using the collocation matrix Q
stage (str): The current stage of the step the level belongs to
L = self.level
# Check if we want to skip the residual computation to gain performance
# Keep in mind that skipping any residual computation is likely to give incorrect outputs of the residual!
if stage in self.params.skip_residual_computation:
L.status.residual = 0.0 if L.status.residual is None else L.status.residual
return None
# compute the residual for each node
# build QF(u)
res = self.integrate(last_only=L.params.residual_type[:4] == 'last')
res += L.u[0] - L.u[self.rank + 1]
# add tau if associated
if L.tau[self.rank] is not None:
res += L.tau[self.rank]
# use abs function from data type here
res_norm = abs(res)
# find maximal residual over the nodes
if L.params.residual_type == 'full_abs':
L.status.residual = self.comm.allreduce(res_norm, op=MPI.MAX)
elif L.params.residual_type == 'last_abs':
L.status.residual = self.comm.bcast(res_norm, root=self.comm.size - 1)
elif L.params.residual_type == 'full_rel':
L.status.residual = self.comm.allreduce(res_norm / abs(L.u[0]), op=MPI.MAX)
elif L.params.residual_type == 'last_rel':
L.status.residual = self.comm.bcast(res_norm / abs(L.u[0]), root=self.comm.size - 1)
raise NotImplementedError(f'residual type \"{L.params.residual_type}\" not implemented!')
# indicate that the residual has seen the new values
L.status.updated = False
return None
def predict(self):
Predictor to fill values at nodes before first sweep
Default prediction for the sweepers, only copies the values to all collocation nodes
and evaluates the RHS of the ODE there
L = self.level
P = L.prob
# evaluate RHS at left point
L.f[0] = P.eval_f(L.u[0], L.time)
m = self.rank
if self.params.initial_guess == 'spread':
# copy u[0] to all collocation nodes, evaluate RHS
L.u[m + 1] = P.dtype_u(L.u[0])
L.f[m + 1] = P.eval_f(L.u[m + 1], L.time + L.dt * self.coll.nodes[m])
elif self.params.initial_guess == 'copy':
# copy u[0] and RHS evaluation to all collocation nodes
L.u[m + 1] = P.dtype_u(L.u[0])
L.f[m + 1] = P.dtype_f(L.f[0])
elif self.params.initial_guess == 'zero':
# zeros solution for u and RHS
L.u[m + 1] = P.dtype_u(init=P.init, val=0.0)
L.f[m + 1] = P.dtype_f(init=P.init, val=0.0)
raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented')
# indicate that this level is now ready for sweeps
L.status.unlocked = True
L.status.updated = True
def communicate_tau_correction_for_full_interval(self):
L = self.level
P = L.prob
if self.rank < self.comm.size - 1:
L.tau[-1] = P.u_init
self.comm.Bcast(L.tau[-1], root=self.comm.size - 1)
class generic_implicit_MPI(SweeperMPI, generic_implicit):
Generic implicit sweeper parallelized across the nodes.
Please supply a communicator as `comm` to the parameters!
rank (int): MPI rank
def integrate(self, last_only=False):
Integrates the right-hand side
last_only (bool): Integrate only the last node for the residual or all of them
list of dtype_u: containing the integral as values
L = self.level
P = L.prob
me = P.dtype_u(P.init, val=0.0)
for m in [self.coll.num_nodes - 1] if last_only else range(self.coll.num_nodes):
recvBuf = me if m == self.rank else None
L.dt * self.coll.Qmat[m + 1, self.rank + 1] * L.f[self.rank + 1], recvBuf, root=m, op=MPI.SUM
return me
def update_nodes(self):
Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
L = self.level
P = L.prob
# only if the level has been touched before
assert L.status.unlocked
# update the MIN-SR-FLEX preconditioner
# gather all terms which are known already (e.g. from the previous iteration)
# this corresponds to u0 + QF(u^k) - QdF(u^k) + tau
# get QF(u^k)
rhs = self.integrate()
rhs -= L.dt * self.QI[self.rank + 1, self.rank + 1] * L.f[self.rank + 1]
# add initial value
rhs += L.u[0]
# add tau if associated
if L.tau[self.rank] is not None:
rhs += L.tau[self.rank]
# build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
# implicit solve with prefactor stemming from the diagonal of Qd
L.u[self.rank + 1] = P.solve_system(
L.dt * self.QI[self.rank + 1, self.rank + 1],
L.u[self.rank + 1],
L.time + L.dt * self.coll.nodes[self.rank],
# update function values
L.f[self.rank + 1] = P.eval_f(L.u[self.rank + 1], L.time + L.dt * self.coll.nodes[self.rank])
# indicate presence of new values at this level
L.status.updated = True
return None
def compute_end_point(self):
Compute u at the right point of the interval
The value uend computed here is a full evaluation of the Picard formulation unless do_full_update==False
L = self.level
P = L.prob
# check if Mth node is equal to right point and do_coll_update is false, perform a simple copy
if self.coll.right_is_node and not self.params.do_coll_update:
L.uend = P.dtype_u(L.u[0])
self.comm.Allreduce(L.dt * self.coll.weights[self.rank] * L.f[self.rank + 1], L.uend, op=MPI.SUM)
L.uend += L.u[0]
# add up tau correction of the full interval (last entry)
if L.tau[self.rank] is not None:
L.uend += L.tau[-1]
return None