Source code for pypint.utilities.threshold_check

# coding=utf-8
"""

.. moduleauthor:: Torbjörn Klatt <[email protected]>
"""
from collections import OrderedDict

import numpy as np

from pypint.solvers.diagnosis import IDiagnosisValue
from pypint.solvers.diagnosis.norms import supremum_norm
from pypint.utilities import assert_condition, func_name
from pypint.utilities.logging import LOG


[docs]class ThresholdCheck(object): """Threshold Checking Handler """ _default_min_threshold = 1e-7 """Default minimum threshold """ _default_max_threshold = 10 """Default maximum threshold """ _default_min_conditions = { 'error reduction': _default_min_threshold, 'solution reduction': _default_min_threshold, 'residual': _default_min_threshold, 'error': _default_min_threshold } _default_max_conditions = { 'iterations': _default_max_threshold }
[docs] def __init__(self, min_threshold=_default_min_threshold, max_threshold=_default_max_threshold, conditions=('solution reduction', 'iterations')): """ Parameters ---------- min_threshold : :py:class:`float` threshold value for minimum criteria max_threshold : :py:class:`int` threshold value for maximum criteria conditions : :py:class:`tuple` of :py:class:`str` Tuple of strings defining the active criteria. Possible values are: * "``error reduction``" * "``solution reduction``" * "``residual``" * "``error``" * "``iterations``" (defaults to: ``('solution reduction', 'iterations')``) """ self._min_threshold = min_threshold self._max_threshold = max_threshold self._conditions = {} self._set_conditions(conditions) self._reason = None
[docs] def check(self, state): """Checks thresholds of given state Parameters ---------- state : :py:class:`.ISolverState` """ self._reason = [] self._check_reduction(state) self._check_minimum('residual', state.current_iteration.final_step.solution.residual) self._check_minimum('error', state.current_iteration.final_step.solution.error) self._check_maximum('iterations', state.current_iteration_index + 1) if len(self._reason) == 0: self._reason = None
[docs] def has_reached(self, log=False, human=False): """Gives list of thresholds reached Parameters ---------- human : :py:class:`bool` if :py:class:`True` returns a human readable string listing reached thresholds (default :py:class:`False`) Returns ------- reached_thresholds : :py:class:`list` or :py:class:`None` :py:class:`list` of matched thresholds or :py:class:`None` if non reached """ if human: return "Threshold condition(s) met: {:s}".format(self._reason) if log: _r = OrderedDict() _r['Threshold condition(s) met'] = ', '.join(self._reason) return _r else: return self._reason
@property
[docs] def min_error_reduction(self): """Read-only accessor for the minimum reduction threshold for the error Returns ------- reduction_threshold : :py:class:`float` or :py:class:`None` :py:class:`None` if reduction of error is not a criteria """ if 'error reduction' in self._conditions: return self._conditions['error reduction'] else: return None
@property
[docs] def min_solution_reduction(self): """Read-only accessor for the minimum reduction threshold for the solution Returns ------- reduction_threshold : :py:class:`float` or :py:class:`None` :py:class:`None` if reduction of solution is not a criteria """ if 'solution reduction' in self._conditions: return self._conditions['solution reduction'] else: return None
@property
[docs] def min_residual(self): """Read-only accessor for the minimum residual threshold Returns ------- residual_threshold : :py:class:`float` or :py:class:`None` :py:class:`None` if residual is not a criteria """ if "residual" in self._conditions: return self._conditions["residual"] else: return None
@property
[docs] def min_error(self): """Read-only accessor for the minimum error threshold Returns ------- error_threshold : :py:class:`float` or :py:class:`None` :py:class:`None` if error is not a criteria """ if "error" in self._conditions: return self._conditions["error"] else: return None
@property
[docs] def max_iterations(self): """Read-only accessor for the maximum iterations threshold Returns ------- iterations_threshold : :py:class:`int` or :py:class:`None` :py:class:`None` if iterations is not a criteria """ if "iterations" in self._conditions: return self._conditions["iterations"] else: return None
[docs] def print_conditions(self): """Pretty-formatted string of all active criteria and their thresholds """ _outstr = "" first = True for cond in self._conditions: if not first: _outstr += ", " if cond in ThresholdCheck._default_min_conditions: _outstr += "{:s}={:.2e}".format(cond, self._conditions[cond]) elif cond in ThresholdCheck._default_max_conditions: _outstr += "{:s}={:d}".format(cond, self._conditions[cond]) first = False return _outstr
def print_lines_for_log(self): _lines = OrderedDict() for _cond in self._conditions: if _cond in ThresholdCheck._default_min_conditions: _lines[_cond] = "{:.0e}".format(self._conditions[_cond]) elif _cond in ThresholdCheck._default_max_conditions: _lines[_cond] = "{:d}".format(self._conditions[_cond]) return _lines
[docs] def compute_reduction(self, state): """Computes the reduction of the error and solution With respect to the supremum nomr of the given state's current iteration (see :py:attr:`.ISolverState.current_iteration` and :py:class:`.IIterationState`). In case no previous iteration is available, it immediatly returns. """ if not state.previous_iteration: # there is no previous iteration to compare with LOG.debug("Skipping computation of reduction: No previous iteration available.") return if state.current_iteration.final_step.solution.error: # error is given; computing reduction of it _previous_error = supremum_norm(state.previous_iteration.final_step.solution.error) _current_error = supremum_norm(state.current_iteration.final_step.solution.error) state.solution.set_error_reduction(state.current_iteration_index, abs((_previous_error - _current_error) / _previous_error * 100)) # computing reduction of solution _previous_solution = supremum_norm(state.previous_iteration.final_step.solution.value) _current_solution = supremum_norm(state.current_iteration.final_step.solution.value) state.solution.set_solution_reduction(state.current_iteration_index, abs((_previous_solution - _current_solution) / _previous_solution * 100))
def _check_reduction(self, state): self.compute_reduction(state) if state.solution.error_reduction(state.current_iteration_index): self._check_minimum('error reduction', state.solution.error_reduction(state.current_iteration_index)) if state.solution.solution_reduction(state.current_iteration_index): self._check_minimum('solution reduction', state.solution.solution_reduction(state.current_iteration_index)) def _check_minimum(self, name, value): self._check("min", name, value) def _check_maximum(self, name, value): self._check("max", name, value) def _check(self, operator, name, value): _value = supremum_norm(value) if isinstance(value, (IDiagnosisValue, np.ndarray)) else value if name in self._conditions and self._conditions[name] is not None: assert_condition(_value is not None, ValueError, message="'{:s}' is a termination condition but not available to check." .format(name[0].capitalize() + name[1:]), checking_obj=self) if operator == "min": if _value <= self._conditions[name]: LOG.debug("Minimum of {:s} reached: {:.2e} <= {:.2e}" .format(name, _value, self._conditions[name])) self._reason.append(name) elif operator == "max": if _value >= self._conditions[name]: LOG.debug("Maximum of {:s} exceeded: {:d} >= {:d}" .format(name, _value, self._conditions[name])) self._reason.append(name) else: raise ValueError("Given operator '{:s}' is invalid.".format(operator)) else: # $name is not a condition pass def _set_conditions(self, conditions): if isinstance(conditions, tuple): for cond in conditions: if cond in ThresholdCheck._default_min_conditions: self._conditions[cond] = self._min_threshold elif cond in ThresholdCheck._default_max_conditions: self._conditions[cond] = self._max_threshold else: raise ValueError(func_name(self) + "Given condition is not supported: {:s}".format(cond)) elif isinstance(conditions, dict): for cond in conditions: if cond in ThresholdCheck._default_min_conditions or \ cond in ThresholdCheck._default_max_conditions: self._conditions[cond] = conditions[cond] else: raise ValueError(func_name(self) + "Given condition is not supported: {:s}".format(cond)) elif conditions is None: pass else: raise ValueError(func_name(self) + "Given conditions can not be parsed: {:s}".format(conditions)) def __str__(self): return "ThresholdCheck(" + self.print_conditions() + ")"