Source code for pypint.solvers.parallel_sdc

# coding=utf-8
"""

.. moduleauthor:: Torbjörn Klatt <[email protected]>
"""
from copy import deepcopy
import warnings as warnings
from collections import OrderedDict

import numpy as np

from pypint.solvers.i_iterative_time_solver import IIterativeTimeSolver
from pypint.solvers.i_parallel_solver import IParallelSolver
from pypint.communicators.message import Message
from pypint.integrators.integrator_base import IntegratorBase
from pypint.integrators.node_providers.gauss_lobatto_nodes import GaussLobattoNodes
from pypint.integrators.weight_function_providers.polynomial_weight_function import PolynomialWeightFunction
from pypint.problems import IInitialValueProblem, problem_has_exact_solution
from pypint.solvers.states.sdc_solver_state import SdcSolverState
from pypint.solvers.diagnosis import IDiagnosisValue
from pypint.solvers.diagnosis.norms import supremum_norm
from pypint.plugins.timers.timer_base import TimerBase
from pypint.utilities.threshold_check import ThresholdCheck
from pypint.utilities import assert_is_instance, assert_condition, func_name, assert_named_argument
from pypint.utilities.logging import *


# General Notes on Implementation
# ===============================
#
# Names and Meaning of Indices
# ----------------------------
# T_max (num_time_steps) | number of time steps
# N     (num_nodes)      | number of integration nodes per time step
# t                      | index of current time step; interval: [0, T_max)
# n                      | index of current node of current time step; interval: [1, N)
#                        |  the current node is always the next node, i.e. the node we are
#                        |  calculating the value for
# i                      | index of current point in continuous array of points


[docs]class ParallelSdc(IIterativeTimeSolver, IParallelSolver): """*Spectral Deferred Corrections* method for solving first order ODEs. The *Spectral Deferred Corrections* (SDC) method is described in [Minion2003]_ (Equation 2.7) Default Values: * :py:class:`.ThresholdCheck` * ``max_threshold``: 10 * ``min_threshold``: 1e-7 * ``conditions``: ``('residual', 'iterations')`` * :py:attr:`.num_time_steps`: 1 * :py:attr:`.num_nodes`: 3 Given the total number of time steps :math:`T_{max}`, number of integration nodes per time step :math:`N`, current time step :math:`t \\in [0,T_{max})` and the next integration node to consider :math:`n \\in [0, N)`. Let :math:`[a,b]` be the total time interval to integrate over. For :math:`T_{max}=3` and :math:`N=4`, this can be visualized as:: a b | | | . . | . . | . . | t 0 0 0 0 1 1 1 2 2 2 n 0 1 2 3 1 2 3 1 2 3 i 0 1 2 3 4 5 6 7 8 9 In general, the value at :math:`a` (i.e. :math:`t=n=i=0`) is the initial value. See Also -------- :py:class:`.IIterativeTimeSolver` : implemented interface :py:class:`.IParallelSolver` : mixed-in interface """ def __init__(self, **kwargs): super(ParallelSdc, self).__init__(**kwargs) IParallelSolver.__init__(self, **kwargs) del self._state self.threshold = ThresholdCheck(min_threshold=1e-7, max_threshold=10, conditions=("residual", "iterations")) self.timer = TimerBase() self._num_time_steps = 1 self._dt = 0.0 self._deltas = { 't': 0.0, 'n': np.zeros(0) } self._classic = True self.__nodes_type = GaussLobattoNodes self.__weights_type = PolynomialWeightFunction self.__num_nodes = 3 self.__exact = np.zeros(0) self.__time_points = { 'steps': np.zeros(0), 'nodes': np.zeros(0) }
[docs] def init(self, problem, integrator, **kwargs): """Initializes SDC solver with given problem and integrator. Parameters ---------- num_time_steps : :py:class:`int` Number of time steps to be used within the time interval of the problem. num_nodes : :py:class:`int` *(otional)* number of nodes per time step nodes_type : :py:class:`.INodes` *(optional)* Type of integration nodes to be used (class name, **NOT instance**). weights_type : :py:class:`.IWeightFunction` *(optional)* Integration weights function to be used (class name, **NOT instance**). classic : :py:class:`bool` *(optional)* Flag for specifying the type of the SDC sweep. :py:class:`True`: *(default)* For the classic SDC as known from the literature; :py:class:`False`: For the modified SDC as developed by Torbjörn Klatt. Raises ------ ValueError : * if given problem is not an :py:class:`.IInitialValueProblem` * if number of nodes per time step is not given; neither through ``num_nodes``, ``nodes_type`` nor ``integrator`` See Also -------- :py:meth:`.IIterativeTimeSolver.init` overridden method (with further parameters) :py:meth:`.IParallelSolver.init` mixed in overridden method (with further parameters) """ assert_is_instance(problem, IInitialValueProblem, descriptor="Initial Value Problem", checking_obj=self) assert_condition(issubclass(integrator, IntegratorBase), ValueError, message="Integrator must be an IntegratorBase: NOT %s" % integrator.__mro__[-2].__name__, checking_obj=self) super(ParallelSdc, self).init(problem, integrator=integrator, **kwargs) if 'num_time_steps' in kwargs: self._num_time_steps = kwargs['num_time_steps'] if 'num_nodes' in kwargs: self.__num_nodes = kwargs['num_nodes'] elif 'nodes_type' in kwargs and kwargs['nodes_type'].num_nodes is not None: self.__num_nodes = kwargs['nodes_type'].num_nodes elif integrator.nodes_type is not None and integrator.nodes_type.num_nodes is not None: self.__num_nodes = integrator.nodes_type.num_nodes else: raise ValueError(func_name(self) + "Number of nodes per time step not given.") if 'notes_type' in kwargs: self.__nodes_type = kwargs['notes_type'] if 'weights_type' in kwargs: self.__weights_type = kwargs['weights_type'] if 'classic' in kwargs: assert_is_instance(kwargs['classic'], bool, descriptor="Classic Flag", checking_obj=self) self._classic = kwargs['classic'] # TODO: need to store the exact solution somewhere else self.__exact = np.zeros(self.num_time_steps * (self.__num_nodes - 1) + 1, dtype=np.object)
[docs] def run(self, core, **kwargs): """Applies SDC solver to the initialized problem setup. Solves the given problem with the explicit SDC algorithm. Parameters ---------- core : :py:class:`.SdcSolverCore` core solver stepping method dt : :py:class:`float` width of the interval to work on; this is devided into the number of given time steps this solver has been initialized with See Also -------- :py:meth:`.IIterativeTimeSolver.run` : overridden method """ super(ParallelSdc, self).run(core, **kwargs) assert_named_argument('dt', kwargs, types=float, descriptor="Width of Interval", checking_obj=self) self._dt = kwargs['dt'] self._print_header() # start iterations # TODO: exact solution storage handling self.__exact[0] = self.problem.initial_value _has_work = True _previous_flag = Message.SolverFlag.none _current_flag = Message.SolverFlag.none __work_loop_count = 1 while _has_work: LOG.debug("Work Loop: %d" % __work_loop_count) _previous_flag = _current_flag _current_flag = Message.SolverFlag.none # receive dedicated message _msg = self._communicator.receive() if _msg.flag == Message.SolverFlag.failed: # previous solver failed # --> pass on the failure and abort _current_flag = Message.SolverFlag.failed _has_work = False LOG.debug("Previous Solver Failed") else: if _msg.flag == Message.SolverFlag.time_adjusted: # the previous solver has adjusted its interval # --> we need to recompute our interval _current_flag = self._adjust_interval_width() # we don't immediately start the computation of the newly computed interval # but try to pass the new interval end to the next solver as soon as possible # (this should avoid throwing away useless computation) LOG.debug("Previous Solver Adjusted Time") else: if _previous_flag in \ [Message.SolverFlag.none, Message.SolverFlag.converged, Message.SolverFlag.finished, Message.SolverFlag.time_adjusted]: # we just started or finished our previous interval # --> start a new interval _has_work = self._init_new_interval(_msg.time_point) if _has_work: # set initial values self.state.initial.solution.value = _msg.value.copy() self.state.initial.solution.time_point = _msg.time_point self.state.initial.done() LOG.debug("New Interval Initialized") # start logging output self._print_interval_header() # start global timing (per interval) self.timer.start() else: # pass LOG.debug("No New Interval Available") elif _previous_flag == Message.SolverFlag.iterating: LOG.debug("Next Iteration") else: LOG.warn("WARNING!!! Something went wrong here") if _has_work: # we are still on the same interval or have just successfully initialized a new interval # --> do the real computation LOG.debug("Starting New Solver Main Loop") # initialize a new iteration state self.state.proceed() if _msg.time_point == self.state.initial.time_point: if _previous_flag == Message.SolverFlag.iterating: LOG.debug("Updating initial value") # if the previous solver has a new initial value for us, we use it self.state.current_iteration.initial.solution.value = _msg.value.copy() _current_flag = self._main_solver_loop() if _current_flag in \ [Message.SolverFlag.converged, Message.SolverFlag.finished, Message.SolverFlag.failed]: _log_msgs = {'': OrderedDict()} if self.state.last_iteration_index <= self.threshold.max_iterations: _group = 'Converged after %d iteration(s)' % (self.state.last_iteration_index + 1) _log_msgs[''][_group] = OrderedDict() _log_msgs[''][_group] = self.threshold.has_reached(log=True) _log_msgs[''][_group]['Final Residual'] = "{:.3e}"\ .format(supremum_norm(self.state.last_iteration.final_step.solution.residual)) _log_msgs[''][_group]['Solution Reduction'] = "{:.3e}"\ .format(supremum_norm(self.state.solution .solution_reduction(self.state.last_iteration_index))) if problem_has_exact_solution(self.problem, self): _log_msgs[''][_group]['Error Reduction'] = "{:.3e}"\ .format(supremum_norm(self.state.solution .error_reduction(self.state.last_iteration_index))) else: warnings.warn("{}: Did not converged: {:s}".format(self._core.name, self.problem)) _group = "FAILED: After maximum of {:d} iteration(s)"\ .format(self.state.last_iteration_index + 1) _log_msgs[''][_group] = OrderedDict() _log_msgs[''][_group]['Final Residual'] = "{:.3e}"\ .format(supremum_norm(self.state.last_iteration.final_step.solution.residual)) _log_msgs[''][_group]['Solution Reduction'] = "{:.3e}"\ .format(supremum_norm(self.state.solution .solution_reduction(self.state.last_iteration_index))) if problem_has_exact_solution(self.problem, self): _log_msgs[''][_group]['Error Reduction'] = "{:.3e}"\ .format(supremum_norm(self.state.solution .error_reduction(self.state.last_iteration_index))) LOG.warn(" {} Failed: Maximum number iterations reached without convergence." .format(self._core.name)) print_logging_message_tree(_log_msgs) elif _previous_flag in [Message.SolverFlag.converged, Message.SolverFlag.finished]: LOG.debug("Solver Finished.") self.timer.stop() self._print_footer() else: # something went wrong # --> we failed LOG.warn("Solver failed.") _current_flag = Message.SolverFlag.failed self._communicator.send(value=self.state.current_iteration.final_step.solution.value, time_point=self.state.current_iteration.final_step.time_point, flag=_current_flag) __work_loop_count += 1 # end while:has_work is None LOG.debug("Solver Main Loop Done") return [_s.solution for _s in self._states]
@property
[docs] def state(self): """Read-only accessor for the sovler's state Returns ------- state : :py:class:`.ISolverState` """ if len(self._states) > 0: return self._states[-1] else: return None
@property
[docs] def num_time_steps(self): """Accessor for the number of time steps within the interval. Returns ------- number_time_steps : :py:class:`int` Number of time steps within the problem-given time interval. """ return self._num_time_steps
@property
[docs] def num_nodes(self): """Accessor for the number of integration nodes per time step. Returns ------- number_of_nodes : :py:class:`int` Number of integration nodes used within one time step. """ return self.__num_nodes
@property
[docs] def classic(self): """Read-only accessor for the type of SDC Returns ------- is_classic : :py:class:`bool` :py:class:`True` if it's the classic SDC as known from papers; :py:class:`False` if it's the modified SDC by Torbjörn Klatt """ return self._classic
[docs] def _init_new_state(self): """Initialize a new state for a work task Usually, this starts a new work task. The previous state, if applicable, is stored in a stack. """ if self.state: # finalize the current state self.state.finalize() # initialize solver state self._states.append(SdcSolverState(num_nodes=self.num_nodes - 1, num_time_steps=self.num_time_steps))
[docs] def _init_new_interval(self, start): """Initializes a new work interval Parameters ---------- start : :py:class:`float` start point of new interval Returns ------- has_work : :py:class:`bool` :py:class:`True` if new interval have been initialized; :py:class:`False` if no new interval have been initialized (i.e. new interval end would exceed end of time given by problem) """ assert_is_instance(start, float, descriptor="Time Point", checking_obj=self) if start + self._dt > self.problem.time_end: return False if self.state and start == self.state.initial.time_point: return False self._init_new_state() # set width of current interval self.state.delta_interval = self._dt # compute time step and node distances self._deltas['t'] = self.state.delta_interval / self.num_time_steps # width of a single time step (equidistant) # start time points of time steps self.__time_points['steps'] = np.linspace(start, start + self._dt, self.num_time_steps + 1) # initialize and transform integrator for time step width self._integrator.init(self.__nodes_type, self.__num_nodes, self.__weights_type, interval=np.array([self.__time_points['steps'][0], self.__time_points['steps'][1]], dtype=np.float)) self.__time_points['nodes'] = np.zeros((self.num_time_steps, self.num_nodes), dtype=np.float) _deltas_n = np.zeros(self.num_time_steps * (self.num_nodes - 1) + 1) # copy the node provider so we do not alter the integrator's one _nodes = deepcopy(self._integrator.nodes_type) for _t in range(0, self.num_time_steps): # transform Nodes (copy) onto new time step for retrieving actual integration nodes _nodes.interval = np.array([self.__time_points['steps'][_t], self.__time_points['steps'][_t + 1]]) self.__time_points['nodes'][_t] = _nodes.nodes.copy() for _n in range(0, self.num_nodes - 1): _i = _t * (self.num_nodes - 1) + _n _deltas_n[_i + 1] = _nodes.nodes[_n + 1] - _nodes.nodes[_n] self._deltas['n'] = _deltas_n[1:].copy() return True
[docs] def _adjust_interval_width(self): """Adjust width of time interval """ raise NotImplementedError("Time Adaptivity not yet implemented.") # return Message.SolverFlag.time_adjusted
def _main_solver_loop(self): # initialize iteration timer of same type as global timer _iter_timer = self.timer.__class__() self._print_iteration(self.state.current_iteration_index + 1) # iterate on time steps _iter_timer.start() for _current_time_step in self.state.current_iteration: # run this time step self._time_step() if self.state.current_time_step_index < len(self.state.current_iteration) - 1: self.state.current_iteration.proceed() _iter_timer.stop() # check termination criteria self.threshold.check(self.state) # log this iteration's summary if self.state.is_first_iteration: # on first iteration we do not have comparison values self._print_iteration_end(None, None, None, _iter_timer.past()) else: if problem_has_exact_solution(self.problem, self) and not self.state.is_first_iteration: # we could compute the correct error of our current solution self._print_iteration_end(self.state.solution.solution_reduction(self.state.current_iteration_index), self.state.solution.error_reduction(self.state.current_iteration_index), self.state.current_step.solution.residual, _iter_timer.past()) else: self._print_iteration_end(self.state.solution.solution_reduction(self.state.current_iteration_index), None, self.state.current_step.solution.residual, _iter_timer.past()) # finalize this iteration (i.e. TrajectorySolutionData.finalize()) self.state.current_iteration.finalize() _reason = self.threshold.has_reached() if _reason is None: # LOG.debug("solver main loop done: no reason") return Message.SolverFlag.iterating elif _reason == ['iterations']: # LOG.debug("solver main loop done: iterations") self.state.finalize() return Message.SolverFlag.finished else: # LOG.debug("solver main loop done: other") self.state.finalize() return Message.SolverFlag.converged def _time_step(self): self.state.current_time_step.delta_time_step = self._deltas['t'] for _step in range(0, len(self.state.current_time_step)): _node_index = self.state.current_time_step_index * (self.num_nodes - 1) + _step self.state.current_time_step[_step].delta_tau = self._deltas['n'][_node_index] self.state.current_time_step[_step].solution.time_point = \ self.__time_points['nodes'][self.state.current_time_step_index][_step + 1] self._print_time_step(self.state.current_time_step_index + 1, self.state.current_time_step.initial.time_point, self.state.current_time_step.last.time_point, self.state.current_time_step.delta_time_step) # for classic SDC compute integral _integral = 0.0 _integrate_values = None if self.classic: if not self.state.current_time_step.initial.rhs_evaluated: self.state.current_time_step.initial.rhs = \ self.problem.evaluate_wrt_time(self.state.current_time_step.initial.time_point, self.state.current_time_step.initial.value) _integrate_values = np.array([self.state.current_time_step.initial.rhs], dtype=self.problem.numeric_type) for _step_index in range(0, len(self.state.current_time_step)): if self.state.is_first_iteration: _integrate_values = \ np.append(_integrate_values, np.array([self.state.current_time_step.initial.rhs], dtype=self.problem.numeric_type), axis=0) else: _step = self.state.previous_iteration[self.state.current_time_step_index][_step_index] if not _step.rhs_evaluated: _step.rhs = self.problem.evaluate_wrt_time(_step.time_point, _step.value) _integrate_values = \ np.append(_integrate_values, np.array([_step.rhs], dtype=self.problem.numeric_type), axis=0) assert_condition(_integrate_values.shape[0] == self.num_nodes, ValueError, message="Number of integration values not correct: {:d} != {:d}" .format(_integrate_values.shape[0], self.num_nodes), checking_obj=self) _full_integral = 0.0 # do the actual SDC steps of this SDC sweep for _step_index in range(0, len(self.state.current_time_step)): _current_step = self.state.current_time_step[_step_index] if self.classic: _integral = self._integrator.evaluate(_integrate_values, from_node=_step_index, target_node=_step_index + 1) # we successively compute the full integral, which is used for the residual at the end _full_integral += _integral _current_step.integral = _integral.copy() # do the SDC step of this sweep self._sdc_step() if self.state.current_step_index < len(self.state.current_time_step) - 1: self.state.current_time_step.proceed() del _integrate_values # compute residual and print step details for _step_index in range(0, len(self.state.current_time_step)): _step = self.state.current_time_step[_step_index] self._core.compute_residual(self.state, step=_step, integral=_full_integral) # finalize this step (i.e. StepSolutionData.finalize()) _step.done() if _step_index > 0: _previous_time = self.state.current_time_step[_step_index - 1].time_point else: _previous_time = self.state.current_time_step.initial.time_point if problem_has_exact_solution(self.problem, self): self._print_step(_step_index + 2, _previous_time, _step.time_point, supremum_norm(_step.value), _step.solution.residual, _step.solution.error) else: self._print_step(_step_index + 2, _previous_time, _step.time_point, supremum_norm(_step.value), _step.solution.residual, None) self._print_time_step_end() # finalizing the current time step (i.e. TrajectorySolutionData.finalize) self.state.current_time_step.finalize() def _sdc_step(self): # helper variables _current_time_step_index = self.state.current_time_step_index _current_step_index = self.state.current_step_index # copy solution of previous iteration to this one if self.state.is_first_iteration: self.state.current_step.value = self.state.initial.value.copy() else: self.state.current_step.value = \ self.state.previous_iteration[_current_time_step_index][_current_step_index].value.copy() # TODO: review the custom modification # if not self.classic: # # gather values for integration and evaluate problem at given points # # initial value for this time step # _integrate_values = \ # np.array( # [self.problem.evaluate_wrt_time(self.state.current_time_step.initial.time_point, # self.state.current_time_step.initial.value.copy()) # ], dtype=self.problem.numeric_type) # # if _current_step_index > 0: # # values from this iteration (already calculated) # _from_current_iteration_range = range(0, _current_step_index) # for _index in _from_current_iteration_range: # _integrate_values = \ # np.append(_integrate_values, # np.array( # [self.problem.evaluate_wrt_time(self.state.current_time_step[_index].solution.time_point, # self.state.current_time_step[_index].solution.value.copy()) # ], dtype=self.problem.numeric_type # ), axis=0) # # # values from previous iteration # _from_previous_iteration_range = range(_current_step_index, self.num_nodes - 1) # for _index in _from_previous_iteration_range: # if self.state.is_first_iteration: # _this_value = self.problem.initial_value # else: # _this_value = self.state.previous_iteration[_current_time_step_index][_index].solution.value.copy() # _integrate_values = \ # np.append(_integrate_values, # np.array( # [self.problem.evaluate_wrt_time(self.state.current_time_step[_index].solution.time_point, # _this_value) # ], dtype=self.problem.numeric_type # ), axis=0) # assert_condition(_integrate_values.shape[0] == self.num_nodes, # ValueError, message="Number of integration values not correct: {:d} != {:d}" # .format(_integrate_values.shape[0], self.num_nodes), # checking_obj=self) # # # integrate # self.state.current_step.integral = self._integrator.evaluate(_integrate_values, # from_node=_current_step_index, # target_node=_current_step_index + 1) # del _integrate_values # # END if not self.classic # compute step self._core.run(self.state, problem=self.problem) # calculate error self._core.compute_error(self.state, problem=self.problem) # step gets finalized after computation of residual def print_lines_for_log(self): _lines = super(ParallelSdc, self).print_lines_for_log() if 'Number Nodes per Time Step' not in _lines['Integrator']: _lines['Integrator']['Number Nodes per Time Step'] = "%d" % self.__num_nodes if 'Number Time Steps' not in _lines['Integrator']: _lines['Integrator']['Number Time Steps'] = "%d" % self._num_time_steps return _lines def _print_interval_header(self): LOG.info("%s%s" % (VERBOSITY_LVL1, SEPARATOR_LVL3)) LOG.info("{} Interval: [{:.3f}, {:.3f}]" .format(VERBOSITY_LVL1, self.state.initial.time_point, self.state.initial.time_point + self._dt)) self._print_output_tree_header() def _print_output_tree_header(self): LOG.info("%s iter" % VERBOSITY_LVL1) LOG.info("%s \\" % VERBOSITY_LVL2) LOG.info("%s |- time start end delta" % VERBOSITY_LVL2) LOG.info("%s | \\" % VERBOSITY_LVL3) LOG.info("%s | |- step t_0 t_1 phi(t_1) resid err" % VERBOSITY_LVL3) LOG.info("%s | \\_" % VERBOSITY_LVL2) LOG.info("%s \\_ sol r.red err r.red resid time" % VERBOSITY_LVL1) def _print_iteration(self, _iter): _iter = self._output_format(_iter, 'int', width=5) LOG.info("%s %s" % (VERBOSITY_LVL1, _iter)) LOG.info("%s \\" % VERBOSITY_LVL2) def _print_iteration_end(self, solred, errred, resid, time): _solred = self._output_format(solred, 'exp') _errred = self._output_format(errred, 'exp') _resid = self._output_format(resid, 'exp') _time = self._output_format(time, 'float', width=6.3) LOG.info("%s \\_ %s %s %s %s" % (VERBOSITY_LVL1, _solred, _errred, _resid, _time)) def _print_time_step(self, time_step, start, end, delta): _time_step = self._output_format(time_step, 'int', width=3) _start = self._output_format(start, 'float', width=6.3) _end = self._output_format(end, 'float', width=6.3) _delta = self._output_format(delta, 'float', width=6.3) LOG.info("%s |- %s %s %s %s" % (VERBOSITY_LVL2, _time_step, _start, _end, _delta)) LOG.info("%s | \\" % VERBOSITY_LVL3) self._print_step(1, None, self.state.current_time_step.initial.time_point, supremum_norm(self.state.current_time_step.initial.solution.value), None, None) def _print_time_step_end(self): LOG.info("%s | \\_" % VERBOSITY_LVL2) def _print_step(self, step, t0, t1, phi, resid, err): _step = self._output_format(step, 'int', width=2) _t0 = self._output_format(t0, 'float', width=6.3) _t1 = self._output_format(t1, 'float', width=6.3) _phi = self._output_format(phi, 'float', width=6.3) _resid = self._output_format(resid, 'exp') _err = self._output_format(err, 'exp') LOG.info("%s | |- %s %s %s %s %s %s" % (VERBOSITY_LVL3, _step, _t0, _t1, _phi, _resid, _err)) def _output_format(self, value, _type, width=None): def _value_to_numeric(val): if isinstance(val, (np.ndarray, IDiagnosisValue)): return supremum_norm(val) else: return val if _type and width is None: if _type == 'float': width = 10.3 elif _type == 'int': width = 10 elif _type == 'exp': width = 9.2 else: width = 10 if value is None: _outstr = "{: ^{width}s}".format('na', width=int(width)) else: if _type == 'float': _outstr = "{: {width}f}".format(_value_to_numeric(value), width=width) elif _type == 'int': _outstr = "{: {width}d}".format(_value_to_numeric(value), width=width) elif _type == 'exp': _outstr = "{: {width}e}".format(_value_to_numeric(value), width=width) else: _outstr = "{: >{width}s}".format(value, width=width) return _outstr
__all__ = ['ParallelSdc']