Source code for core.Sweeper

import logging
import warnings

import numpy as np
import scipy as sp
import scipy.linalg
import scipy.optimize as opt

from pySDC.core.Errors import ParameterError
from pySDC.core.Level import level
from pySDC.core.Collocation import CollBase
from pySDC.helpers.pysdc_helper import FrozenClass


# short helper class to add params as attributes
class _Pars(FrozenClass):
    def __init__(self, pars):
        self.do_coll_update = False
        self.initial_guess = 'spread'  # default value (see also below)
        self.skip_residual_computation = ()  # gain performance at the cost of correct residual output

        for k, v in pars.items():
            if k != 'collocation_class':
                setattr(self, k, v)

        self._freeze()


[docs] class sweeper(object): """ Base abstract sweeper class Attributes: logger: custom logger for sweeper-related logging params (__Pars): parameter object containing the custom parameters passed by the user coll (pySDC.Collocation.CollBase): collocation object """ def __init__(self, params): """ Initialization routine for the base sweeper Args: params (dict): parameter object """ # set up logger self.logger = logging.getLogger('sweeper') essential_keys = ['num_nodes'] for key in essential_keys: if key not in params: msg = 'need %s to instantiate step, only got %s' % (key, str(params.keys())) self.logger.error(msg) raise ParameterError(msg) if 'collocation_class' not in params: params['collocation_class'] = CollBase # prepare random generator for initial guess if params.get('initial_guess', 'spread') == 'random': # default value (see also above) params['random_seed'] = params.get('random_seed', 1984) self.rng = np.random.RandomState(params['random_seed']) self.params = _Pars(params) self.coll: CollBase = params['collocation_class'](**params) if not self.coll.right_is_node and not self.params.do_coll_update: self.logger.warning( 'we need to do a collocation update here, since the right end point is not a node. Changing this!' ) self.params.do_coll_update = True # This will be set as soon as the sweeper is instantiated at the level self.__level = None self.parallelizable = False
[docs] def get_Qdelta_implicit(self, coll, qd_type): def rho(x): return max(abs(np.linalg.eigvals(np.eye(m) - np.diag([x[i] for i in range(m)]).dot(coll.Qmat[1:, 1:])))) QDmat = np.zeros(coll.Qmat.shape) if qd_type == 'LU': QT = coll.Qmat[1:, 1:].T [_, _, U] = scipy.linalg.lu(QT, overwrite_a=True) QDmat[1:, 1:] = U.T elif qd_type == 'LU2': QT = coll.Qmat[1:, 1:].T [_, _, U] = scipy.linalg.lu(QT, overwrite_a=True) QDmat[1:, 1:] = 2 * U.T elif qd_type == 'TRAP': for m in range(coll.num_nodes + 1): QDmat[m, 1 : m + 1] = coll.delta_m[0:m] for m in range(coll.num_nodes + 1): QDmat[m, 0:m] += coll.delta_m[0:m] QDmat /= 2.0 elif qd_type == 'IE': for m in range(coll.num_nodes + 1): QDmat[m, 1 : m + 1] = coll.delta_m[0:m] elif qd_type == 'IEpar': for m in range(coll.num_nodes + 1): QDmat[m, m] = np.sum(coll.delta_m[0:m]) self.parallelizable = True elif qd_type == 'Qpar': QDmat = np.diag(np.diag(coll.Qmat)) self.parallelizable = True elif qd_type == 'GS': QDmat = np.tril(coll.Qmat) elif qd_type == 'PIC': QDmat = np.zeros(coll.Qmat.shape) self.parallelizable = True elif qd_type == 'MIN': m = QDmat.shape[0] - 1 x0 = 10 * np.ones(m) d = opt.minimize(rho, x0, method='Nelder-Mead') QDmat[1:, 1:] = np.linalg.inv(np.diag(d.x)) self.parallelizable = True elif qd_type.startswith('MIN-SR-FLEX'): m = QDmat.shape[0] - 1 try: k = abs(int(qd_type[11:])) except ValueError: k = 1 d = min(k, m) QDmat[1:, 1:] = np.diag(coll.nodes) / d self.parallelizable = True elif qd_type in ['MIN_GT', 'MIN-SR-NS']: m = QDmat.shape[0] - 1 QDmat[1:, 1:] = np.diag(coll.nodes) / m self.parallelizable = True elif qd_type == 'MIN3': m = QDmat.shape[0] - 1 x = None # These values have been obtained using Indie Solver, a commercial solver for black-box optimization which # aggregates several state-of-the-art optimization methods (free academic subscription plan) # objective function: sum over 17^2 values of lamdt, real and imaginary (WORKS SURPRISINGLY WELL!) if coll.node_type == 'LEGENDRE' and coll.quad_type == 'LOBATTO': if m == 9: # rho = 0.154786693955 x = [ 0.0, 0.14748983547536937, 0.1243753767395874, 0.08797965969063823, 0.03249792877433364, 0.06171633442251176, 0.08995295998705832, 0.1080641868728824, 0.11621787232558443, ] elif m == 7: # rho = 0.0979351256833 x = [ 0.0, 0.18827968699454273, 0.1307213945012976, 0.04545003319140543, 0.08690617895312261, 0.12326429119922168, 0.13815746843252427, ] elif m == 5: # rho = 0.0513543155235 x = [0.0, 0.2994085231050721, 0.07923154575177252, 0.14338847088077, 0.17675509273708057] elif m == 4: # rho = 0.0381589713397 x = [0.0, 0.2865524188780046, 0.11264992497015984, 0.2583063168320655] elif m == 3: # rho = 0.013592619664 x = [0.0, 0.2113181799416633, 0.3943250920445912] elif m == 2: # rho = 0 x = [0.0, 0.5] else: NotImplementedError( 'This combination of preconditioner, node type and node number is not ' 'implemented' ) elif coll.node_type == 'LEGENDRE' and coll.quad_type == 'RADAU-RIGHT': if m == 9: # rho = 0.151784861385 x = [ 0.14208076083211416, 0.1288153963623986, 0.10608601069476883, 0.07509520272252024, 0.027986167728305308, 0.05351160749903067, 0.07911315989747868, 0.09514844658836666, 0.10204992319487571, ] elif m == 7: # rho = 0.116400161888 x = [ 0.15223871397682717, 0.12625448001038536, 0.08210714764924298, 0.03994434742760019, 0.1052662547386142, 0.14075805578834127, 0.15636085758812895, ] elif m == 5: # rho = 0.0783352996958 (iteration 5355) x = [ 0.2818591930905709, 0.2011358490453793, 0.06274536689514164, 0.11790265267514095, 0.1571629578515223, ] elif m == 4: # rho = 0.057498908343 x = [0.3198786751412953, 0.08887606314792469, 0.1812366328324738, 0.23273925017954] elif m == 3: # rho = 0.038744192979 (iteration 11188) x = [0.3203856825077055, 0.1399680686269595, 0.3716708461097372] elif m == 2: # rho = 0.0208560702294 (iteration 6690) x = [0.2584092406077449, 0.6449261740461826] else: raise NotImplementedError( 'This combination of preconditioner, node type and node number is not implemented' ) elif coll.node_type == 'EQUID' and coll.quad_type == 'RADAU-RIGHT': if m == 9: # rho = 0.251820022583 (iteration 32402) x = [ 0.04067333763109274, 0.06893408176924318, 0.0944460427779633, 0.11847528720123894, 0.14153236351607695, 0.1638856774260845, 0.18569759470199648, 0.20707543960267513, 0.2280946565716198, ] elif m == 7: # rho = 0.184582997611 (iteration 44871) x = [ 0.0582690792096515, 0.09937620459067688, 0.13668728443669567, 0.1719458323664216, 0.20585615258818232, 0.2387890485242656, 0.27096908017041393, ] elif m == 5: # rho = 0.118441339197 (iteration 34581) x = [ 0.0937126798932547, 0.1619131388001843, 0.22442341539247537, 0.28385142992912565, 0.3412523013467262, ] elif m == 4: # rho = 0.0844043254542 (iteration 33099) x = [0.13194852204686872, 0.2296718892453916, 0.3197255970017318, 0.405619746972393] elif m == 3: # rho = 0.0504635143866 (iteration 9783) x = [0.2046955744931575, 0.3595744268324041, 0.5032243650307717] elif m == 2: # rho = 0.0214806480623 (iteration 6109) x = [0.3749891032632652, 0.6666472946796036] else: NotImplementedError( 'This combination of preconditioner, node type and node number is not ' 'implemented' ) else: NotImplementedError( 'This combination of preconditioner, node type and node number is not ' 'implemented' ) QDmat[1:, 1:] = np.diag(x) self.parallelizable = True elif qd_type == "MIN-SR-S": M = QDmat.shape[0] - 1 quadType = coll.quad_type nodeType = coll.node_type # Main function to compute coefficients def computeCoeffs(M, a=None, b=None): """ Compute diagonal coefficients for a given number of nodes M. If `a` and `b` are given, then it uses as initial guess: >>> a * nodes**b / M If `a` is not given, then do not care about `b` and uses as initial guess: >>> nodes / M Parameters ---------- M : int Number of collocation nodes. a : float, optional `a` coefficient for the initial guess. b : float, optional `b` coefficient for the initial guess. Returns ------- coeffs : array The diagonal coefficients. nodes : array The nodes associated to the current coefficients. """ collM = CollBase(num_nodes=M, node_type=nodeType, quad_type=quadType) QM = collM.Qmat[1:, 1:] nodesM = collM.nodes if quadType in ['LOBATTO', 'RADAU-LEFT']: QM = QM[1:, 1:] nodesM = nodesM[1:] nCoeffs = len(nodesM) if nCoeffs == 1: coeffs = np.diag(QM) else: def nilpotency(coeffs): """Function verifying the nilpotency from given coefficients""" coeffs = np.asarray(coeffs) kMats = [(1 - z) * np.eye(nCoeffs) + z * np.diag(1 / coeffs) @ QM for z in nodesM] vals = [np.linalg.det(K) - 1 for K in kMats] return np.array(vals) if a is None: coeffs0 = nodesM / M else: coeffs0 = a * nodesM**b / M with warnings.catch_warnings(): warnings.simplefilter("ignore") coeffs = sp.optimize.fsolve(nilpotency, coeffs0, xtol=1e-15) # Handle first node equal to zero if quadType in ['LOBATTO', 'RADAU-LEFT']: coeffs = np.asarray([0.0] + list(coeffs)) nodesM = np.asarray([0.0] + list(nodesM)) return coeffs, nodesM def fit(coeffs, nodes): """Function fitting given coefficients to a power law""" def lawDiff(ab): a, b = ab return np.linalg.norm(a * nodes**b - coeffs) sol = sp.optimize.minimize(lawDiff, [1.0, 1.0], method="nelder-mead") return sol.x # Compute coefficients incrementaly a, b = None, None m0 = 2 if quadType in ['LOBATTO', 'RADAU-LEFT'] else 1 for m in range(m0, M + 1): coeffs, nodes = computeCoeffs(m, a, b) if m > 1: a, b = fit(coeffs * m, nodes) QDmat[1:, 1:] = np.diag(coeffs) self.parallelizable = True elif qd_type == "VDHS": # coefficients from Van der Houwen & Sommeijer, 1991 m = QDmat.shape[0] - 1 if m == 4 and coll.node_type == 'LEGENDRE' and coll.quad_type == "RADAU-RIGHT": coeffs = [3055 / 9532, 531 / 5956, 1471 / 8094, 1848 / 7919] else: raise NotImplementedError('no VDHS diagonal coefficients for this node configuration') QDmat[1:, 1:] = np.diag(coeffs) self.parallelizable = True else: # see if an explicit preconditioner with this name is available try: QDmat = self.get_Qdelta_explicit(coll, qd_type) self.logger.warning(f'Using explicit preconditioner \"{qd_type}\" on the left hand side!') except NotImplementedError: raise NotImplementedError(f'qd_type implicit "{qd_type}" not implemented') # check if we got not more than a lower triangular matrix # TODO : this should be a regression test, not run-time ... np.testing.assert_array_equal( np.triu(QDmat, k=1), np.zeros(QDmat.shape), err_msg='Lower triangular matrix expected!' ) return QDmat
[docs] def get_Qdelta_explicit(self, coll, qd_type): QDmat = np.zeros(coll.Qmat.shape) if qd_type == 'EE': for m in range(self.coll.num_nodes + 1): QDmat[m, 0:m] = self.coll.delta_m[0:m] elif qd_type == 'GS': QDmat = np.tril(self.coll.Qmat, k=-1) elif qd_type == 'PIC': QDmat = np.zeros(coll.Qmat.shape) else: raise NotImplementedError('qd_type explicit not implemented') # check if we got not more than a lower triangular matrix np.testing.assert_array_equal( np.triu(QDmat, k=0), np.zeros(QDmat.shape), err_msg='Strictly lower triangular matrix expected!' ) return QDmat
[docs] def predict(self): """ Predictor to fill values at nodes before first sweep Default prediction for the sweepers, only copies the values to all collocation nodes and evaluates the RHS of the ODE there """ # get current level and problem description L = self.level P = L.prob # evaluate RHS at left point L.f[0] = P.eval_f(L.u[0], L.time) for m in range(1, self.coll.num_nodes + 1): # copy u[0] to all collocation nodes, evaluate RHS if self.params.initial_guess == 'spread': L.u[m] = P.dtype_u(L.u[0]) L.f[m] = P.eval_f(L.u[m], L.time + L.dt * self.coll.nodes[m - 1]) # copy u[0] and RHS evaluation to all collocation nodes elif self.params.initial_guess == 'copy': L.u[m] = P.dtype_u(L.u[0]) L.f[m] = P.dtype_f(L.f[0]) # start with zero everywhere elif self.params.initial_guess == 'zero': L.u[m] = P.dtype_u(init=P.init, val=0.0) L.f[m] = P.dtype_f(init=P.init, val=0.0) # start with random initial guess elif self.params.initial_guess == 'random': L.u[m] = P.dtype_u(init=P.init, val=self.rng.rand(1)[0]) L.f[m] = P.dtype_f(init=P.init, val=self.rng.rand(1)[0]) else: raise ParameterError(f'initial_guess option {self.params.initial_guess} not implemented') # indicate that this level is now ready for sweeps L.status.unlocked = True L.status.updated = True
[docs] def compute_residual(self, stage=''): """ Computation of the residual using the collocation matrix Q Args: stage (str): The current stage of the step the level belongs to """ # get current level and problem description L = self.level # Check if we want to skip the residual computation to gain performance # Keep in mind that skipping any residual computation is likely to give incorrect outputs of the residual! if stage in self.params.skip_residual_computation: L.status.residual = 0.0 if L.status.residual is None else L.status.residual return None # check if there are new values (e.g. from a sweep) # assert L.status.updated # compute the residual for each node # build QF(u) res_norm = [] res = self.integrate() for m in range(self.coll.num_nodes): res[m] += L.u[0] - L.u[m + 1] # add tau if associated if L.tau[m] is not None: res[m] += L.tau[m] # use abs function from data type here res_norm.append(abs(res[m])) # find maximal residual over the nodes if L.params.residual_type == 'full_abs': L.status.residual = max(res_norm) elif L.params.residual_type == 'last_abs': L.status.residual = res_norm[-1] elif L.params.residual_type == 'full_rel': L.status.residual = max(res_norm) / abs(L.u[0]) elif L.params.residual_type == 'last_rel': L.status.residual = res_norm[-1] / abs(L.u[0]) else: raise ParameterError( f'residual_type = {L.params.residual_type} not implemented, choose ' f'full_abs, last_abs, full_rel or last_rel instead' ) # indicate that the residual has seen the new values L.status.updated = False return None
[docs] def compute_end_point(self): """ Abstract interface to end-node computation """ raise NotImplementedError('ERROR: sweeper has to implement compute_end_point(self)')
[docs] def integrate(self): """ Abstract interface to right-hand side integration """ raise NotImplementedError('ERROR: sweeper has to implement integrate(self)')
[docs] def update_nodes(self): """ Abstract interface to node update """ raise NotImplementedError('ERROR: sweeper has to implement update_nodes(self)')
@property def level(self): """ Returns the current level Returns: pySDC.Level.level: the current level """ return self.__level @level.setter def level(self, L): """ Sets a reference to the current level (done in the initialization of the level) Args: L (pySDC.Level.level): current level """ assert isinstance(L, level) self.__level = L @property def rank(self): return 0