Step-6: Advanced PFASST controllers

In this step, we discuss the controller implementations of pySDC, their features, and how to run PFASST in parallel.

Part A: The nonMPI controller

pySDC comes with (at least) two controllers: the standard non-MPI controller we have used so far (controller_nonMPI) and the MPI-parallel one (controller_MPI). The non-MPI controller can be used to run simulations without having to worry about parallelization or an MPI installation. By monitoring the convergence, this controller can already give a detailed idea of how PFASST will work for a given problem.

Important things to note:

  • If you don’t want to deal with parallelization and/or are only interested in SDC, MLSDC or convergence of PFASST, use the nonMPI controller.

  • If you care about parallelization, use the MPI controller; see Part C.
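
Regardless of the controller flavor, the basic workflow is always the same: set up the description and controller parameters, instantiate a controller, run it from t0 to Tend and inspect the statistics. The following condensed sketch of the full script below shows this pattern, reusing the parameter routine set_parameters_ml defined there:

from pySDC.helpers.stats_helper import get_sorted
from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI
from pySDC.tutorial.step_6.A_run_non_MPI_controller import set_parameters_ml

# get the PFASST parameters used throughout this step
description, controller_params, t0, Tend = set_parameters_ml()

# one non-MPI controller instance works on num_procs steps at once (here: blocks of 4 steps)
controller = controller_nonMPI(num_procs=4, controller_params=controller_params, description=description)

# initial condition on the finest level of the first step
P = controller.MS[0].levels[0].prob
uinit = P.u_exact(t0)

# run from t0 to Tend and collect statistics
uend, stats = controller.run(u0=uinit, t0=t0, Tend=Tend)

# number of iterations per time-step, sorted by time
iter_counts = get_sorted(stats, type='niter', sortby='time')
print(iter_counts)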

Full code: pySDC/tutorial/step_6/A_run_non_MPI_controller.py

from pathlib import Path

from pySDC.helpers.stats_helper import get_sorted

from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI
from pySDC.implementations.problem_classes.HeatEquation_ND_FD import heatNd_unforced
from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
from pySDC.implementations.transfer_classes.TransferMesh import mesh_to_mesh


def main(num_proc_list=None, fname=None, multi_level=True):
    """
    A simple test program to run PFASST

    Args:
        num_proc_list: list of number of processes to test with
        fname: filename/path for output
        multi_level (bool): do multi-level run or single-level
    """

    if multi_level:
        description, controller_params, t0, Tend = set_parameters_ml()
    else:
        assert all(num_proc == 1 for num_proc in num_proc_list), (
            'ERROR: single-level run can only use 1 processor, got %s' % num_proc_list
        )
        description, controller_params, t0, Tend = set_parameters_sl()

    Path("data").mkdir(parents=True, exist_ok=True)
    f = open('data/' + fname, 'w')
    # loop over different numbers of processes
    for num_proc in num_proc_list:
        out = 'Working with %2i processes...' % num_proc
        f.write(out + '\n')
        print(out)

        # instantiate controllers
        controller = controller_nonMPI(num_procs=num_proc, controller_params=controller_params, description=description)

        # get initial values on finest level
        P = controller.MS[0].levels[0].prob
        uinit = P.u_exact(t0)

        # call main functions to get things done...
        uend, stats = controller.run(u0=uinit, t0=t0, Tend=Tend)

        # compute exact solution at Tend and compare with the computed result
        uex = P.u_exact(Tend)
        err = abs(uex - uend)

        out = 'Error vs. exact solution: %12.8e' % err
        f.write(out + '\n')
        print(out)

        # filter statistics by type (number of iterations)
        iter_counts = get_sorted(stats, type='niter', sortby='time')

        # compute and print statistics
        for item in iter_counts:
            out = 'Number of iterations for time %4.2f: %1i ' % (item[0], item[1])
            f.write(out + '\n')
            print(out)

        f.write('\n')
        print()

        assert all(item[1] <= 8 for item in iter_counts), "ERROR: weird iteration counts, got %s" % iter_counts

    f.close()


def set_parameters_ml():
    """
    Helper routine to set parameters for the following multi-level runs

    Returns:
        dict: dictionary containing the simulation parameters
        dict: dictionary containing the controller parameters
        float: starting time
        float: end time
    """
    # initialize level parameters
    level_params = {}
    level_params['restol'] = 5e-10
    level_params['dt'] = 0.125

    # initialize sweeper parameters
    sweeper_params = {}
    sweeper_params['quad_type'] = 'RADAU-RIGHT'
    sweeper_params['num_nodes'] = [3]
    sweeper_params['QI'] = 'LU'

    # initialize problem parameters
    problem_params = {}
    problem_params['nu'] = 0.1  # diffusion coefficient
    problem_params['freq'] = 2  # frequency for the test value
    problem_params['nvars'] = [63, 31]  # number of degrees of freedom for each level
    problem_params['bc'] = 'dirichlet-zero'  # boundary conditions

    # initialize step parameters
    step_params = {}
    step_params['maxiter'] = 50
    step_params['errtol'] = 1e-05

    # initialize space transfer parameters
    space_transfer_params = {}
    space_transfer_params['rorder'] = 2
    space_transfer_params['iorder'] = 6

    # initialize controller parameters
    controller_params = {}
    controller_params['logger_level'] = 30
    controller_params['all_to_done'] = True  # can ask the controller to keep iterating all steps until the end
    controller_params['predict_type'] = 'pfasst_burnin'  # use the PFASST burn-in predictor

    # fill description dictionary for easy step instantiation
    description = {}
    description['problem_class'] = heatNd_unforced
    description['problem_params'] = problem_params
    description['sweeper_class'] = generic_implicit
    description['sweeper_params'] = sweeper_params
    description['level_params'] = level_params
    description['step_params'] = step_params
    description['space_transfer_class'] = mesh_to_mesh
    description['space_transfer_params'] = space_transfer_params

    # set time parameters
    t0 = 0.0
    Tend = 1.0

    return description, controller_params, t0, Tend


def set_parameters_sl():
    """
    Helper routine to set parameters for the following single-level runs

    Returns:
        dict: dictionary containing the simulation parameters
        dict: dictionary containing the controller parameters
        float: starting time
        float: end time
    """
    # initialize level parameters
    level_params = {}
    level_params['restol'] = 5e-10
    level_params['dt'] = 0.125

    # initialize sweeper parameters
    sweeper_params = {}
    sweeper_params['quad_type'] = 'RADAU-RIGHT'
    sweeper_params['num_nodes'] = 3
    sweeper_params['QI'] = 'LU'

    # initialize problem parameters
    problem_params = {}
    problem_params['nu'] = 0.1  # diffusion coefficient
    problem_params['freq'] = 2  # frequency for the test value
    problem_params['nvars'] = 63  # number of degrees of freedom for each level
    problem_params['bc'] = 'dirichlet-zero'  # boundary conditions

    # initialize step parameters
    step_params = {}
    step_params['maxiter'] = 50

    # initialize controller parameters
    controller_params = {}
    controller_params['logger_level'] = 30

    # fill description dictionary for easy step instantiation
    description = {}
    description['problem_class'] = heatNd_unforced
    description['problem_params'] = problem_params
    description['sweeper_class'] = generic_implicit
    description['sweeper_params'] = sweeper_params
    description['level_params'] = level_params
    description['step_params'] = step_params

    # set time parameters
    t0 = 0.0
    Tend = 1.0

    return description, controller_params, t0, Tend


if __name__ == "__main__":
    main(num_proc_list=[1], fname='step_6_A_sl_out.txt', multi_level=False)
    main(num_proc_list=[1, 2, 4, 8], fname='step_6_A_ml_out.txt', multi_level=True)

Results:

Working with  1 processes...
Error vs. exact solution: 2.87627033e-07
Number of iterations for time 0.00: 8 
Number of iterations for time 0.12: 8 
Number of iterations for time 0.25: 8 
Number of iterations for time 0.38: 8 
Number of iterations for time 0.50: 7 
Number of iterations for time 0.62: 7 
Number of iterations for time 0.75: 7 
Number of iterations for time 0.88: 7 

Working with  1 processes...
Error vs. exact solution: 2.87300679e-07
Number of iterations for time 0.00: 4 
Number of iterations for time 0.12: 4 
Number of iterations for time 0.25: 3 
Number of iterations for time 0.38: 3 
Number of iterations for time 0.50: 3 
Number of iterations for time 0.62: 3 
Number of iterations for time 0.75: 3 
Number of iterations for time 0.88: 3 

Working with  2 processes...
Error vs. exact solution: 2.87272106e-07
Number of iterations for time 0.00: 4 
Number of iterations for time 0.12: 4 
Number of iterations for time 0.25: 4 
Number of iterations for time 0.38: 4 
Number of iterations for time 0.50: 4 
Number of iterations for time 0.62: 4 
Number of iterations for time 0.75: 4 
Number of iterations for time 0.88: 4 

Working with  4 processes...
Error vs. exact solution: 2.87294206e-07
Number of iterations for time 0.00: 5 
Number of iterations for time 0.12: 5 
Number of iterations for time 0.25: 5 
Number of iterations for time 0.38: 5 
Number of iterations for time 0.50: 5 
Number of iterations for time 0.62: 5 
Number of iterations for time 0.75: 5 
Number of iterations for time 0.88: 5 

Working with  8 processes...
Error vs. exact solution: 2.87290945e-07
Number of iterations for time 0.00: 7 
Number of iterations for time 0.12: 7 
Number of iterations for time 0.25: 7 
Number of iterations for time 0.38: 7 
Number of iterations for time 0.50: 7 
Number of iterations for time 0.62: 7 
Number of iterations for time 0.75: 7 
Number of iterations for time 0.88: 7 

Part B: Odd temporal distribution

Incidentally, the numbers of parallel processes used in Part A always divide the number of time-steps evenly. Yet, this does not need to be the case: all controllers are capable of handling odd distributions, e.g. too few or too many processes for the steps (or for the last block). This is demonstrated here, where the code from Part A is called again with odd numbers of parallel steps.

Important things to note:

  • This capability may become useful if adaptive time-stepping is used. The controllers check which steps are currently active, and only those compute the next block.

  • This also works for SDC and MLSDC, where, in the case of varying time-step sizes, the overall number of steps is not known at the beginning.
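
To make the distribution concrete: with dt = 0.125 and Tend = 1.0 from Part A there are 8 time-steps, so e.g. 3 processes work on blocks of 3, 3 and 2 steps, while 9 processes handle all 8 steps in a single block. The following minimal sketch illustrates this bookkeeping (an illustration of the resulting block sizes only, not the controller's actual internals):

# Sketch: how 8 time-steps are split into blocks for a given number of
# processes; the last block simply contains fewer active steps.
num_steps = 8  # (Tend - t0) / dt with the parameters from Part A

for num_procs in [3, 5, 7, 9]:
    blocks = [min(num_procs, num_steps - i) for i in range(0, num_steps, num_procs)]
    print(f'{num_procs} processes -> blocks of sizes {blocks}')
# 3 processes -> blocks of sizes [3, 3, 2]
# 5 processes -> blocks of sizes [5, 3]
# 7 processes -> blocks of sizes [7, 1]
# 9 processes -> blocks of sizes [8]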

Full code: pySDC/tutorial/step_6/B_odd_temporal_distribution.py

from pySDC.tutorial.step_6.A_run_non_MPI_controller import main as main_A


def main():
    """
    A simple test program to check PFASST for odd numbers of processes
    """
    main_A(num_proc_list=[3, 5, 7, 9], fname='step_6_B_out.txt', multi_level=True)


if __name__ == "__main__":
    main()

Results:

Working with  3 processes...
Error vs. exact solution: 2.87358935e-07
Number of iterations for time 0.00: 5 
Number of iterations for time 0.12: 5 
Number of iterations for time 0.25: 5 
Number of iterations for time 0.38: 4 
Number of iterations for time 0.50: 4 
Number of iterations for time 0.62: 4 
Number of iterations for time 0.75: 4 
Number of iterations for time 0.88: 4 

Working with  5 processes...
Error vs. exact solution: 2.87358097e-07
Number of iterations for time 0.00: 6 
Number of iterations for time 0.12: 6 
Number of iterations for time 0.25: 6 
Number of iterations for time 0.38: 6 
Number of iterations for time 0.50: 6 
Number of iterations for time 0.62: 4 
Number of iterations for time 0.75: 4 
Number of iterations for time 0.88: 4 

Working with  7 processes...
Error vs. exact solution: 2.87271747e-07
Number of iterations for time 0.00: 7 
Number of iterations for time 0.12: 7 
Number of iterations for time 0.25: 7 
Number of iterations for time 0.38: 7 
Number of iterations for time 0.50: 7 
Number of iterations for time 0.62: 7 
Number of iterations for time 0.75: 7 
Number of iterations for time 0.88: 3 

Working with  9 processes...
Error vs. exact solution: 2.87290945e-07
Number of iterations for time 0.00: 7 
Number of iterations for time 0.12: 7 
Number of iterations for time 0.25: 7 
Number of iterations for time 0.38: 7 
Number of iterations for time 0.50: 7 
Number of iterations for time 0.62: 7 
Number of iterations for time 0.75: 7 
Number of iterations for time 0.88: 7 

Part C: MPI parallelization

Since PFASST is actually a parallel algorithm, executing it in parallel, e.g. using MPI, is the natural next step. To this end, pySDC comes with the MPI-parallelized controller, namely controller_MPI. It is supposed to yield the same results as the non-MPI counterpart, and this is what we demonstrate here (at least for one particular example). The actual code of this part is rather short, since its only task is to call another snippet (playground_parallelization.py) with different numbers of parallel processes. This is realized using Python's subprocess module, and at the end we check that each call returned normally. The snippet called by the example is basically the same code as used in Parts A and B. We can therefore compare with the results of Parts A and B: we expect the same numbers of iterations and the same accuracy, i.e. the two controller flavors should agree up to machine precision.

Important things to note:

  • The additional Python script playground_parallelization.py contains the code to run the MPI-parallel controller. To this end, we import the routine set_parameters_ml from Part A to ensure that we use the same set of parameters for all runs.

  • This example also shows how the statistics of multiple MPI processes can be gathered and processed by rank 0, see playground_parallelization.py.

  • The controller needs a working installation of mpi4py. Since this is not always easy to achieve and since debugging a parallel program can cause a lot of headaches, the non-MPI controller performs the same operations in serial.

  • The somewhat weird notation with the current working directory cwd is due to the corresponding test, which, run by nosetests, has a different working directory than the tutorial.
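
Stripped of the environment and working-directory handling, the launching mechanism boils down to building an mpirun command line and checking the return code. A condensed sketch of the loop in the full script below (using subprocess.run instead of Popen for brevity):

import subprocess

# Sketch: run the MPI playground with different process counts and make sure
# that every run terminates with return code 0.
for num_procs in [1, 2, 4, 8]:
    cmd = f'mpirun -np {num_procs} python playground_parallelization.py'.split()
    result = subprocess.run(cmd, capture_output=True)
    assert result.returncode == 0, f'Run with {num_procs} processes failed'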

Full code: pySDC/tutorial/step_6/C_MPI_parallelization.py and pySDC/tutorial/step_6/playground_parallelization.py

import os
import subprocess


def main(cwd):
    """
    A simple test program to test MPI-parallel PFASST controllers
    Args:
        cwd: current working directory
    """

    # try to import MPI here, will fail if things go wrong (and not in the subprocess part)
    try:
        import mpi4py

        del mpi4py
    except ImportError:
        raise ImportError('MPI-parallel tests need mpi4py')

    # Set python path once
    my_env = os.environ.copy()
    my_env['PYTHONPATH'] = '../../..:.'
    my_env['COVERAGE_PROCESS_START'] = 'pyproject.toml'

    # set list of numbers of parallel steps (even distribution, as in Part A)
    num_procs_list = [1, 2, 4, 8]

    # set up new/empty file for output
    fname = 'step_6_C1_out.txt'
    f = open(cwd + '/../../../data/' + fname, 'w')
    f.close()

    # run code with different number of MPI processes
    for num_procs in num_procs_list:
        print('Running code with %2i processes...' % num_procs)
        cmd = (
            'mpirun -np ' + str(num_procs) + ' python playground_parallelization.py ../../../../data/' + fname
        ).split()
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=my_env, cwd=cwd)
        # while True:
        #     line = p.stdout.readline()
        #     print(line)
        #     if not line: break
        p.wait()
        assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % (
            p.returncode,
            num_procs,
        )

    # set list of numbers of parallel steps (odd distribution, as in Part B)
    num_procs_list = [3, 5, 7, 9]

    # set up new/empty file for output
    fname = 'step_6_C2_out.txt'
    f = open(cwd + '/../../../data/' + fname, 'w')
    f.close()

    # run code with different number of MPI processes
    for num_procs in num_procs_list:
        print('Running code with %2i processes...' % num_procs)
        cmd = (
            'mpirun -np ' + str(num_procs) + ' python playground_parallelization.py ../../../../data/' + fname
        ).split()
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=my_env, cwd=cwd)
        # while True:
        #     line = p.stdout.readline()
        #     print(line)
        #     if not line: break
        p.wait()
        assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % (
            p.returncode,
            num_procs,
        )


if __name__ == "__main__":
    main('.')

The second script, playground_parallelization.py, which is called by the code above, reads as follows:

import sys
from pathlib import Path

from mpi4py import MPI

from pySDC.helpers.stats_helper import get_sorted
from pySDC.implementations.controller_classes.controller_MPI import controller_MPI
from pySDC.tutorial.step_6.A_run_non_MPI_controller import set_parameters_ml

if __name__ == "__main__":
    """
    A simple test program to do MPI-parallel PFASST runs
    """

    # set MPI communicator
    comm = MPI.COMM_WORLD

    # get parameters from Part A
    description, controller_params, t0, Tend = set_parameters_ml()

    # instantiate controllers
    controller = controller_MPI(controller_params=controller_params, description=description, comm=comm)
    # get initial values on finest level
    P = controller.S.levels[0].prob
    uinit = P.u_exact(t0)

    # call main functions to get things done...
    uend, stats = controller.run(u0=uinit, t0=t0, Tend=Tend)

    # filter statistics by type (number of iterations)
    iter_counts = get_sorted(stats, type='niter', sortby='time')

    # gather the statistics from all ranks into a list of lists on rank 0
    iter_counts_list = comm.gather(iter_counts, root=0)

    rank = comm.Get_rank()
    size = comm.Get_size()

    if rank == 0:
        # we need to deal with variable file names here (for testing purposes only)
        if len(sys.argv) == 2:
            fname = sys.argv[1]
        else:
            fname = 'step_6_B_out.txt'

        Path("data").mkdir(parents=True, exist_ok=True)
        f = open('data/' + fname, 'a')
        out = 'Working with %2i processes...' % size
        f.write(out + '\n')
        print(out)

        # compute exact solution at Tend and compare with the computed result
        uex = P.u_exact(Tend)
        err = abs(uex - uend)

        out = 'Error vs. exact solution: %12.8e' % err
        f.write(out + '\n')
        print(out)

        # build one list of statistics instead of a list of lists, then sort by time
        iter_counts_gather = [item for sublist in iter_counts_list for item in sublist]
        iter_counts = sorted(iter_counts_gather, key=lambda tup: tup[0])

        # compute and print statistics
        for item in iter_counts:
            out = 'Number of iterations for time %4.2f: %1i ' % (item[0], item[1])
            f.write(out + '\n')
            print(out)

        f.write('\n')
        print()

        assert all(item[1] <= 8 for item in iter_counts), "ERROR: weird iteration counts, got %s" % iter_counts

Results:

Working with  1 processes...
Error vs. exact solution: 2.87300679e-07
Number of iterations for time 0.00: 4 
Number of iterations for time 0.12: 4 
Number of iterations for time 0.25: 3 
Number of iterations for time 0.38: 3 
Number of iterations for time 0.50: 3 
Number of iterations for time 0.62: 3 
Number of iterations for time 0.75: 3 
Number of iterations for time 0.88: 3 

Working with  2 processes...
Error vs. exact solution: 2.87272106e-07
Number of iterations for time 0.00: 4 
Number of iterations for time 0.12: 4 
Number of iterations for time 0.25: 4 
Number of iterations for time 0.38: 4 
Number of iterations for time 0.50: 4 
Number of iterations for time 0.62: 4 
Number of iterations for time 0.75: 4 
Number of iterations for time 0.88: 4 

Working with  4 processes...
Error vs. exact solution: 2.87294206e-07
Number of iterations for time 0.00: 5 
Number of iterations for time 0.12: 5 
Number of iterations for time 0.25: 5 
Number of iterations for time 0.38: 5 
Number of iterations for time 0.50: 5 
Number of iterations for time 0.62: 5 
Number of iterations for time 0.75: 5 
Number of iterations for time 0.88: 5 

Working with  8 processes...
Error vs. exact solution: 2.87290945e-07
Number of iterations for time 0.00: 7 
Number of iterations for time 0.12: 7 
Number of iterations for time 0.25: 7 
Number of iterations for time 0.38: 7 
Number of iterations for time 0.50: 7 
Number of iterations for time 0.62: 7 
Number of iterations for time 0.75: 7 
Number of iterations for time 0.88: 7 

Working with  3 processes...
Error vs. exact solution: 2.87358935e-07
Number of iterations for time 0.00: 5 
Number of iterations for time 0.12: 5 
Number of iterations for time 0.25: 5 
Number of iterations for time 0.38: 4 
Number of iterations for time 0.50: 4 
Number of iterations for time 0.62: 4 
Number of iterations for time 0.75: 4 
Number of iterations for time 0.88: 4 

Working with  5 processes...
Error vs. exact solution: 2.87358097e-07
Number of iterations for time 0.00: 6 
Number of iterations for time 0.12: 6 
Number of iterations for time 0.25: 6 
Number of iterations for time 0.38: 6 
Number of iterations for time 0.50: 6 
Number of iterations for time 0.62: 4 
Number of iterations for time 0.75: 4 
Number of iterations for time 0.88: 4 

Working with  7 processes...
Error vs. exact solution: 2.87271747e-07
Number of iterations for time 0.00: 7 
Number of iterations for time 0.12: 7 
Number of iterations for time 0.25: 7 
Number of iterations for time 0.38: 7 
Number of iterations for time 0.50: 7 
Number of iterations for time 0.62: 7 
Number of iterations for time 0.75: 7 
Number of iterations for time 0.88: 3 

Working with  9 processes...
Error vs. exact solution: 2.87290945e-07
Number of iterations for time 0.00: 7 
Number of iterations for time 0.12: 7 
Number of iterations for time 0.25: 7 
Number of iterations for time 0.38: 7 
Number of iterations for time 0.50: 7 
Number of iterations for time 0.62: 7 
Number of iterations for time 0.75: 7 
Number of iterations for time 0.88: 7
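
To verify the claimed agreement between the two controller flavors, one can compare the output files written by Parts A and C. The following is only a sketch and assumes that all runs have finished and that both sets of files ended up in the same data directory:

from pathlib import Path

# Sketch: the multi-level non-MPI output (Part A) and the MPI output (Part C)
# should report identical iteration counts; the reported errors may differ
# only in the last digits (machine precision).
serial = Path('data/step_6_A_ml_out.txt').read_text().splitlines()
parallel = Path('data/step_6_C1_out.txt').read_text().splitlines()
for line_a, line_c in zip(serial, parallel):
    if line_a.startswith('Number of iterations'):
        assert line_a == line_c, f'Mismatch: {line_a!r} vs {line_c!r}'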