
r"""
===================
 Iterative solvers
===================

This module contains the interface for solving root-finding and/or
fixed-point problems of the forms::

    0 == G(x)
    x == F(x)

These can be related through the relationship `F(x) = x - G(x)`;
however, both forms have their uses:

1) The `0 == G(x)` form makes the objective function explicit and may
   be useful for specifying tolerances.  One may also have an
   expression in terms of `G` for calculating a good step (for
   example, if the Jacobian can be calculated).  Thus, the function
   :meth:`Problem.G` returns a pair `(G, dx)` containing a suggested
   step (if it can be calculated).

   This form also implicitly suggests that a good fixed-point
   iteration may not be known (which is why we always include a
   suggested step `dx`).
2) The `x == F(x)` form naturally suggests the iteration `x -> F(x)`
   (that is, `dx = F(x) - x`), and so :meth:`Problem.F` returns only
   the next step.

   If present, this form is used to build the objective function
   `G(x) = x - F(x)` for memory-limited quasi-Newton methods such as
   the Broyden iteration, which store the inverse Jacobian as a matrix
   of the form `B = I + D` where `D` is rank deficient.  The ideal
   iteration `F(x) = x_sol` (converging in a single step) would have
   $\partial F = 0$, hence `D = 0`, so it is captured exactly by the
   rank-deficient approximate Jacobian.
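
As a purely illustrative example of the two forms (not using this
module's classes), consider the scalar fixed-point problem
`x = cos(x)` and the equivalent root-finding problem
`0 = x - cos(x)`::

   import numpy as np

   def F(x):                         # fixed-point form: x == F(x)
       return np.cos(x)

   def G(x):                         # root-finding form: 0 == G(x)
       return x - F(x)

   x = 1.0
   for _ in range(100):              # naive fixed-point iteration
       x = F(x)
   assert abs(G(x)) < 1e-12          # both forms agree at the solution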

The interfaces in this module address the following functionality
goals:

- Provide a history of the state of a solution so that the
  calculation can be reproduced.
- Allow for interaction with the solver, such as plotting/displaying
  intermediate results, prematurely terminating the solver, and
  possibly even adjusting the solution during the iteration.
- Support error recovery and reproduction.

The design is built around three main interfaces:

.. autosummary::
   :nosignatures:

   IState
   IProblem
   ISolver

The general usage is something like::

  problem = Problem(...)
  solver = Solver(...)
  state, niter = solver.solve(problem)
  new_state, niter = solver.solve(problem, state)

  # Archive
  file = open(filename, 'w')
  file.write(new_state.get_history())
  file.close()

  ...
  # Restore
  env = {}
  execfile(filename, env)
  state = env['state']

Problems and solvers are considered independent, with the state
information being stored in the returned state object.

:class:`IState`
  Specifies :attr:`IState.x_`, the current state of the solution.  The
  abstraction employed here is that a state may be the result of three
  "transformations":
    
  1. Initialization from a :meth:`IProblem.get_initial_state()` call.
  2. Initialization from direct assignment of `x`.
  3. Transformation from a previous state by a :class:`ISolver` object.

  The state keeps track of its history through these previous
  transformations/initializations so that the computation can be
  repeated.

:class:`IProblem`
   Specifies :meth:`IProblem.get_initial_state()`, which implicitly
   defines the state class and problem size.  A problem must implement
   the :class:`IProblem` interface and specify at least one of the
   functions `G(x)` (to be set to zero) or `F(x)` (to find a fixed
   point) by defining :meth:`IProblem.step`.

    - `state0 =` :meth:`IProblem.get_initial_state()`:
      This provides the initial guess `x0`, which must implement the
      :class:`IState` interface.  It also defines the size of the
      problem so that the solver can allocate resources etc.
    - `(x1, G1, dx1, F1) =` :meth:`IProblem.step()`:
      Returns the new step `x1`, the objective function `G1 = G(x1)`,
      the suggested step `dx1`, and the fixed-point iteration
      `F1 = F(x1)`, based on the step `x` suggested by the solver.  If
      the problem can compute the Newton step ``dx1 = -np.dot(j_inv,
      G1)`` (as indicated by :attr:`IProblem.newton` being `True`),
      then this should also be returned; otherwise, one should set
      ``dx1 = -G1`` as a default step.

      The new step `x1` need not be either of the natural steps
      `x0 + dx0` or `F1`, allowing the problem more control, for
      example to enforce constraints or to prevent the solver from
      wandering into unphysical regions of solution space.  (A
      schematic sketch of this contract follows the interface
      descriptions below.)
        
:class:`ISolver`
   Provides a means for solving a problem.  Once initialized, this
   provides a method to transform an initial guess (provided either by
   a Problem or by another State) into a new State.
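
To make the return contract of :meth:`IProblem.step` concrete, here is
a schematic sketch using plain functions (not the actual interface
classes) for the scalar root-finding problem `G(x) = x**2 - 2`, where
the full Newton step is available::

   def step(x, compute_G=True, compute_dx=True, compute_F=True):
       # The problem may modify the solver's suggestion, e.g. to keep x > 0.
       x1 = max(x, 1e-8)
       g = x1**2 - 2                    # objective function G(x1)
       newton = -g/(2.0*x1)             # full Newton step -G/G'
       G1 = g if compute_G else None
       dx1 = newton if compute_dx else None
       F1 = x1 + newton if compute_F else None
       return (x1, G1, dx1, F1)

Quantities that are not requested are returned as `None`, mirroring
the `has_*` attributes and `compute_*` flags described below.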
  

.. todo:: Add and test support for a "global" dictionary used to allow the
   solver and problem to maintain state during the solution of a
   problem.  Strictly, the calculation should not depend on this
   dictionary, but it can be used to store useful diagnostic
   information about the progress of the solution.  Also,
   the dictionary can be used to maintain flags that control the
   iteration, thus allowing the user to alter or affect the iteration
   (from another process or thread).  This use of a global dictionary
   has not been fleshed out yet but the idea is to allow one to use
   the :mod:`mmf.async` module for remote control.

To facilitate usage, partial implementations are provided:

.. rubric:: States

.. autosummary::

   BareState
   State
   State_x
      
.. rubric:: Problems

.. autosummary::

   Problem
   PlotProblem
   Problem_x
 
.. rubric:: Solvers

.. autosummary::

   Solver

As an example and to get started, look at the code for :class:`Scale`
and :class:`State2`.

Tolerances
----------
The general plan for determining if a parameter $x$ is close enough to
$x_0$ is to consider the condition `err <= 1` where::

   abs_err = abs(x - x_0)
   rel_err = abs_err/(abs(x_0) + _TINY)
   err = min(abs_err/(abs_tol + _TINY), rel_err/(rel_tol + _TINY))

The condition `err <= 1` holds if either the absolute error is less
than `abs_tol` or the relative error is less than `rel_tol`.  This is
what we mean below
by $\abs{x - x_0} < \epsilon$.
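
For a single scalar parameter, this test could be written as the
following sketch (a hypothetical helper mirroring the module-level
defaults `_ABS_TOL` and `_REL_TOL`)::

   import numpy as np

   _TINY = np.finfo(float).tiny

   def close_enough(x, x_0, abs_tol=1e-12, rel_tol=1e-12):
       abs_err = abs(x - x_0)
       rel_err = abs_err/(abs(x_0) + _TINY)
       err = min(abs_err/(abs_tol + _TINY), rel_err/(rel_tol + _TINY))
       return err <= 1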

The real questions are:

1) What to use for $x$ and $x_0$?

   There are two forms of solver here: root finding $G(x) = 0$ and
   fixed-point finding $x = F(x)$.  In general these are related by $G(x)
   = x - F(x)$ but some problems may define both.  There is no
   reference scale for $G(x)$ as it should be zero, so only the
   absolute tolerance :attr:`ISolver.G_tol` makes sense here.

   It would be ideal if we could also provide a tolerance on the step
   $\abs{x-x_0} < \epsilon_x$, but this is only easy to implement with
   bracketing algorithms.  Instead, we may provide a heuristic based
   on the Jacobian, but in general this is difficult to trust if the
   Jacobian is not exact.

2) What to do when there is a vector of parameters?

   The actual tolerance criterion used is:

   .. math::
      \Norm{\frac{\vect{G}(x)}{\vect{G}_{\text{typ}}}}_{p} \leq \epsilon_{G}

   where the user can specify the vector of typical scales
   $\vect{G}_{\text{typ}}$ through :attr:`IProblem.G_scale` and the
   type of norm through :attr:`ISolver.norm_p` (see the sketch below).
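
A sketch of the corresponding check, assuming `G`, `G_scale`,
`norm_p`, and `G_tol` carry the meanings described above (the default
values shown are merely illustrative)::

   import numpy as np

   def G_converged(G, G_scale=1.0, norm_p=np.inf, G_tol=1e-12):
       # Scale the objective, then compare its p-norm to the tolerance.
       return np.linalg.norm(np.atleast_1d(G)/G_scale, ord=norm_p) <= G_tol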

Examples
--------
Another way to use this is when representing functions in a discrete
basis.  In this case, one parametrizes the number of points used and
would like to be able to continue an iterative solution while
increasing the resolution.

As an example, we consider the solution of the differential equation
:math:`f''(x) = f(x)` with boundary conditions
:math:`f(-1)=f(1)=\cosh(1)`.
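
The exact solution is :math:`f(x) = \cosh(x)`.  The following sketch,
purely illustrative and independent of any particular :class:`Problem`
subclass, sets up the discretized objective function `G(f)` on `N`
interior points using second-order finite differences::

   import numpy as np

   def make_G(N):
       x = np.linspace(-1, 1, N + 2)          # grid including boundaries
       h = x[1] - x[0]

       def G(f):
           # `f` holds the N interior values; the boundary values are fixed.
           f_full = np.empty(N + 2)
           f_full[0] = f_full[-1] = np.cosh(1)
           f_full[1:-1] = f
           d2f = (f_full[:-2] - 2*f_full[1:-1] + f_full[2:])/h**2
           return d2f - f                     # G(f) == 0  <=>  f'' == f
       return G

   G = make_G(64)
   x_interior = np.linspace(-1, 1, 66)[1:-1]
   print(abs(G(np.cosh(x_interior))).max())   # O(h**2) truncation error

Such a `G` could be returned from :meth:`Problem.G`; continuing the
solution at a higher resolution then amounts to interpolating the
converged `f` onto a finer grid and constructing a new `G`.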

"""

from __future__ import division
from __future__ import with_statement
__docformat__ = "restructuredtext en"

import sys
import time
from copy import copy, deepcopy
import UserDict
import warnings
try:
    from warnings import catch_warnings # pylint: disable-msg=E0611
except ImportError:
    from contextlib import contextmanager
    @contextmanager
    def catch_warnings(record=False):
        warning_list = []
        def catch_showwarning(message, category, filename, lineno,
                              file=None, line=None):
            warning_list.append(
                warnings.formatwarning(message, category, filename,
                                       lineno))

        try:
            old_showwarning = warnings.showwarning
            warnings.showwarning = catch_showwarning
            yield warning_list
        finally:
            warnings.showwarning = old_showwarning
        
import numpy as np

from zope.interface import Interface, Attribute, implements
from mmf.objects import StateVars, Container, process_vars
from mmf.objects import ClassVar, Computed, Required, Excluded
from mmf.objects import Internal, Deleted
import mmf.archive
import mmf.utils

# Module defaults
__all__ = ['IterationError',
           'IProblem', 'IPlotProblem', 'IState', 'ISolver',
           'ISolverData', 'IJinv',
           'Solver', 'SolverData', 'Problem', 'PlotProblem', 'Problem_x',
           'BareState', 'State', 'State_x', 'IMesg', 'Mesg',
           'StatusMesg',
           'solve', 'fixedpoint']
_FINFO = np.finfo(float)
_TINY = _FINFO.tiny                     # pylint: disable-msg=E1101
_ABS_TOL = 1e-12
_REL_TOL = 1e-12
_ABS_XTOL = 1e-12
_REL_XTOL = 1e-12
_MESSAGE_INDENT = 2
_VERBOSITY = 0
_INDENT_AMOUNT = 4
_DEBUG = False


class IMesg(Interface):
    r"""Interface to a messaging object."""
    messages = Attribute("List of `(mesg, level)` pairs.")
    show = Attribute(
        """:class:`Container` of booleans controlling which messages
        should be displayed.""")
    store = Attribute(
        """:class:`Container` of booleans controlling which messages
        should be stored.""")

    def new(self):
        """Return a new IMesg instance for displaying sub messages."""

    def msg(self, msg, type):
        """Add `msg` to the list of messages with specified `type` and
        display if `type` is `True` in `show`.

        Parameters
        ----------
        msg : str
            Message text.
        type : str, optional
            Message type.
        """

    def error(self, msg):
        """Add an error message."""

    def warn(self, msg):
        """Add a warning message."""

    def status(self, msg):
        """Display a status message."""

    def info(self, msg):
        """Add an information message."""

    def iter(self, msg):
        """Display an iteration message."""

    def delete(self):
        """Delete the messenger."""

    def delete_sub(self):
        """Clear all message sub queues."""

    def clear(self):
        """Clear bar and all message subqueues."""
[docs]class Mesg(StateVars): """Print the displayed messages immediately. This is the default implementation. Examples -------- >>> mesgs = Mesg() >>> mesgs.error('An error occurred!') -Error: An error occurred! >>> mesgs.status('You are here.') -Status: You are here. Info and Warnings are not displayed by default, but the are stored >>> mesgs.info('3+3 = 6') >>> mesgs.warn('Hang on!') >>> mesgs.show Container(info=False, status=True, warn=False, error=True, iter=False) >>> mesgs.store Container(info=True, status=True, warn=True, error=True, iter=False) Let's turn on warnings: >>> mesgs.show.warn = True >>> mesgs.warn('Hang on again!') -Warning: Hang on again! and also turn on iteration messages. These are still not stored: >>> mesgs.show.iter = True >>> for n in xrange(3): ... mesgs.iter('%i of 3' % n) -Iter: 0 of 3 -Iter: 1 of 3 -Iter: 2 of 3 Here are all of the messages so far. Note that the both messages were stored >>> mesgs.messages # doctest: +NORMALIZE_WHITESPACE [('-Error: An error occurred!', 'error'), ('-Status: You are here.', 'status'), ('-Info: 3+3 = 6', 'info'), ('-Warning: Hang on!', 'warn'), ('-Warning: Hang on again!', 'warn')] """ implements(IMesg) _state_vars = [ ('show', Container(error=True, warn=False, status=True, iter=False, info=False), """The `True` entries should be displayed."""), ('store', Container(error=True, warn=True, status=True, iter=False, info=True), """The `True` entries should be stored in :attr:`messages`"""), ('messages', [], "List of stored messages")] process_vars() def new(self): return self.__class__(show=self.show, store=self.store) def msg(self, msg, level='info'): if level not in self.store: raise ValueError( "Message type '%s' not recognized" % (level, )) if getattr(self.store, level, False): self.messages.append((msg, level)) if getattr(self.show, level, False): print(msg) def error(self, msg): """Add an error message with a default level of `0`.""" self.msg("-Error: %s" % (msg,), level='error') def warn(self, msg): """Add a warning message with a default level of `10`.""" self.msg("-Warning: %s" % (msg,), level='warn') def status(self, msg): """Display a status message with a default level of `-1.""" self.msg("-Status: %s" % (msg,), level='status') def info(self, msg): """Display a message about the state of an iteration etc. but do not log.""" self.msg("-Info: %s" % (msg,), level='info') def iter(self, msg): """Display a message about the state of an iteration etc. but do not log.""" self.msg("-Iter: %s" % (msg,), level='iter') def _inactive(self, *v, **kw): raise ValueError("Messenger is not active.") def delete(self): self.__dict__['msg'] = self._inactive def delete_sub(self): """Clear all message sub queues.""" def clear(self): """Clear all message subqueues.""" pass
_MESGS = Mesg() # Default object. Should generally not be used.
[docs]class StatusMesg(Mesg): """This class acts like a message lists, but prints the current message in a status-bar without using up screen space. Use an instance as the default argument for `mesgs`. """ """ Examples -------- >>> mesgs = StatusMesg() >>> mesgs.error('An error occurred!') -Error: An error occurred! <BLANKLINE> >>> mesgs.status('You are here.') \x1b[A-Status: You are here. <BLANKLINE> Iterations are displayed in a status bar: >>> mesgs.show.iter = True >>> for n in xrange(3): ... mesgs.iter('%i of 3' % n) -Iter: 0 of 3 \x1b[A\r\x1b[K-Iter: 1 of 3 \x1b[A\r\x1b[K-Iter: 2 of 3 Note that anything that is stored but not displayed also appears in the status bar as an iteration message. >>> mesgs.info('3+3 = 6') \x1b[A\r\x1b[K-Info: 3+3 = 6 Regular messages will appear and remain, but unless you clear the status bar, the status message will still be displayed (in this case the last info statement). >>> mesgs.error('Still showing status bar') \x1b[A\r\x1b[K\x1b[A-Error: Still showing status bar <BLANKLINE> -Info: 3+3 = 6 Here we clear it. >>> mesgs.clear() \x1b[A\r\x1b[K >>> mesgs.error('Status bar gone.') \x1b[A\r\x1b[K\x1b[A-Error: Status bar gone. Here are the stored messages: >>> mesgs.messages # doctest: +NORMALIZE_WHITESPACE [('-Error: An error occurred!', 'error'), ('-Status: You are here.', 'status'), ('-Info: 3+3 = 6', 'info'), ('-Error: Still showing status bar', 'error'), ('-Error: Status bar gone.', 'error')] """ _state_vars = [ ('keep_last', True, r"""If `True`, then the last iteration message will remain on the display, otherwise it will be deleted."""), ('_sb', Computed, r"""Reference to the StatusBar instance. When all of these are deleted, then the output will revert to normal behaviour"""), ('_bar', Computed, ":class:`mmf.utils.StatusBar` instance."), ('_bars', Computed, "List of all subbars."), ] process_vars() def __init__(self, *v, **kw): Mesg.__init__(self, *v, **kw) self._sb = mmf.utils.StatusBar() self._bar = self._sb.new_bar(keep_last=self.keep_last) self._bars = [] def new(self): new = self.__class__(show=self.show, store=self.store, keep_last=self.keep_last) self._bars.append(new._bar) return new def msg(self, msg, level='info'): """This prints stored messages and adds displayed messages to the status bar.""" if level not in self.store: raise ValueError( "Message type '%s' not recognized" % (level, )) if getattr(self.store, level, False): self.messages.append((msg, level)) if getattr(self.show, level, False): self._bar.print_(msg) else: self._bar.msg(msg) elif getattr(self.show, level, False): self._bar.msg(msg) def delete(self): """Delete the status bar. It cannot be used after this""" self._bar.delete() self.clear() def delete_sub(self): """Clear all message sub queues.""" while self._bars: b = self._bars.pop() b.clear() b.delete() def clear(self): """Clear all message queues, including self.""" self.delete_sub() self._bar.clear() def __del__(self): del self._sb

class IterationError(Exception):
    """Iteration has failed for some reason."""
[docs]class IState(Interface): r"""Interface for the state of a solution. The state keeps track of: - The current set of parameters x. - The history of the state. One can perform arbitrary manipulations of the state by housing these in the appropriate "Solver" object which perform these manipulations. Usage:: state = State(x_=...) x = state.x_ x1 = F(x) state.x_=x1 # Without using a solver """ x_ = Attribute( r"""This is a flat representation of the state of the object. It should be implemented as a property with get and set functionality. Setting :attr:`x_` should clear the history and initialize it with an archived version of :attr:`x_`. We use the name :attr:`x_` to avoid clashes with meaningful parameters which might be called `x`.""") sensitive = Attribute( r"""List of indices of sensitive parameters. These are parameters that must be adjusted slowly or only once a degree of convergence has been achieved. For example, the chemical potentials in a grand canonical ensemble. Solvers may treat these specially.""") solver_data = Attribute( r"""Solver specific data representing the state of the solver (for example, gradient info). This may be used by subsequent solvers if they are related, but is typically ignored.""") converged = Attribute( r"""Boolean like object signifying if state is from a converged calculation.""") messages = Attribute( r"""Messages accumulated during the iteration, such as reasons for termination.""") store_history = Attribute( r"""If `True`, then a record of the solver history will be saved. This can be somewhat slow and is designed for large scale solver applications where the number of iterations is small, but the cost of each iteration is large. When using the solver in inner loops, this should be set to `False` to speed the evaluation.""") history = Attribute( r"""History of the state as a sequence of strings to be executed in order to reproduce the current state. `history[0]`: should be an archive that can be evaluated to define the variable 'state' (which is the initial state). This is either the result of a :meth:`IProblem.get_initial_state()` or a direct assignment to :attr:`x_`. `history[1:]`: should be a series of ``(solver, problem, iterations)`` tuples that specify which solver and problems were used (and how many iterations) to obtain the n'th state from the previous state. By specifying the number of iterations, one allows for calculations that were interrupted before they reached convergence. (The actual format is not a requirement for the interface, only that the function :meth:`IState.get_history()` can return the appropriate archived string.) The references to solvers and problems should be kept in a careful manner: if the solver or problem instances are modified, then somehow these objects should be archived in the state prior to the modification so that the representation is faithful. However, archiving each copy could lead to a history bloat. This represents a bit of a design problem.""")
[docs] def record_history(self, problem, solver=None, iterations=None): """Add problem, solver and iterations to :attr:`history`. If `solver` and `iterations` are `None`, then record a copy by `problem`. This information should be used by :meth:`history` to reproduce the state. Should respect :attr:`store_history`. """ raise NotImplementedError
[docs] def get_history(self): """Return a string that can be evaluated to define the state:: env = {} exec(self.get_history(), env) state = env['state'] """ raise NotImplementedError
[docs] def solver_set_x_(self, x_): """This should be called by the solver during iterations in order to set `x` without resetting the history. As such it should not directly set `self.x_` which would trigger reset of the history. Should respect :attr:`store_history`. """ raise NotImplementedError
[docs] def copy_from(self, *v, **kwargs): r"""Return a copy:: state.copy_from(a=..., b=...) state.copy_from(state, a=..., b=...) This allows one to convert from two slightly incompatible forms. See :class:`State_x` for an example. This should be a deep copy of the object: no data should be held in common. Parameters ---------- *v : IState , optional State to copy. If not provided, then uses `state=self`. **kwargs : dict, optional Any additional parameters should be used to initialize those values. """ # The following is a skeleton: if 0 == len(v): state = self elif 1 == len(v): state = v[0] else: raise ValueError("copy_from accepts only one positional "+ "arg: `state`") return self.__class__(deepcopy(state), **kwargs)
[docs] def copy_from_x_(self, x_): """Return a copy of `self` with a newly initialized `x`. """
[docs]class IProblem(Interface): r"""Interface for an iterative problem: the solver's goal is to find a solution to the problem ``G(x) = 0``. The function :meth:`G()`, specified through :meth:`step()`, must accept a real vector `x` of the same dimension as returned by :meth:`get_initial_state()` and return the same. If possible, the problem should be organized so that it converges on its own. The solvers will try to accelerate this convergence. """ globals = Attribute(r""""Global" dictionary for storing intermediate results. These must not be used for modifying the calculation, but provide access to mid-iteration calculational details for user feedback.""") state0 = Attribute(r"""Dummy instance of the state class used to encapsulate the solution.""") has_dx = Attribute(r"""`bool`. If `True`, then the problem can provide a downhill step. This can be computed by passing the `compute_dx=True` flag to :meth:`step`.""") has_F = Attribute(r"""`bool`. If `True`, then the problem can compute a reasonable fixed-point iteration `F`. This can be computed by passing the `compute_F=True` flag to :meth:`step`.""") has_G = Attribute(r"""`bool`. If `True`, then the problem can compute an objective function `G` that will be used to determine convergence. This can be computed by passing the `compute_G=True` flag to :meth:`step`.""") G_scale = Attribute(r"""Typical scale for `G(x)`. The scaled vector `G/G_scale` is used for convergence testing and in some solvers to scale the approximate Jacobian.""") x_scale = Attribute(r"""Typical scale for `x`. The scaled vector `x/x_scale` is used for convergence testing and in some solvers to scale the approximate Jacobian.""") j0_diag = Attribute(r"""Some solvers assume that the approximate Jacobian $\partial\vect{G}/\partial\vect{x} = \diag(j_0)$. The default implementations will approximate this as `j0_diag = G_scale/x_scale` but if you know that the function has a drastically different behaviour, then you should override this.""") active = Attribute( r"""This should be a boolean array of indices such that `x_[active]` returns an array of all the active parameters. Note that if `active` is `()`, then all the parameters are active. In this way, only a subset of the parameters may be worked on, but other fixed parameters are still maintained.""")
    def get_initial_state(self):
        """Return a state implementing :class:`IState` representing
        the initial state for the problem."""
[docs] def solver_update(self, state, p_dict): """This is called by the solver to implement the schedule (see :attr:`ISolver.schedule`). It should update the problem and `state` according to the parameters in `p_dict`, returning the updated state. It should be careful not to obliterate the state history. Parameters ---------- state : :interface:`IState` This is the current state object. p_dict : dict Dictionary of updated parameters. If the problem is an instance of :class:`StateVars`, then this could be passed to :meth:`StateVars.initialize`. """
[docs] def step(self, x, x0=None, G0=None, dx0=None, F0=None, compute_G=False, compute_dx=False, compute_F=False, extrapolate=None, iter_data=None, abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS): r"""Return `(x1, G1, dx1, F1)` where `x1` is the new solution estimate. Parameters ---------- x : array Step suggested by the solver. Real 1-d array of the same length as returned by :meth:`__len__` x0 : array Previous step and the point at which `G0`, `dx0`, and `F0` are evaluated. If this is `None`, then this is the first step of a new solver cycle. G0 : array Objective function `G0(x0)` evaluated at `x0`. dx0 : array Suggested step from `x0` based on `G0`. F0 : array Fixed-point iteration `F(x0)` evaluated at `x0`. compute_G, compute_dx, compute_F : bool These flags are set depending on what is needed. A problem could always compute `G1`, `dx1`, and `F1` to be safe, but if these are expensive, then this allows :meth:`step` to compute only the required values. extrapolate : function The extrapolator is a function ``x_new = extrapolate(x_controls)`` that returns the solver's extrapolated step based on the specified control parameters. `x_controls` should be a pair `(inds, vals)` of indices into the array `x` and values such that the step should preserve ``x[ind] = c*vals + (1-c)*x0[ins]`` for some ``0 < c =< 1``. (I.e. this is a convex extrapolation from `x0` to `x`). .. note:: Extrapolation typically requires some linear algebra (such as solving a linear system) on the submatrix of the Jacobian in the basis spanned by the elements in `x_controls`. If the problem is large, this may be prohibitive unless the number of entries in `x_controls` is sufficiently small. If the solver contains information about the function `G` such as gradient information, then the extrapolated state `x_new` will incorporate this information extrapolating the step in such a way as to satisfy the controls. iter_data : object This is the object returned by :meth:`iter_data`. It is passed in here so that properties of the current state that depend on the full iteration can be included here. Note, this function may be called several times during a single iteration depending on the solver, whereas :meth:`iter_data` is only called once. abs_tol, rel_tol : float If possible, `G(x)` should be calculated to the specified absolute and relative tolerances. This feature is used by some solvers to allow faster computations when far from the solution. mesgs : IMesgs If there are any problems, then an exception should be raised ans/or a message sent through this. Returns ------- x1 : array (real and same shape as `x`). This is the new solution step. It need not be either of the natural steps `x0 + dx0` or `F0`, allowing the problem more control, for example, to enforce constraints or prevent the solver from wandering into unphysical regions of solution space. G1 : array (real and same shape as `x`) or None Objective function `G(x1)` evaluated at the suggested point `x1`. Compute if :attr:`has_G` and `compute_G` are both `True`. This objective function is generally used for convergence checking. dx1 : array (real and same shape as `x`) or None Suggested step from `x1` based on the value of `G1`. Compute if :attr:`has_dx` and `compute_dx` are both `True`. If possible, this should be a downhill direction such as the full Newton step ``dx = -j_inv*G(x1)``. F1 : array (real and same shape as `x`) or None Fixed-point function `F(x1)` evaluated at the suggested point `x1`. Compute if :attr:`has_F` and `compute_F` are both `True`. 
This is used as both a step $x \mapsto F(x)$ and for convergence $\norm{x - F(x)}$ testing when no other objective function is provided. Notes ----- This provides a mechanism for the problem to control the iterations, for example, enforcing constraints. A typical algorithm would proceed as follows: 1) Inspect `x`. If any small number of parameters needs to be adjusted to satisfy constraints etc. then these should be specified in a dictionary `x_controls`. 2) Call `x_new = extrapolate(x_controls)` to get the solvers suggestion including the controls. 3) Adjust `x_new` if needed. 4) Compute `G1=`, `dx1`, and `F1`. 5) Return `(x_new, G1, dx1, F1)` Here is some pseudo-code:: x1 = x if not okay(x1): <Determine where we need x[i_n] = v_n> x_controls = ([i_1, i_2, ...], [v_1, v_2, ...]) x1 = extrapolate(x_controls) if not okay(x1): <Fix x1 to make it okay> x1 = ... G1 = dx1 = F1 = None if self.has_G and compute_G: G1 = ... if self.has_dx and compute_dx: dx1 = ... if self.has_F and compute_F: F1 = ... return (x1, G1, dx1, F1) """
[docs] def iter_data(self, x, x0, G0, dx0, F0, solver, data=None): r"""Return data from the iteration for plotting etc. This optional method is called by the solver after each iteration with the current solution `x`. The object it returns will be passed to :meth:`plot_iter` as the keyword argument `data`. This allows the problem to calculate and accumulate results for plotting or for recording the evolution/convergence history. This can be retrieved from `state.solver_data.iter_data`. .. note:: The problem can set `state.solver_data.iter_data` directly if it likes during the iteration, but since solvers might perform processing, one cannot be guaranteed that this will be in sync with `x` etc. One may do this for speed if needed though. Parameters ---------- x : array Current guess. x0 : array, None Previous guess. G0 : array, None `G0 = G(x0)`. dx0 : array, None Previous recommended step. If :attr:`newton`, then ``dx = -np.dot(j_inv, G0)`` is the full Newton step. F0 : array, None `F0 = F(x0)`. solver : ISolver Current solver. This can be queried for convergence requirements etc. data : object or None Previous data object from last iteration. The first iteration this is `None`. If you modify the `iter_data` object in :meth:`G` or :meth:`step`, then this modified object will be passed in here, so be sure to include this information in the return value. Typically one just mutates `data` and returns it. """
    def __len__(self):
        """Return the length of the solution vector."""

    def pre_solve_hook(self, state):
        r"""If this is defined, it will be called by the solver before
        solving."""

class IPlotProblem(IProblem):
    r"""Generalization of :class:`IProblem` that provides methods for
    plotting results during the iterations."""

    def plot_iteration(self, data):
        r"""Plot the current iteration.

        Parameters
        ----------
        data : object
            This is the object returned by :meth:`iter_data`.
        """

class IJinv(Interface):
    r"""Interface of a matrix-like object that represents the inverse
    Jacobian."""

    x = Attribute("""Current point.  Should be read only.  (Use
        :meth:`set_point` to change.)""")
    g = Attribute("""Objective at the current point.  Should be read
        only.  (Use :meth:`set_point` to change.)""")
    x0 = Attribute("""Point where the Jacobian is valid.  Should be
        read only.  (Use :meth:`update_point` to change.)""")
    g0 = Attribute("""Objective at :attr:`x0`.  Should be read only.
        (Use :meth:`update_point` to change.)""")

    def __mul__(self, dg):
        r"""Return the matrix multiplication `j_inv*dg`."""

    def __rmul__(self, dxt):
        r"""Return the matrix multiplication `dxt*j_inv`."""

    def update(self, x0=None, g0=None):
        r"""Update the inverse Jacobian so that it is valid at the
        point `(x0, g0)` (use :attr:`x` and :attr:`g` by default).

        Does not necessarily add `x0` and `g0` (use :meth:`add_point`)
        or set the current point (use :meth:`set_point`) but may do
        either or both depending on the algorithm.
        """

    def add_point(self, x, g):
        r"""Add the point `(x, g)` to the Jacobian information.

        Does not necessarily update the Jacobian (use :meth:`update`)
        but may.  Does not change the current point (use
        :meth:`set_point`)."""

    def set_point(self, x, g):
        r"""Add `x` and `g` and make this the current point `(x, g)`
        and update the Jacobian to be valid here.  Return a measure of
        the incompatibility of the new data with the current
        approximation.

        This need not "add" the point to the current data set (the
        functionality provided by :meth:`add_point`).
        """

    def incompatibility(self, x, g, x0, g0):
        r"""Return a dimensionless measure of the compatibility of a
        step from `x0` to `x` with the current Jacobian.

        An incompatibility of 0 means perfectly compatible (this is
        what should happen if the function is linear and the Jacobian
        is correct along the specified step).
        """

    def reset(self):
        r"""Reset the Jacobian to a diagonal form."""

    def get_x(self, x_controls=None):
        r"""Return `x`: the Broyden step to get to the root of `G(x)`.

        This is the downhill Newton step from the point specified by
        (:attr:`x0`, :attr:`g0`) that satisfies the constraints
        `x_controls`.

        Parameters
        ----------
        x_controls : (array, array), optional
            Tuple `(inds, vals)` indices and the respective values of
            the control parameters.
        """
[docs]class ISolverData(Interface): """Interface for solver data objects.""" iter_data = Attribute("""This is the object returned by :meth:`IProblem.iter_data`.""") iteration_number = Attribute( "This is the current iteration number.")
[docs] def new(self, solver_data=None): r"""Return a new instance of the appropriate `solver_data`. We allow the possibility of copying data here. Parameters ---------- solver_data : ISolverData Previous solver data. Provided if the solution was computed from a previous solver. If the solver was compatible data should be used, otherwise a new instance . """
[docs] def change_basis(self, f, f_inv_t=None): """Use the function `x_new = f(x)` to update the data corresponding to a change of basis. Raise an :except:`AttributeError` if the change cannot be implemented (or don't define this method.) Parameters ---------- f : function Change of basis transformation (should be essentially matrix multiplication, but for large bases one might want to prevent forming this matrix explicitly.) f_inv_t : function Transpose of the inverse of the function. If this is provided, then it should be used to transform the bra's, otherwise, use `f`. """
[docs]class ISolver(Interface): """Interface for solver objects.""" G_tol = Attribute("""Tolerance for each component of `G(x)/G_scale`. This may be a vector or a scalar. See also attr:`IProblem.G_scale`. Convergence is assumed if all components satisfy both this and (:attr:`x_tol` or :attr:`x_tol_rel`).""") x_tol = Attribute("""Tolerance for each component of `x/x_scale`. This may be a vector or a scalar. See also attr:`IProblem.x_scale`. Convergence is assumed if all components satisfy both (:attr:`x_tol` or :attr:`x_tol_rel`) and :attr:`G_tol`.""") x_tol_rel = Attribute("""Relative tolerance for each component of `x`. This may be a vector or a scalar. Convergence is assumed if all components satisfy both (:attr:`x_tol` or :attr:`x_tol_rel`) and :attr:`G_tol`.""") norm_p = Attribute("""Norm to use for tolerances $\norm{\cdot}_{p}$. Passed to the `ord` parameter of :func:`numpy.linalg.norm`.""") last_state = Attribute("""The last good state. This may be used after an exception is raised to obtain the state that caused the exception or the last good state before the exception occured (in the case of an external interrupt for example. To resume the computation, one should be able to call :meth:`ISolver.solve()` with this state as an input (i.e. this state should be complete with the history set etc. To ensure this, use :meth:`IState.solver_set_x_()`). """) schedule = Attribute("""List of pairs of dictionaries `(p_dict, s_dict)` determining the schedule. The solver the solution will be run through the standard solver with the solver and state updated with these arguments at each stage of the schedule. The problem will be updated through the :meth:`IProblem.solver_update` to allow the state to be properly updated and to have its history modified. For example, one might like to start solving with low-resolution and low accuracy, slowly increasing these for example.""") mesgs = Attribute("""Object implementing the :interface:`IMesgs` interface. This is used for communicating messages with the user.""") def __init__(self, state0): """Must provide copy-construction facilities from another object `state0`."""
[docs] def solve(self, problem, state, iterations=None): """Return `(new_state, niter)` where `new_state` is the new state representing the solution to the specified problem starting from the specified state. Requirements: 1) If `iterations` is specified, exactly this number of steps should be executed. 2) Convergence should check at least the :attr:`G_tol` and (:attr:`x_tol` or :attr:`x_tol_rel`) criteria using :attr:IProblem.G_scale` and :attr:`IProblem.x_scale`. Convergence should occur only if both of these are satisfied (although other criteria may also be implemented.) 3) The new state should have `state.solver_set_x_()` called to set the solution and set `state.solver_data`. 4) The argument `state` is not to be mutated unless expressly advertised, so generally a copy should be made. 5) :meth:`IProblem.iter_data` must be called each iteration. 6) :meth:`IProblem.step` must be used to query the function values (check the flags :attr:`IProblem.has_F`, :attr:`IProblem.has_G`, and :attr:`IProblem.has_dx` for capabilities). 7) The schedule :attr:`schedule` should be followed. Parameters ---------- problem : IProblem Instance of the problem to solve. state : IState Initial state. (Usually obtained by calling `problem.get_initial_state()`). iterations : int Number of iterations to perform. This is intended to be used to reproduce a state from a history by executing an exact number of iterations. See Also -------- :meth:`Solver.solve` : This is the reference implementation. Please follow this example if you are going to implement this method to make sure that a step is not forgotten. """
[docs] def print_iteration_info(self, iteration_number, state, start_time, mesg=None): """Print iteration information to :interface:`IMesg` object `mesg` (if not `None`)."""

class IConverged(Interface):
    """Boolean like object that represents convergence information."""
    def __nonzero__(self):
        """Return True if converged, otherwise False."""
        raise NotImplementedError


############################################################
# Specializations and partial implementations
[docs]class Problem(StateVars): """Partial implementation of :class:`IProblem` interface. Only :attr:`state0` and one of :meth:`G` or :meth:`F` need2 to be defined (though both can be). """ implements(IProblem) _state_vars = [ ('state0', Required, """Instance of the state class used to encapsulate the solution. If :meth:`get_initial_state()` is not overloaded, then it returns a copy of this. .. note:: If calculating a valid/reasonable initial state is costly, then this should be done in a customized :meth:`get_initial_state` so that initialization is not slowed down."""), ('globals', {}, r""""Global" dictionary for storing intermediate results. These must not be used for modifying the calculation, but provide access to mid-iteration calculational details for user feedback."""), ('has_G', ClassVar(False), r"""If the problem has an objective function, then set this to `True` and return this from :meth:`G` or :meth:`step`"""), ('has_dx', ClassVar(False), r"""If the problem can provide a downhill step, then set this to `True` and return this from :meth:`G` or :meth:`step`"""), ('has_F', ClassVar(False), r"""If the problem has a fixed-point iteration, then set this to `True` and return this from :meth:`F` or :meth:`step`"""), ('x_scale', 1, r"""Typical scale for `x. The scaled vector `x/x_scale` is used for convergence testing. """), ('G_scale', ClassVar(1), r"""Typical scale for `G(x). The scaled vector `G/G_scale` is used for convergence testing. Ths problem must compute this based on some user-friendly way of specifying the scales. .. note:: See :attr:`G_scale_parameters` for a user friendly way of setting this that will work if the state is an instance of :class:`State`. To use this, redefine this as a :class:`Computed` attribute and call :meth:`define_G_scale` at the end of the constructor (after :attr:`state0` has been initialized). """), ('G_scale_parameters', {}, r"""Dictionary of parameter names with associated `G_scales`. """), ('active', ClassVar(()), r"""This should be a boolean array of indices such that `x_[active]` returns an array of all the active parameters. The problem should provide some user-friendly way of specifying which parameters are active and them populate this. .. note:: The default version just sets this to `()`. See :attr:`active_parameters` and :attr:`inactive_parameters` for defaults that will work if the state is an instance of :class:`State`. In this case, redefine this as a :class:`Computed` attribute and call :meth:`define_active` at the end of the constructor (after :attr:`state0` has been initialized) to define this. """), ('active_parameters', set(), r"""This should be a set of the names of all the active parameters. It is used to define :attr:`active`. This will be updated by :meth:`__init__` to include all active parameters (any inactive parameters will be subtracted). See :attr:`active`. """), ('inactive_parameters', set(), r"""This should be a set of the names of all the inactive parameters (those held fixed). It is used to define :attr:`active`. This will modify :attr:`active_parameters`. See :attr:`active`. """), ] process_vars() ########################################################## # Required methods: At least one of G or F needs to be defined by # the subclass.
[docs] def G(self, x, iter_data=None, compute_dx=False, abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS): r"""Return `(G, dx)` where `G=G(x)` and `x + dx` is the recommended step. The solver's goal is to find a solution ``G(x) == 0``. Parameters ---------- x : array Real 1-d array of the same length as returned by :meth:`__len__` iter_data : object This is the object returned by :meth:`iter_data`. It is passed in here so that properties of the current state that depend on the full iteration can be included here. Note, this function may be called several times during a single iteration depending on the solver, whereas :meth:`iter_data` is only called once. The solvers will always pass this, but a default should be provided so users can easily call it. compute_dx : bool If :attr:`newton` is `True` and this is `True`, then the method must return a tuple `(G_x, dx)` where ``dx = -j_inv*G_x`` is the full Newton step. abs_tol, rel_tol : float If possible, `G(x)` should be calculated to the specified absolute and relative tolerances. This feature is used by some solvers to allow faster computations when far from the solution but may be ignored. mesgs : IMesgs If there are any problems, then an exception should be raised or a message notified through this. Returns ------- G : array Result of ``G = G(x)`` (must be a real 1-d array of the same length as `x`). dx : array Suggested step based on `G`. If :attr:`newton`, then this should be the full Newton step ``dx = -j_inv*G(x)``. If no reasonable step is known, then this can be `None`. Raises ------ NotImplementedError Raised if the subclass does not define :meth:`F`. Notes ----- This default version returns `G(x) = x - F(x)` and `dx = F(x) - x`. This is based on the assumption that $x \mapsto F(x)$ is almost a convergent iteration. Despite providing both for both `G` and `dx`, this will not set the flags :attr:`has_G` or :attr:`has_dx` which must be set by the user if this default is okay. """ if self.__class__.F.im_func is Problem.F.im_func: raise NotImplementedError( "Class %s defined neither F nor G" % (self.__class__.__name__, )) else: F = self.F(x, iter_data=iter_data, abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs) G = x - F dx = None if compute_dx: # Although this is simple, it makes a copy which could # be expensive if x is large. dx = -G return G, dx
[docs] def F(self, x, iter_data=None, abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS): r"""Return `F = F(x)`. The solver's goal is to find a solution ``F(x) == x``. Some solvers assume that the iteration $x \mapsto F(x)$ is almost convergent. Parameters ---------- x : array Real 1-d array of the same length as returned by :meth:`__len__` iter_data : object See :meth:`G`. abs_tol, rel_tol : float See :meth:`G` mesgs : IMesgs See :meth:`G`. Returns ------- F : array Result of ``F = F(x)`` (must be a real 1-d array of the same length as `x`). Raises ------ NotImplementedError Raised if the subclass does not define :meth:`G`. Notes ----- This default version returns `F(x) = x + dx` if :meth:`G` returns `dx`, otherwise it returns `F(x) = x - G(x)`. Despite providing for both `F`, this will not set the flags :attr:`has_F` which must be set by the user if this default is okay. """ if self.__class__.G.im_func is Problem.G.im_func: raise NotImplementedError( "Class %s defined neither F nor G" % (self.__class__.__name__, )) else: G, dx = self.G(x, iter_data=iter_data, compute_dx=True, abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs) if dx is None: F = x - G else: F = x + dx return F ########################################################## #@property #def state0(self): # """Return the State class by extracting it from # get_initial_state() # """ # return self.get_initial_state()
    def get_initial_state(self):
        """Return a state (a copy of :attr:`state0`) representing the
        initial state for the problem."""
        return self.state0.copy_from()
[docs] def iter_data(self, x, x0, G0, dx0, F0, solver, data=None): r"""Return data from the iteration for plotting etc. This optional method is called by the solver after each iteration with the current solution `x`. The object it returns will be passed to :meth:`plot_iter` as the keyword argument `data`. This allows the problem to calculate and accumulate results for plotting or for recording the evolution/convergence history. This can be retrieved from `state.solver_data.iter_data`. Parameters ---------- x : array Current guess. x0 : array, None Previous guess. G0 : array or tuple or None `G0 = G(x0)` was the previous step. If :attr:`newton`, then this is the tuple `(G0, dx0)` where ``dx0 = -np.dot(j_inv, G0)`` is the full Newton step. solver : ISolver Current solver. This can be queried for convergence requirements etc. data : object or None Previous data object from last iteration. The first iteration this is `None`. """ pass
[docs] def step(self, x, x0=None, G0=None, dx0=None, F0=None, compute_G=False, compute_dx=False, compute_F=False, extrapolate=None, iter_data=None, abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS): r"""Return `(x1, G1, dx1, F1)` where `x1` is the new solution estimate. Parameters ---------- x : array Step suggested by the solver. Real 1-d array of the same length as returned by :meth:`__len__` x0 : array Previous step and the point at which `G0`, `dx0`, and `F0` are evaluated. If this is `None`, then this is the first step of a new solver cycle. G0 : array Objective function `G0(x0)` evaluated at `x0`. dx0 : array Suggested step from `x0` based on `G0`. F0 : array Fixed-point iteration `F(x0)` evaluated at `x0`. compute_G, compute_dx, compute_F : bool These flags are set depending on what is needed. A problem could always compute `G1`, `dx1`, and `F1` to be safe, but if these are expensive, then this allows :meth:`step` to compute only the required values. extrapolate : function The extrapolator is a function ``x_new = extrapolate(x_controls)`` that returns the solver's extrapolated step based on the specified control parameters. `x_controls` should be a pair `(inds, vals)` of indices into the array `x` and values such that the step should preserve ``x[ind] = c*vals + (1-c)*x0[ins]`` for some ``0 < c =< 1``. (I.e. this is a convex extrapolation from `x0` to `x`). .. note:: Extrapolation typically requires some linear algebra (such as solving a linear system) on the submatrix of the Jacobian in the basis spanned by the elements in `x_controls`. If the problem is large, this may be prohibitive unless the number of entries in `x_controls` is sufficiently small. If the solver contains information about the function `G` such as gradient information, then the extrapolated state `x_new` will incorporate this information extrapolating the step in such a way as to satisfy the controls. iter_data : object This is the object returned by :meth:`iter_data`. It is passed in here so that properties of the current state that depend on the full iteration can be included here. Note, this function may be called several times during a single iteration depending on the solver, whereas :meth:`iter_data` is only called once. abs_tol, rel_tol : float If possible, `G(x)` should be calculated to the specified absolute and relative tolerances. This feature is used by some solvers to allow faster computations when far from the solution. mesgs : IMesgs If there are any problems, then an exception should be raised or a message notified through this. Returns ------- x1 : array (real and same shape as `x`). This is the new solution step. It need not be either of the natural steps `x0 + dx0` or `F0`, allowing the problem more control, for example, to enforce constraints or prevent the solver from wandering into unphysical regions of solution space. G1 : array (real and same shape as `x`) or None Objective function `G(x1)` evaluated at the suggested point `x1`. Compute if :attr:`has_G` and `compute_G` are both `True`. This objective function is generally used for convergence checking. dx1 : array (real and same shape as `x`) or None Suggested step from `x1` based on the value of `G1`. Compute if :attr:`has_dx` and `compute_dx` are both `True`. If possible, this should be a downhill direction such as the full Newton step ``dx = -j_inv*G(x1)``. F1 : array (real and same shape as `x`) or None Fixed-point function `F(x1)` evaluated at the suggested point `x1`. Compute if :attr:`has_F` and `compute_F` are both `True`. 
This is used as both a step $x \mapsto F(x)$ and for convergence $\norm{x - F(x)}$ testing when no other objective function is provided. Notes ----- This provides a mechanism for the problem to control the iterations, for example, enforcing constraints. A typical algorithm would proceed as follows: 1) Inspect `x`. If any small number of parameters needs to be adjusted to satisfy constraints etc. then these should be specified in a dictionary `x_controls`. 2) Call `x_new = extrapolate(x_controls)` to get the solvers suggestion including the controls. 3) Adjust `x_new` if needed. 4) Compute `G1=`, `dx1`, and `F1`. 5) Return `(x_new, G1, dx1, F1)` Here is some pseudo-code:: x1 = x if not okay(x1): <Determine where we need x[i_n] = v_n> x_controls = ([i_1, i_2, ...], [v_1, v_2, ...]) x1 = extrapolate(x_controls) if not okay(x1): <Fix x1 to make it okay> x1 = ... G1 = dx1 = F1 = None if self.has_G and compute_G: G1 = ... if self.has_dx and compute_dx: dx1 = ... if self.has_F and compute_F: F1 = ... return (x1, G1, dx1, F1) """ x1 = x G1 = dx1 = F1 = None if ((self.has_G and compute_G) or (self.has_dx and compute_dx)): G1, dx1 = self.G(x, iter_data=iter_data, compute_dx=compute_dx, abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs) if self.has_F and compute_F: if compute_G and (self.__class__.F is Problem.F or self.__class__.G is Problem.G): # This is an optimization to prevent two calls to F or # G. It duplicates the code in the default F and G. if self.__class__.F is Problem.F and dx1 is not None: F1 = x1 + dx1 else: F1 = x1 - G1 else: F1 = self.F(x, iter_data=iter_data, abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs) return (x1, G1, dx1, F1)
[docs] def solver_update(self, state, p_dict): """This is called by the solver to implement the schedule (see :attr:`ISolver.schedule`). It should update the problem and `state` according to the parameters in `p_dict`, returning the updated state. It should be careful not to obliterate the state history. Parameters ---------- state : IState This is the current state object. p_dict : dict Dictionary of updated parameters. If the problem is an instance of :class:`StateVars`, then this could be passed to :meth:`StateVars.initialize`. """ self.initialize(**p_dict) return state
def __len__(self): """Return the length of the solution vector.""" return len(self.get_initial_state().x_) @classmethod def _post_hook_process_vars(cls, results, _G=G, _F=F): """We set default values of :attr:`has_G` and :attr:`has_F` based on whether or not the new class redefines :meth:`F` and :meth:`G`. Notes ----- Members of the class block scope are not accessible in contained blocks so we must pass the functions explicitly as defaults here. See Also -------- :func:`process_vars` """ if cls.F.im_func is not _F: results['has_F'] = True if cls.G.im_func is not _G: results['has_G'] = True
[docs] def define_active(self): r"""Define :attr:`active` from :attr:`active_parameters` and :attr:`inactive_parameters` using :meth:`State.indices` of :attr:`state0`. """ if 'active' in self._class_vars: raise AttributeError( "To use define_active() you must make 'active' Computed:\n" + " (Add ('active', Computed) to your %s's _state_vars list)" % (self.__class__.__name__,)) parameters = self.state0.parameters if not self.active_parameters: self.active_parameters = set(parameters) else: self.active_parameters.intersection_update(parameters) if (set(parameters) == self.active_parameters and not self.inactive_parameters): self.active = () else: self.active_parameters = (self.active_parameters - self.inactive_parameters) self.active = np.zeros(len(self.state0.x_), dtype=bool) for p in self.active_parameters: offset, n = self.state0.indices(p) self.active[offset:offset+n] = True
[docs] def define_G_scale(self): r"""Define :attr:`G_scale` from :attr:`G_scale_parameters` using :meth:`State.indices` of :attr:`state0`. """ if 'G_scale' in self._class_vars: raise AttributeError( "To use define_G_scale() you must make G_scale Computed:\n" + " (Add ('G_scale', Computed) to your %s's _state_vars list)" % (self.__class__.__name__,)) parameters = self.state0.parameters if not self.G_scale_parameters: self.G_scale = 1 else: self.G_scale = np.ones(self.state0.x_.shape, dtype=float) for p in parameters: offset, n = self.state0.indices(p) if p in self.G_scale_parameters: self.G_scale[offset:offset+n] = self.G_scale_parameters[p] else: self.G_scale[offset:offset+n] = 1
    @property
    def j0_diag(self):
        r"""Scale of diagonals of Jacobian `dG/dx`.

        This default is computed as a property `G_scale/x_scale`.
        """
        return np.divide(self.G_scale, self.x_scale)
[docs]class PlotProblem(Problem): r"""Partial implementation of :class:`IPlotProblem` interface. As with :class:`Problem`, only :meth:`G` and :meth:`get_initial_state` need to be defined. """ implements(IPlotProblem) _state_vars = [ ('n_prev_plots', 5, \ """Number of previous plots to show (as light yellow)"""), ('_plot_key', Internal((111, )), \ """Current key formed from the args to subplot. Used to store previous plotted points."""), ('_plot_data', Internal({}), \ """Dictionary with lists of plot arguments for previous plot data."""), ('_prev_plots', Internal([]), \ """List of previous plots."""), ('_no_plot', Internal(False), """If `True`, then don't draw, just store data."""), ] process_vars() ########################################################## # Optional methods: These need to be defined by subclasses. def plot_iter(self, plt, data): r"""Called by :meth:`plot_iteration` to plot the current iteration. If the methods of `plt` are used to plot the results, then the results will be stored so that the previous :attr:`n_prev_plots` can also be plotted in light yellow for comparison. If this is not desired, then call :mod:`matplotlib` methods directly. Presently `plt = self`, so inspect the current object for the methods available to support this feature like :meth:`plot` and :meth:`contour`. .. note:: This method should not clear the figure etc. as this is done by :meth:`plot_iteration`. Parameters ---------- plt : object Object with methods like `plot` and `contour` which should be used to allow for automatic plotting of previous iterations. data : object Object returned by :meth:`iter_data`. This should be used to compute and store results to be plotted. """ pass def iter_data(self, x, x0, G0, dx0, F0, solver, data=None): r"""Return data from the iteration for plotting etc. This optional method is called by the solver after each iteration with the current solution `x`. The object it returns will be passed to :meth:`plot_iter` as the keyword argument `data`. This allows the problem to calculate and accumulate results for plotting or for recording the evolution/convergence history. Parameters ---------- x : array Current guess. x0 : array, None Previous guess. G0 : array or tuple or None `G0 = G(x0)` was the previous step. If :attr:`newton`, then this is the tuple `(G0, dx)` where ``dx = -np.dot(j_inv, G0)`` is the full Newton step. solver : ISolver Current solver. This can be queried for convergence requirements etc. data : object or None Previous data object from last iteration. The first iteration this is `None`. 
""" return None def gca(self): import pylab return pylab.gca() def xlabel(self, *v, **kw): import pylab if not self._no_plot: return pylab.xlabel(*v, **kw) else: return None def ylabel(self, *v, **kw): import pylab if not self._no_plot: return pylab.ylabel(*v, **kw) else: return None def title(self, *v, **kw): import pylab if not self._no_plot: return pylab.title(*v, **kw) else: return None def subplot(self, *v): import pylab self._plot_key = v if not self._no_plot: pylab.subplot(*v) def plot(self, *v, **kw): import pylab if 'no_hist' not in kw: self._plot_data.setdefault(self._plot_key, []).append( (pylab.plot, deepcopy(v), kw)) if not self._no_plot: pylab.plot(*v, **kw) def semilogy(self, *v, **kw): import pylab if 'no_hist' not in kw: self._plot_data.setdefault(self._plot_key, []).append( (pylab.semilogy, deepcopy(v), kw)) if not self._no_plot: pylab.semilogy(*v, **kw) def contour(self, *v, **kw): import pylab if 'no_hist' not in kw: self._plot_data.setdefault(self._plot_key, []).append( (pylab.contour, deepcopy(v), kw)) if not self._no_plot: pylab.contour(*v, **kw) def plot_iteration(self, x, x0, G0, dx0, F0, problem, solver_data, trial=False, plot=False, mesgs=_MESGS): r"""Plot the current iteration. This version calls :meth:`plot_iter` to plot the current iteration and plots the previous :attr:`n_prev_plots` iterations in lighter yellow. Parameters ---------- x : array New attempted step x0, G0, dx0, F0 : array Old step and objective functions etc. problem : IProblem solver_data : ISoverData This will be used to set `solver_data.iter_data`. trial : bool If `True`, then the present step will not be coloured in black (it will be red instead) and it will not be added to the history. This should be set for trial steps. plot : bool Pass the `solver.plot` flag here. """ solver_data.iter_data = problem.iter_data( x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, solver=self, data=solver_data.iter_data) # pylint: disable-msg=E1101 if plot and IPlotProblem.providedBy(problem): try: # Don't fail on plotting! import pylab self._prev_plots = getattr( self, '_prev_plots', [])[-(self.n_prev_plots):] pylab.ioff() if trial: # Save trial data self._plot_data = {} self._no_plot = True try: self.plot_iter(plt=self, data=solver_data.iter_data) except: raise finally: self._no_plot = False if trial: c = 'red' for v in self._plot_data: pylab.subplot(*v) for cmd, v_, kw_ in self._plot_data[v]: if cmd is pylab.contour: kw_['colors'] = [c] else: kw_['color'] = c kw_['label'] = None cmd(*v_, **kw_) else: pylab.clf() n_c = self.n_prev_plots c0 = np.array((1.0, 1.0, 1.0)) c1 = np.array((0.9, 0.9, 0.0)) for n, prev in enumerate(self._prev_plots): c = (n + 1)/n_c*c1 + (1 - (1 + n)/n_c)*c0 for v in prev: pylab.subplot(*v) for cmd, v_, kw_ in prev[v]: if cmd is pylab.contour: kw_['colors'] = [c] else: kw_['color'] = c kw_['label'] = None cmd(*v_, **kw_) self._plot_data = {} self.plot_iter(plt=self, data=solver_data.iter_data) self._prev_plots.append(self._plot_data) pylab.ion() pylab.draw() except Exception, e: mesgs.warn("Plotting error %s:" % (str(e), ))
[docs]class Problem_x(PlotProblem): """Partial implementation of :class:`IProblem` interface. Same as :class:`Problem` but with an additional attribute :attr:`fn_rep` representing a discretization. To be used with :class:`State_x`, and provides an updated :meth:`solver_update` utilizing this. """ _state_vars = [ ('fn_rep', Required, \ """Object for converting between functions and data. Must implement the :class:`IFnRep` interface."""), ] process_vars() def solver_update(self, state, p_dict): """This is called by the solver to implement the schedule (see :attr:`ISolver.schedule`). It should update the problem and `state` according to the parameters in `p_dict`, returning the updated state. It should be careful not to obliterate the state history. Parameters ---------- state : IState This is the current state object. p_dict : dict Dictionary of updated parameters. If the problem is an instance of :class:`StateVars`, then this could be passed to :meth:`StateVars.initialize`. """ if id(self.fn_rep) == id(state.fn_rep): # We need to keep the old fn_rep in the state so it can do # an update state.fn_rep = deepcopy(state.fn_rep) self.initialize(**p_dict) state._change_fn_rep(self.fn_rep) return state
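
#
# Illustrative sketch (not part of the public API): how a solver
# schedule drives :meth:`Problem_x.solver_update`.  `DEQProblem` refers
# to the example problem constructed in the test string at the end of
# this module, and `Broyden` is assumed to come from
# :mod:`broyden_solvers`; the option name `abs_tol` belongs to that
# solver.  (Left as a comment because the names are not importable from
# here.)
#
#   from broyden_solvers import Broyden
#   problem = DEQProblem(N=10)
#   solver = Broyden(
#       schedule=[({'N': 10}, {}),                  # coarse grid first
#                 ({'N': 50}, {'abs_tol': 1e-12})]) # refine and tighten
#   state, niter = solver.solve(problem, problem.get_initial_state())
#
# At each stage of the schedule the first dictionary is passed to
# ``problem.solver_update(state, p_dict)`` (which reinitializes the
# problem and changes the state's `fn_rep` basis) and the second is
# passed to ``solver.initialize(**s_dict)``.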
class Converged(StateVars):
    """Boolean-like object representing convergence information.
    Convergence is `True` if any condition is met.

    >>> bool(Converged(Step=False, Func=False))
    False
    >>> bool(Converged(Step=True, Func=False))
    True
    >>> bool(Converged(Step=False, Func=True))
    True
    >>> bool(Converged(Step=np.bool_(True), Func=True))
    True
    """
    implements(IConverged)
    _state_vars = [
        ('Step', False, \
         "The step size is smaller than specified."),
        ('Func', False, \
         "G(x) is smaller than specified."),
        ('err', None, \
         "Maximum absolute error (for reporting purposes)"),
        ]
    process_vars()

    def __nonzero__(self):
        """Return `True` if converged, otherwise `False`."""
        return bool(self.Step or self.Func)

    def __str__(self):
        """Single-line string representation for nice reporting."""
        return ("Step: %s, Func: %s, err: %g"
                % (self.Step, self.Func, self.err))
[docs]class SolverData(StateVars): """Default implementation of solver data objects.""" implements(ISolverData) _state_vars = [ ('iter_data', None, \ r"""This is the object returned by :meth:`IProblem.iter_data`."""), ('iteration_number', 0), ('j_inv_G', NotImplemented, """An object implementing :interface:`IJinv` that acts like a matrix approximating the inverse of the Jacobian for `G` at the current step."""), ('j_inv_F', NotImplemented, """An object implementing :interface:`IJinv` that acts like a matrix approximating the inverse of the Jacobian for `G=x-F` at the current step."""), ] process_vars()
    def new(self, solver_data=None, with_self=False):
        """If `type(solver_data)` is a subclass of `type(self)`, then
        return a copy-constructed instance of `solver_data`, otherwise
        return a new blank copy.

        Parameters
        ----------
        solver_data : ISolverData
        with_self : bool
           If `True`, then include all the data from `self` with
           appropriate entries overwritten from `solver_data`.

        Examples
        --------
        >>> from broyden import JinvBroyden
        >>> s0 = SolverData()
        >>> s1 = SolverData(j_inv_F=JinvBroyden())
        >>> class SD(SolverData):
        ...     _state_vars = [('b', None)]
        ...     process_vars()
        >>> s2 = SD()
        >>> s3 = s2.new(solver_data=s1)
        >>> s3.b = 3
        >>> s3.j_inv_F
        JinvBroyden()
        >>> s4 = s1.new(s3)
        >>> s4.j_inv_F
        JinvBroyden()
        >>> s5 = s3.new(s0)
        >>> s5.j_inv_F
        Traceback (most recent call last):
           ...
        AttributeError: 'SD' object has no attribute 'j_inv_F'
        >>> s5 = s3.new(s0, with_self=True)
        >>> s5.j_inv_F
        JinvBroyden()
        """
        if with_self:
            res = self.__class__(self)
        else:
            res = self.__class__()
        if issubclass(type(self), type(solver_data)):
            args = dict(solver_data.items())
        else:
            args = dict([(key, getattr(solver_data, key))
                         for key in self._vars
                         if hasattr(solver_data, key)])
        res.initialize(**args)
        return res
class Solver(StateVars):
    """Default partial implementation for solver objects.

    The user only needs to overload :meth:`iterate` in order to provide
    a useful subclass."""
    implements(ISolver)
    _state_vars = [
        ('solver_data0', SolverData(), "Blank SolverData instance."),
        ('G_tol', _ABS_TOL, "See :attr:`ISolver.G_tol`"),
        ('x_tol', _ABS_TOL, "See :attr:`ISolver.x_tol`"),
        ('x_tol_rel', _REL_TOL, "See :attr:`ISolver.x_tol_rel`"),
        ('tol_safety_factor', 10, \
         """Increase the accuracy of function evaluations by this factor
         (the requested tolerances are divided by this amount) so that
         roundoff error will not affect derivative computations etc."""),
        ('norm_p', None, "See :attr:`ISolver.norm_p`"),
        ('min_abs_step', _ABS_TOL,
         "Fail if the step size falls below this"),
        ('min_rel_step', _REL_TOL,
         "Fail if the relative step size falls below this"),
        ('max_iter', np.inf, "Maximum number of iterations."),
        ('max_time', np.inf, "Maximum time."),
        ('max_step', np.inf,
         r"""To prevent too large steps (mainly to prevent converging to
         an undesired solution), the step size of any component will
         never be larger than this."""),
        ('schedule', [],
         r"""List of pairs of dictionaries `(fn_rep_dict, solver_dict)`
         determining the schedule.  The solution will be run through the
         standard solver with the problem/state and the solver updated
         with these arguments at each stage of the schedule.  One might
         like to start solving with low resolution and low accuracy,
         slowly increasing these, for example."""),
        ('verbosity', _VERBOSITY, \
         "Level of verbosity: 0 means quiet"),
        ('message_indent', _MESSAGE_INDENT),
        ('mesgs', Mesg(), "Messenger object"),
        ('last_state', Excluded, \
         """The last good state.  This may be used after an exception is
         raised to obtain the state that caused the exception, or the
         last good state before the exception occurred (in the case of
         an external interrupt, for example).  To resume the
         computation, one should be able to call :meth:`ISolver.solve()`
         with this state as an input (i.e. this state should be complete
         with the history set etc.; to ensure this, use
         :meth:`IState.solver_set_x_()`)."""),
        ('plot', False, \
         """If the problem implements :class:`IPlotProblem` and this is
         `True`, then the results are plotted after each iteration."""),
        ('debug', False,
         """If `True`, then store some debugging information in
         :attr:`_debug`."""),
        ('_debug', Excluded(Container()), """Debugging data."""),
        ('_f_evals', Internal(0),
         r"""Total number of function evaluations.  Used by
         :meth:`solve` and :meth:`problem_step`."""),
        ]
    process_vars()

    ##########################################################
    # Required methods: These need to be defined by subclasses.
[docs] def iterate(self, x, problem, solver_data, x0=None, G0=None, dx0=None, F0=None, mesgs=None): r"""Perform a single iteration and return the triplet `(x_new, (x, G, dx, F))`. Mutate `solver_data` if used. Parameters ---------- x : array Step suggested by the solver. Real 1-d array of the same length as returned by :meth:`__len__` x0 : array Previous step and the point at which `G0`, `dx0`, and `F0` are evaluated. If this is `None`, then this is the first step of a new solver cycle. G0 : array Objective function `G0(x0)` evaluated at `x0`. dx0 : array Suggested step from `x0` based on `G0`. F0 : array Fixed-point iteration `F(x0)` evaluated at `x0`. problem : IProblem Problem instance implementing :class:`IProblem`. solver_data : ISolverData Solver data instance implementing :class:`ISolverData`. mesgs : IMesgs Messaging should be sent through this instance. It will be a new status bar in the current messaging window. Returns ------- x_new : array Proposed step. The solver guarantees that this will be the next argument `x` to iterate. This should not have been tested yet (i.e. `G(x_new)` has not yet been calculated) and may not be a very good approximation (for example, a line search may still need to be performed.) x, G, dx, F : array (real and same shape as `x`). Objective function, `G(x)`, recommended step `dx`, and fixed-point objective `F(x)` evaluated at `x`. See :meth:`IProblem.step` for details. .. note:: These must have been validated by the problem, being returned at some point by :meth:`IProblem.step`. The solver must decide which of these is needed and set the appropriate `compute_*` flags when calling :meth:`IProblem.step`. Raises ------ IterationError This may be raised if the algorithm is forced to terminate because a root cannot be found. The current state will be saved, messages printed, and then the exception raised again. Notes ----- Here is a sample default implementation:: mesgs = self.mesgs.new() (x1, G1, dx1, F1) = problem.step( x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, compute_G=True, compute_dx=True, compute_F=True, extrapolate=lambda _:x, abs_tol=self.abs_tol, rel_tol=self.rel_tol, iter_data=solver.iter_data, mesgs=mesgs) if dx1 is None: if F1 is None: # No information is known about the step here! One # should probably be careful and take a small step # until one learns about the function. x_new = x1 - G1 else: # Assume that x -> F(x) provides a good iteration. x_new = F1 else: x_new = x1 + dx1 return (x_new, (x1, G1, dx1, F1)) """ raise NotImplementedError() ##########################################################
[docs] def print_iteration_info(self, iteration_number, state, start_time): """Print iteration information.""" elapsed_time = time.time() - start_time message = ("Iteration: %5i Elapsed time: %f Converged: %s" % (iteration_number, elapsed_time, state.converged)) self.mesgs.iter(message)
def _G(self, problem, x, iter_data, mesgs): """Simple call to problem.G with current tolerances.""" return problem.G(x, iter_data=iter_data, abs_tol=self.abs_tol, rel_tol=self.rel_tol, mesgs=mesgs) def _step(self, problem, iter_data, x, x0, G0, dx0, F0, compute_G, compute_dx, compute_F, mesgs): r"""Simple call to :meth:`step` with current tolerances and extrapolate.""" return problem.step(x, x0=x0, G0=G0, dx0=dx0, F0=F0, compute_G=compute_G, compute_dx=compute_dx, compute_F=compute_F, extrapolate=\ lambda xc:self._extrapolate(x, x0, xc), iter_data=iter_data, abs_tol=self.abs_tol, rel_tol=self.rel_tol, mesgs=mesgs) def _extrapolate(self, x, x0, x_controls): r"""Redefine this to return a corrected step if information exists.""" x = copy(x) inds, vals = x_controls inds = np.asarray(inds, dtype=int) vals = np.asarray(vals) x[inds] = vals return x
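
    # Note on `x_controls` (illustrative comment, not library code): the
    # argument handed to :meth:`_extrapolate` is a pair ``(inds, vals)``
    # of equal-length sequences, and the entries ``x[inds]`` of the
    # extrapolated step are simply overwritten with ``vals``.  For
    # example (hypothetical values)::
    #
    #     x_controls = ([0, 3], [1.0, 0.5])   # pin x[0]=1.0 and x[3]=0.5
    #
    # Problems that support controls receive this through the
    # ``extrapolate`` callback passed to :meth:`IProblem.step` by
    # :meth:`_step` above.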
[docs] def solve(self, problem, state, iterations=None): """Return the new state representing the solution to the specified problem starting from the specified state. If iterations is specified, exactly this number of steps should be executed. The new state should have :meth:`State.solver_set_x_` called to set the solution (optional: and :meth:`State.solver_data set`). Note: state may be mutated. Parameters ---------- problem : IProblem Instance of the problem to solve. state : IState Initial state. (Usually obtained by calling `problem.get_initial_state()`). iterations : int Number of iterations to perform. This is intended to be used to reproduce a state from a history by executing an exact number of iterations. """ state = problem.state0.copy_from(state) self.last_state = state if self.debug: self._debug.steps = [] # Preserve clean copies for the the archival process. We # won't modify the originals solver_ = deepcopy(self) problem_ = deepcopy(problem) if self.schedule: schedule = self.schedule else: schedule = [({}, {})] x = state.x_ if problem.active is not (): # There are specific active variables, the # rest should be held fixed. inactive = ~problem.active x_inactive = x[inactive] start_time = time.time() messages = {} self._f_evals = 0 state.solver_data = solver_.solver_data0.new( getattr(state, 'solver_data', None), with_self = True) state.solver_data.iteration_number = 0 try: if hasattr(problem_, 'pre_solve_hook'): problem_.pre_solve_hook(state=state) # Invariant: # Either iteration_number = 0 and state history is # valid or calling state.record_history() will make it # valid. Note if an exception is thrown, the state # itself may not be valid, but the last history will # be. # # Caveat: if an exception is thrown before # iteration_number is incremented, then the state # history may be one step behind. for (p_dict, s_dict) in schedule: if p_dict: state = problem_.solver_update(state, p_dict) if s_dict: solver_.initialize(**s_dict) x = state.x_ x0 = G0 = dx0 = F0 = None state.converged = False if iterations: solver_.max_iter = min( solver_.max_iter, iterations - state.solver_data.iteration_number) failed = False while (not state.converged and not failed and solver_._has_resources( state, start_time, state.solver_data.iteration_number)): if IPlotProblem.providedBy(problem): problem.plot_iteration( x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, problem=problem, solver_data=state.solver_data, plot=self.plot, trial=False) mesgs = self.mesgs.new() try: ####################################### # Here is the solver step. Don't blink! ####################################### with catch_warnings(record=True) as ws: if _DEBUG and x0 is not None: x0_ = 1*x0; F0_ = 1*F0 if (_DEBUG and abs(F0 - problem_.F(x0)).max() > 1e-12): import pdb;pdb.set_trace() (x_, (x1, G1, dx1, F1)) = solver_.iterate( x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, problem=problem_, solver_data=state.solver_data, mesgs=mesgs) if (_DEBUG and abs(F1 - problem_.F(x1)).max() > 1e-12): import pdb;pdb.set_trace() if self.debug: self._debug.steps.append((x1, G1, dx1, F1)) except IterationError, err: exc_info = sys.exc_info() failed = True reason = err finally: # Process messages and delete messenger. self.mesgs.messages.extend(mesgs.messages) self.mesgs.delete_sub() if problem.active is not (): # We can force x_ to have the correct values, # but cannot force x1, etc. as this might # affect active G1, F1 fields. Instead, we # raise an exception if the solver does not # respect the active behaviours. 
x_[inactive] = x_inactive if (x1 is not None and not np.allclose(x1[inactive], x_inactive)): raise ValueError( "Inactive parts of x1 changed by solver.") if (G1 is not None and not np.allclose(G1[inactive], 0)): raise ValueError( "Inactive parts of G1 not zero from solver.") if (F1 is not None and not np.allclose(F1[inactive], x_inactive)): raise ValueError( "Inactive parts of F1 changed by solver.") for w in ws: self.mesgs.warn(str(w)) if failed: self.mesgs.error("Failed: %r" % str(reason)) else: # Update Jacobians if x0 is not None: if x1 is not None: # Add new step. x_update = x1 g_update = G1 f_update = F1 else: # Initial step, add base point. x_update = x0 g_update = G0 f_update = F0 if (hasattr(state.solver_data, 'j_inv_G') and state.solver_data.j_inv_G is not NotImplemented and g_update is not None): state.solver_data.j_inv_G.update( x0=x_update, g0=g_update) if (hasattr(state.solver_data, 'j_inv_F') and state.solver_data.j_inv_F is not NotImplemented and f_update is not None): state.solver_data.j_inv_F.update( x0=x_update, g0=(x_update - f_update)) if self.mesgs.messages: messages[state.solver_data .iteration_number] = self.mesgs.messages self.mesgs.messages = [] state.solver_data.iteration_number += 1 if not failed: # Originally we had x_=x1 here. Why not x_? state.solver_set_x_(x_=x_) try: state.converged = solver_.converged( x=x_, x1=x1, G1=G1, dx1=dx1, F1=F1, x0=x0, G0=G0, dx0=dx0, F0=F0, solver_data=state.solver_data, problem=problem) except IterationError, err: exc_info = sys.exc_info() failed = True reason = err # Enforce maximum step size here dx = x_ - x1 scale = (np.linalg.norm(dx, ord=self.norm_p) /self.max_step) if scale > 1: dx /= scale x_ = x1 + dx solver_.print_iteration_info( state.solver_data.iteration_number, state=state, start_time=start_time) if failed: raise exc_info[1], None, exc_info[2] x, x0, G0, dx0, F0 = x_, x1, G1, dx1, F1 except: raise finally: # If there are any exceptions we catch them, set the state # history so it is complete, and then throw the exception. self.mesgs.clear() if 0 < state.solver_data.iteration_number: state.messages.append(messages) state.record_history( problem=problem, solver=self, iterations=state.solver_data.iteration_number) return state, state.solver_data.iteration_number
def _has_resources(self, state, start_time, iteration_number): """Return True if there are resources to proceed with the calculation. (I.e. iterations or time).""" has_resources = True if iteration_number >= self.max_iter: has_resources = False state.messages.append( "No more iterations left: max_iter = %i"%self.max_iter) elif (time.time() - start_time) >= self.max_time: has_resources = False state.messages.append( "No more time left: max_time = %f"%self.max_time) return has_resources def _converged(self, x, x1, G1, dx1, F1, x0, G0, dx0, F0, problem): """Return Converged instance specifying if convergence criteria are met. This uses the basic :class:`Converged` class. Parameters ---------- x : array Suggested step. x1, G1, dx1, F1 : array Last point at which `G1 = G(x1)` etc. were evaluated. x0, G0, dx0, F0 : array Previous point at which `G0 = G(x0)` etc. were evaluated. """ if G1 is None: if F1 is None: raise ValueError( "Problem must define at least one of F or G") if x1 is None: # Possible initial step return False G1 = x1 - F1 if F1 is None: F1 = x1 - G1 err_G = np.linalg.norm(G1/(problem.G_scale*self.G_tol + _TINY), ord=self.norm_p) if dx1 is None: if x0 is None or x1 is None: # Possible initial step. return False dx1 = np.abs(x1 - x0) #dx1 = np.abs(J1*(x1 - x0))/(np.abs(G1 - G0) + _TINY) err_x_abs = np.abs(dx1/(problem.x_scale*self.x_tol + _TINY)) err_x_rel = np.abs(dx1/(np.abs(x1)*self.x_tol_rel + _TINY)) err_x = np.minimum(err_x_abs, err_x_rel) converged = Converged() #converged.Step = err_x.max() <= 1 converged.err = np.minimum(err_x, err_G).max() converged.err = err_G.max() converged.Func = err_G <= 1 if converged.Step and not converged.Func: # Possibly raise a warning that the steps are becoming # small. #raise IterationError("Failure: Min step size reached.") pass return converged
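
    # The default convergence test above amounts to (componentwise, with
    # the `_TINY` regularization omitted for clarity)::
    #
    #     err_G = ||G1/(G_scale*G_tol)||_p            # objective test
    #     err_x = min(|dx1|/(x_scale*x_tol),
    #                 |dx1|/(|x1|*x_tol_rel))         # step test
    #
    # and reports ``Converged.Func = (err_G <= 1)``.  The step criterion
    # is computed but, as written, only the `Func` test actually
    # triggers convergence; `Step` is left at its default of `False`.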
    def converged(self, x, x1, G1, dx1, F1, x0, G0, dx0, F0,
                  solver_data, problem):
        r"""Return a :class:`Converged` instance with the convergence
        state.

        You can overload this.  The default implementation is
        :meth:`_converged`, which does the basic tolerance checking.  If
        there is another reason for termination because of an error,
        then an :exc:`IterationError` may be raised.
        """
        return self._converged(x=x, x1=x1, G1=G1, dx1=dx1, F1=F1,
                               x0=x0, G0=G0, dx0=dx0, F0=F0,
                               problem=problem)
[docs] def problem_step(self, problem, solver_data, x, x0=None, G0=None, dx0=None, F0=None, compute_G=False, compute_dx=False, compute_F=False, extrapolate=None, iter_data=None, abs_tol=None, rel_tol=_REL_TOL, mesgs=_MESGS, trial=True): r"""This should be used instead of calling `problem.step()` as it does the following: 0) Calls `problem.step()`. 1) Adjusts `x`, `G`, and `F` to exclude the inactive parameters. 2) Calls `problem.plot_iterations` with `trial=True`. 3) Increments :attr:`_f_evals`. 4) Increases the tolerances by :attr:`tol_safety_factor`. """ if abs_tol is None: abs_tol = self.G_tol abs_tol /= self.tol_safety_factor rel_tol /= self.tol_safety_factor inactive = () if problem.active is not (): inactive = np.logical_not(problem.active) if x0 is None: x_inactive = x[inactive] else: x_inactive = x0[inactive] x, G, dx, F = problem.step(x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, compute_G=compute_G, compute_dx=compute_dx, compute_F=compute_F, extrapolate=extrapolate, iter_data=solver_data.iter_data, abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs) if inactive is not (): x[inactive] = x_inactive if dx is not None: dx[inactive] = 0 if G is not None: G[inactive] = 0 if F is not None: F[inactive] = x_inactive self._f_evals += 1 if IPlotProblem.providedBy(problem): problem.plot_iteration( x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, problem=problem, solver_data=solver_data, plot=self.plot, trial=trial) return x, G, dx, F
[docs] def problem_F(self, problem, solver_data, x, x0=None, G0=None, dx0=None, F0=None, extrapolate=None, iter_data=None, abs_tol=None, rel_tol=_REL_TOL, mesgs=_MESGS, trial=True): r"""This should be used instead of calling `problem.F()` as it does the following: 0) Calls `problem.F()` or `problem.step()`. 1) Adjusts `F` to fix the inactive parameters. 2) Calls `problem.plot_iterations` with `trial=True`. 3) Increments :attr:`_f_evals`. 4) Increases the tolerances by :attr:`tol_safety_factor`. """ if IPlotProblem.providedBy(problem): problem.plot_iteration(x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, problem=problem, solver_data=solver_data, plot=self.plot, trial=trial) if abs_tol is None: abs_tol = self.G_tol abs_tol /= self.tol_safety_factor rel_tol /= self.tol_safety_factor if hasattr(problem, 'F'): F = problem.F(x=x, iter_data=solver_data.iter_data, abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs) else: _x, _G, _dx, F = problem.step(x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, compute_G=False, compute_dx=False, compute_F=True, extrapolate=extrapolate, iter_data=solver_data.iter_data, abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs) if problem.active is not (): inactive = np.logical_not(problem.active) F[inactive] = x[inactive] self._f_evals += 1 return F
    def problem_G(self, problem, solver_data, x, x0=None, G0=None,
                  dx0=None, F0=None, compute_dx=False,
                  extrapolate=None, iter_data=None,
                  abs_tol=None, rel_tol=_REL_TOL, mesgs=_MESGS,
                  trial=True):
        r"""This should be used instead of calling `problem.G()` as it
        does the following:

        0) Calls `problem.G()` or `problem.step()`.
        1) Adjusts `G` (and `dx`) to zero the inactive parameters.
        2) Calls `problem.plot_iteration` with `trial=True`.
        3) Increments :attr:`_f_evals`.
        4) Divides the tolerances by :attr:`tol_safety_factor`.
        """
        if abs_tol is None:
            abs_tol = self.G_tol
        abs_tol /= self.tol_safety_factor
        rel_tol /= self.tol_safety_factor

        if IPlotProblem.providedBy(problem):
            problem.plot_iteration(x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
                                   problem=problem,
                                   solver_data=solver_data,
                                   plot=self.plot, trial=trial)

        if hasattr(problem, 'G'):
            G, dx = problem.G(x=x, iter_data=solver_data.iter_data,
                              compute_dx=compute_dx,
                              abs_tol=abs_tol, rel_tol=rel_tol,
                              mesgs=mesgs)
        else:
            _x, G, dx, _F = problem.step(x=x, x0=x0, G0=G0, dx0=dx0,
                                         F0=F0,
                                         compute_G=True,
                                         compute_dx=compute_dx,
                                         compute_F=False,
                                         extrapolate=extrapolate,
                                         iter_data=solver_data.iter_data,
                                         abs_tol=abs_tol,
                                         rel_tol=rel_tol,
                                         mesgs=mesgs)

        if problem.active is not ():
            inactive = np.logical_not(problem.active)
            G[inactive] = 0
            if dx is not None:
                dx[inactive] = 0

        self._f_evals += 1
        return G, dx
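
#
# Illustrative sketch (not part of the library): a minimal concrete
# solver built on the helpers above.  It performs a damped iteration
# ``x -> x + damping*dx`` using whatever step information the problem
# provides.  The class name and the `damping` parameter are invented
# for this example; a production solver (such as the Broyden solvers
# built on this interface) would do considerably more work here.
class _DemoDampedSolver(Solver):
    """Sketch: damped fixed-point/descent iteration via ``iterate``."""
    _state_vars = [
        ('damping', 0.5, "Fraction of the suggested step to take."),
        ]
    process_vars()

    def iterate(self, x, problem, solver_data,
                x0=None, G0=None, dx0=None, F0=None, mesgs=None):
        if mesgs is None:
            mesgs = self.mesgs.new()
        # Evaluate the problem at `x` through `problem_step` so that
        # inactive parameters, plotting, and the tolerance safety factor
        # are handled for us.
        (x1, G1, dx1, F1) = self.problem_step(
            problem=problem, solver_data=solver_data,
            x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
            compute_G=True, compute_dx=True, compute_F=True,
            mesgs=mesgs)
        if dx1 is not None:
            dx = dx1                 # problem suggested a step
        elif F1 is not None:
            dx = F1 - x1             # fixed-point iteration x -> F(x)
        else:
            dx = -G1                 # crude fallback using G alone
        x_new = x1 + self.damping*dx
        return (x_new, (x1, G1, dx1, F1))

# Usage sketch:
#   _DemoDampedSolver(damping=0.8, max_iter=50).solve(problem, state)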
class Scale(Solver): """Example Solver class that multiplies the solution by c. We need to define this here so that the doctests for State work. """ _state_vars = [('c', Required, "Multiplicative factor")] process_vars() def solve(self, problem, state, iterations=None): state = deepcopy(state) state.solver_set_x_(x_=state.x_*self.c) state.record_history(solver=self, problem=problem, iterations=1) return state, 1
[docs]class BareState(StateVars): """Default implementation of a bare :class:`IState`. This only uses the flat representation of the state provided by :attr:`x_`. >>> s = BareState(x_=[1, 2, 3]) >>> s # doctest: +ELLIPSIS BareState(x_=[1, 2, 3]) >>> s1 = BareState(s) >>> print s1 # doctest: +ELLIPSIS x_=[1, 2, 3] converged=False messages=[] sensitive=[] store_history=False history=[] Excluded: _solver_setting_x_=False >>> class Scale(Solver): ... "Example Solver class that multiplies the solution by c." ... _state_vars = [('c', Required, "Multiplicative factor")] ... process_vars() ... def solve(self, problem, state, ... iterations=None): ... state = deepcopy(state) ... state.solver_set_x_(x_=state.x_*self.c) ... state.record_history(problem=problem, solver=self, ... iterations=1) ... return state, 1 >>> times = Scale(c=2) >>> state0 = BareState(x_=np.array([1, 2, 3]), store_history=True) >>> state1, its = times.solve(problem=None, state=state0) >>> times.c = 3 >>> state2, its = times.solve(problem=None, state=state1) >>> state2.x_ array([ 6, 12, 18]) >>> s = state2.get_history() >>> d = {} >>> exec(s, d) >>> d['state'].x_ array([ 6, 12, 18]) """ implements(IState) _state_vars = [ ('x_', Required, """This is a flat representation of the state of the object. It should be implemented as a property with get and set functionality. Setting :attr:`x_` should clear the history and initialize it with an archived version of :attr:`x_`. We use the name :attr:`x_` to avoid clashes with meaningful parameters which might be called ``x``."""), ('solver_data', NotImplemented, \ """Solver specific data representing the state of the solver (for example, gradient info). This may be used by subsequent solvers if they are related, but is typically ignored."""), ('converged', False, \ """Boolean like object signifying if state is from a converged calculation."""), ('messages', [], \ """Messages accumulated during the iteration, such as reasons for termination."""), ('sensitive', [], \ """List of indices of sensitive parameters. These are parameters that must be adjusted slowly or only once a degree of convergence has been achieved. For example, the chemical potentials in a grand canonical ensemble. Solvers may treat these specially."""), ('store_history', False, """If `True`, then a record of the solver history will be saved. This can be somewhat slow and is designed for large scale solver applications where the number of iterations is small, but the cost of each iteration is large. When using the solver in inner loops, this should be set to `False` to speed the evaluation. This is typically set by the solver when :meth:`ISolver.solve` is called, so the default here is `False` for efficiency."""), ('history', [], \ """History of the state as a sequence of strings to be executed in order to reproduce the current state. ``history[0]``: should be an archive that can be evaluated to define the variable 'state' (which is the initial state). This is either the result of a :meth:`IProblem.get_initial_state()` or a direct assignment to :attr:`x_`. ``history[1:]``: should be a series of ``(solver, problem, iterations)`` tuples that specify which solver and problems were used (and how many iterations) to obtain the n'th state from the previous state. By specifying the number of iterations, one allows for calculations that were interrupted before they reached convergence. 
(The actual format is not a requirement for the interface, only that the function :meth:`IState.get_history()` can return the appropriate archived string.) The references to solvers and problems should be kept in a careful manner: if the solver or problem instances are modified, then somehow these objects should be archived in the state prior to the modification so that the representation is faithful. However, archiving each copy could lead to a history bloat. This represents a bit of a design problem."""), ('_solver_setting_x_', Excluded(False), \ """Flag that a solver is setting x, and hence not to reset history"""), ] process_vars()
[docs] def __init__(self, *varargin, **kwargs): """This version of BareState is implemented as a :class:`mmf.objects.StateVars` subclass, thus, __init__ is called each time `self.x_` is directly set. We use this functionality to reset the history. Thus: ***Subclasses must call State.__init__() explicitly, or else reproduce this functionality.*** """ if self.store_history and 'x_' in kwargs: self._x_set()
def _reset(self): """Reset the state, clearing out solver_data, history etc.""" try: del self.solver_data except AttributeError: pass self.converged = False self.history = [] self.messages = [] def _x_set(self): """Record the setting of x_ in history.""" if self.store_history and not self._solver_setting_x_: self._reset() self.history.append(self.archive('state'))
    def record_history(self, problem, solver=None, iterations=None):
        """Add `problem`, `solver` and `iterations` to `self.history`.

        This information is used by :meth:`get_history` to reproduce
        the state.
        """
        if self.store_history:
            a = mmf.archive.Archive()
            if solver:
                a.insert(problem=problem, solver=solver)
                self.history.append((str(a), iterations))
            else:
                a.insert(problem=problem)
                self.history.append((str(a), ))
[docs] def get_history(self): """Return a string that can be evaluated to define the state:: env = {} exec(self.get_history(), env) state = env['state'] Raises ------ ValueError If there is no history (i.e. if :attr:`store_history` is `False`). """ if not self.store_history: raise ValueError("get_history() called on a state with " + "store_history=False") a = mmf.archive.Archive() a.insert(history=self.history) hist_lines = [self.history[0]] for a_i in self.history[1:]: if 2 == len(a_i): archive, iterations = a_i hist_lines.append(archive) hist_lines.append("state, niter = solver.solve(" "problem=problem, " "state=state, " "iterations=%i)"%iterations) else: archive = a_i[0] hist_lines.append(archive) hist_lines.append("state = problem.state0.copy_from(state)") return "\n".join(hist_lines)
[docs] def solver_set_x_(self, x_): """This should set x and include an entry in the history log indicating that the present values were the result of a call to (or could be reproduced by a call to) `solver.solve(problem, self, iterations)`. This must be careful not to directly set `self.x_` which would trigger reset of the history. """ self._solver_setting_x_ = True self.x_ = x_ self._solver_setting_x_ = False
[docs] def copy_from(self, *v, **kwargs): r"""Return a copy:: state.copy_from(a=..., b=...) state.copy_from(state, a=..., b=...) This allows one to convert from two slightly incompatible forms. See :class:`State_x` for an example. This should be a deep copy of the object: no data should be held in common. Parameters ---------- *v : IState , optional State to copy. If not provided, then uses `state=self`. **kwargs : dict, optional Any additional parameters should be used to initialize those values. Notes ----- This version just uses the copy constructor. """ if 0 == len(v): state = self elif 1 == len(v): state = v[0] else: raise ValueError("copy_from accepts only one positional "+ "arg: `state`") return self.__class__(deepcopy(state), **kwargs)
[docs] def copy_from_x_(self, x_): """Return a copy of `self` initialized from `x`. """ new = deepcopy(self) new.x_ = copy(x_) return new
[docs]class State(BareState, UserDict.DictMixin): """Default implementation of :class:`IState` with parameters. Here :attr:`x_` is defined through a list of parameters that provide specialized access to the state vector :attr:`x_`. To do this, the user must define :attr:`parameters` which is a list of the parameter names. These parameters are stored independently and the actual :attr:`x_` is assembled as needed from these (a cache may be used to speed this up if performance is a problem). The vector :attr:`x_` will be defined by packing these together in the order defined by the parameter_names list, where each element is converted to a one-dimensional array sequence using the following rules:\n 1) If the object has a method called 'ravel' then this is used, 2) otherwise :func:`numpy.ravel()` is used. 3) If :attr:`force_real` then any complex parameters are flattened with the real array first followed by the imaginary array. The default or initial values supplied define the shape of the vector :attr:`x_`. .. note:: In order for the setting of attributes to work properly, "array" attributes (i.e. those having a length) must return a reference so that they can be assigned efficiently without reallocation using the `flat` attribute or using slicing. (In particular, they must not return a copy). Examples -------- >>> class State2(State): ... _state_vars = [ ... 'a', 'b', 'c', ... ('parameters', ClassVar(['a', 'b', 'c'])), ... ('force_real', ClassVar(True))] ... process_vars() >>> s = State2(a=1, b=[2.1, 2.2], c=3+3j) >>> s.x_ # doctest: +NORMALIZE_WHITESPACE array([ 1. , 2.1, 2.2, 3. , 3. ]) Note that we can define parameters dynamically if required: >>> class State3(State): ... _ignore_name_clash = ['parameters'] ... _state_vars = ['a', 'b', 'c', ... ('parameters', Required), ... ('force_real', True)] ... process_vars() >>> s = State3(a=1, b=[2.1, 2.2], c=3+3j, ... parameters=['a', 'b', 'c']) >>> s.x_ # doctest: +NORMALIZE_WHITESPACE array([ 1. , 2.1, 2.2, 3. , 3. ]) >>> s.parameters = ['a', 'b'] >>> s.x_ # doctest: +NORMALIZE_WHITESPACE array([ 1. , 2.1, 2.2]) >>> class Scale(Solver): ... "Example Solver class that multiplies the solution by c." ... _state_vars = [('c', Required, "Multiplicative factor")] ... process_vars() ... def solve(self, problem, state, iterations=None): ... state = deepcopy(state) ... state.solver_set_x_(x_=state.x_*self.c), ... state.record_history(solver=self, problem=problem, ... iterations=1) ... return state, 1 >>> times = Scale(c=2) >>> state0 = State2(a=1, b=[2, 3], c=4+5j, store_history=True) >>> state1, niter = times.solve(problem=None, state=state0) >>> times.c = 3 >>> state2, niter = times.solve(problem=None, state=state1) >>> [state2.a, state2.b, state2.c] [6.0, [12.0, 18.0], (24+30j)] >>> state2.x_ # doctest: +NORMALIZE_WHITESPACE array([ 6., 12., 18., 24., 30.]) >>> s = state2.get_history() >>> d = {} >>> exec(s, d) >>> d['state'].x_ # doctest: +NORMALIZE_WHITESPACE array([ 6., 12., 18., 24., 30.]) """ _state_vars = [ ('x_', Deleted, "Now this is a property"), ('_x_', Excluded, "Temporary cache for parameter `x_`"), ('parameters', ClassVar(Required), \ """Parameter name list. The vector :attr:`x_` is a sequence of these, flattened. One can directly access the appropriate part of :attr:`x_` using these attribute names"""), ('sensitive_parameters', ClassVar([]), \ """Name list of sensitive parameters."""), ('force_real', True, \ """Force :attr:`x_` to be real if using :attr:`parameter_names`. 
Complex parameters are flattened, first reals, followed by imaginaries."""), ('sensitive', Computed, \ """List of indices of sensitive parameters."""), ('_packing_profile', Excluded, \ """Information used to pack and unpack :attr:`x_` into the parameters. This is a dictionary with the following keys: `'vars'`: list of tuples `(name, cmplx)` where `name` is the name of the variable (as in :attr:`parameters`) and `cmplx` is a flag indicating whether or not the parameter is complex. `'packers'`: dict of names informing with method to use to pack. The keys are the :attr:`parameters` and the names are strings. Presently the packers are one of either :meth:`_pack_array` for arrays or :meth:`_pack_num` for numbers. `'indices'`: dict of tuples `(offset, n)` where there `n` parameters start at offset `offset` in :attr:`_x`. If the parameter is complex, then there are `2n` data-points, first the real values, and then the imaginary values (if :attr:`force_real` is `True`). """), ('use_cache', False, \ r"""If `True`, then use a cache to speed the forming of :attr:`x_` when requested. If `False`, then :attr:`x_` will be assembled each time. The default is `False` because that is safer (there may be ways of changing the object so that this gets out of sync."""), ] process_vars() def __str__(self): """Return a pretty string showing the state.""" lines = [" %s = %r" % (k, getattr(self, k)) for k in (self.parameters + ['converged'])] return "\n".join(lines)
[docs] def __init__(self, *v, **kw): """This version of State is implemented as a :class:`mmf.objects.StateVars` subclass, thus, :meth:`__init__` is called each time :attr:`x_` is directly set. We use this functionality to reset the history. Thus: ***Subclasses must call State.__init__() explicitly, or else reproduce this functionality.*** """ if (1 == len(v) and hasattr(v[0], 'items')): # Called as a copy constructor. State has been copied from # other by __new__. You do not need to copy the # attributes here, but may need to copy calculated # parameters. other = v[0] if self.use_cache: self.__dict__['_x_'] = other.x_ self.sensitive = other.sensitive for k in kw: # Set any auxiliary values. These override x_ setattr(self, k, kw[k]) elif 0 < len(v): # Another argument passed without kw. Error here. raise ValueError( "State accepts only keyword arguments.") else: if 'parameters' in kw or (set(self.parameters) - set(kw)): self._define_packing() if self.use_cache: self.__dict__['_x_'] = self._compute_x_() self.sensitive = [] for v in self.sensitive_parameters: if v in self.parameters: offset, n = self._packing_profile['indices'][v] self.sensitive.extend(np.arange(offset, offset + n)) if 0 < len(set(self.parameters).intersection(kw)): # Some parameter being set. if self.use_cache: self.__dict__['_x_'] = self._compute_x_() if self.store_history: self._x_set() if 'converged' in kw: # Keep convergence information self.converged = kw['converged']
def _compute_x_(self): """Compute and return the vector representation of :attr:`x_`. Called in `__init__` to ensure that `self.x_` is up to date. """ x_s = [] for (v, cmplx) in self._packing_profile['vars']: val = np.ravel(getattr(self, v)) if cmplx: x_s.append(val.real) x_s.append(val.imag) else: assert np.isreal(val).all() x_s.append(np.real(val)) return np.hstack(x_s)
    def get_x_(self):
        r"""Return `x_`."""
        if self.use_cache:
            # Compute and cache on first access (the original fell
            # through without returning in this branch).
            if '_x_' not in self.__dict__:
                self.__dict__['_x_'] = self._compute_x_()
            return self.__dict__['_x_']
        else:
            return self._compute_x_()
[docs] def set_x_(self, x_): """Set the vector x_ directly.""" self.suspend() # Don't recompute until all parameters are set. for (v, cmplx) in self._packing_profile['vars']: offset, n = self._packing_profile['indices'][v] pack = getattr(self, self._packing_profile['packers'][v]) if cmplx: val = x_[offset:offset+n] + 1j*x_[offset+n:offset+2*n] else: val = x_[offset:offset+n] pack(v, val) self.resume() # Okay, now reinitialize if self.use_cache: self.__dict__['_x_'][::] = x_[::]
x_ = property(get_x_, set_x_, doc="""`x_` is no longer required because data is specified by parameters. We still need to access it though.""")
[docs] def indices(self, parameter): """Return `(offset, n)` where `self.x_[np.arange(offset, offset+n)]` are the values corresponding to `parameter` in the array :attr:`x_`.""" return self._packing_profile['indices'][parameter]
def _pack_array(self, name, val): """Set property `name` of `self` from `val` assuming it is an array. If `len(val) == len(name)`, then the item is mutated, otherwise it is copied.""" a = getattr(self, name) try: mutate = (len(a) == len(val)) except: mutate = False if mutate: try: a.flat = val except AttributeError: # Allows one to use lists. a[:] = val else: setattr(self, name, deepcopy(val)) def _pack_num(self, name, val): """Set property name of self from val as a scalar.""" setattr(self, name, val[0]) def _define_packing(self): """This defines the packing structure based on the definition of :attr:`parameters`. It should be called whenever :attr:`parameters` is changed.""" # By this point, all of the parameter values must have # been set, so we can construct the packing profile. vars = [] packers = {} indices = {} cmplx = {} offset = 0 for v in self.parameters: val = getattr(self, v) try: len(val) packers[v] = '_pack_array' except TypeError: packers[v] = '_pack_num' val = np.ravel(val) cmplx = (self.force_real and np.iscomplexobj(val)) vars.append((v, cmplx)) n = len(val) indices[v] = (offset, n) offset += n if cmplx: offset += n self._packing_profile = dict(vars=vars, packers=packers, indices=indices) def _post_hook__new__(self, *varargin, **kwargs): """We break the default :class:`mmf.objects.StateVars` behaviour a bit here because we allow the user to specify :attr:`x_` via the named parameters upon construction. We do this here so that these calculation are only performed upon construction rather than when recalculating. """ if hasattr(self, 'parameters'): self._define_packing() return self def __getitem__(self, name): r"""Allows the state to be used as a dictionary.""" return getattr(self, name) def __setitem__(self, name, value): r"""Allows the state to be used as a dictionary.""" return setattr(self, name, value) def __delitem__(self, name): r"""Allows the state to be used as a dictionary.""" raise NotImplementedError("Cannot delete State items: %s" % (repr(name),))
[docs] def keys(self): return [v for v in self._vars if v not in set(State._vars)]
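
#
# Illustrative sketch (not part of the public API): how the flat vector
# ``x_`` is packed and addressed via :meth:`State.indices`.  The class
# and parameter names are invented for this example; it uses the default
# ``force_real`` packing described in the class docstring above.
def _demo_state_packing():
    r"""Sketch of :meth:`State.indices` and real/imaginary packing."""
    class _DemoState(State):
        _state_vars = ['a', 'c',
                       ('parameters', ClassVar(['a', 'c']))]
        process_vars()

    s = _DemoState(a=1.0, c=np.array([2.0 + 3.0j, 4.0 + 5.0j]))
    # With force_real (the default) the complex parameter `c` occupies
    # four slots of x_: first its real parts, then its imaginary parts:
    #   s.x_  ->  array([ 1.,  2.,  4.,  3.,  5.])
    offset, n = s.indices('c')      # (1, 2): real parts at x_[1:3],
                                    # imaginary parts at x_[3:5]
    x = s.x_.copy()
    x[offset:offset + n] += 10.0    # shift the real part of `c`
    s.x_ = x                        # the setter unpacks back into `c`
    return s.c                      # -> array([ 12.+3.j,  14.+5.j])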
[docs]class State_x(State): """Another default implementation of :class:`IState` with parameters. This is the same as :class:`State` except that it allows parameters with names that end with `_x` to be accessed or set as functions via the name without the `_x` using :attr:`fn_rep` -- which must implement :class:`IFnRep` -- to convert between the two. In order to properly implement assignment, the underlying data is always mutated (thus, even if a parameter is set using a function, this function is converted to a discrete representation in the basis for storage.) .. note:: There is preliminary support for a change of basis through :meth:`_change_fn_rep`. This is called by :meth:`copy_from` if the :attr:`fn_rep` changes. Note that *all* attributes who's name ends in `'_x'` are transformed to the new basis, not just those listed in :attr:`parameters`. Examples -------- >>> from slda import fn_rep >>> fn_rep = fn_rep.FnRep(x=np.array([0, 1, 4])) >>> class State1(State_x): ... _state_vars = [ ... 'a', 'b_x', 'c_x', ... ('parameters', ClassVar(['a', 'b_x', 'c_x'])), ... ('force_real', ClassVar(True)), ... ('fn_rep', ClassVar(fn_rep))] ... process_vars() >>> s = State1(a=1, b_x=[0.0, 0.1, 0.2], c=np.sqrt) >>> s.x_ # doctest: +NORMALIZE_WHITESPACE array([ 1. , 0. , 0.1, 0.2, 0. , 1. , 2. ]) """ _state_vars = [ ('fn_rep', Required, \ """Object for converting between functions and data. Must implement the :class:`IFnRep` interface."""), ] process_vars()
[docs] def __init__(self, *v, **kw): State.__init__(self, *v, **kw)
def __getattr__(self, name): """Provide access to functions for state variables applied to basis points.""" try: ans = State.__getattribute__(self, name) except AttributeError: ans = None if ans is None: try: ans = State.__getattribute__(self, 'fn_rep').function( State.__getattribute__(self, name + "_x")) except AttributeError: ans = None if ans is None: raise AttributeError("%s object has no attribute %s" % ( repr(self.__class__), repr(name))) return ans def __setattr__(self, name, value): try: State.__setattr__(self, name, value) except AttributeError, err: if name + "_x" in self._vars: State.__setattr__(self, name + "_x", value(self.fn_rep.x)) else: err.message = "Cannot set %s object attribute %s" % ( repr(self.__class__), repr(name)) raise def _pre_hook__new__(self, varargin, kwargs): """We break the default :class:`mmf.objects.StateVars` behaviour a bit here because we allow the user to provide arguments as functions without the '_x'. """ # First make sure that :attr:`fn_rep` is set. if 'fn_rep' in kwargs: setattr(self, 'fn_rep', kwargs.pop('fn_rep')) # Now set all attributes without `_x` if the attribute with an # appended `_x` exists. for k in kwargs.keys(): k_x = "_".join([k, "x"]) if k_x in self._vars and k not in self._vars: setattr(self, k, kwargs.pop(k)) return self
[docs] def copy_from(self, *v, **kwargs): r"""Return a copy:: state.copy_from(a=..., b=...) state.copy_from(state, a=..., b=...) This allows one to convert from two slightly incompatible forms. See :class:`State_x` for an example. This should be a deep copy of the object: no data should be held in common. Parameters ---------- *v : IState , optional State to copy. If not provided, then uses `state=self`. **kwargs : dict, optional Any additional parameters should be used to initialize those values. This version checks to see if :attr:`fn_rep` is the same in both states. If not, then the representation is changed using :attr:`fn_rep``.function`. """ new_state = State.copy_from(self, *v, **kwargs) if ((new_state.fn_rep.__class__ is not self.fn_rep.__class__) or str(new_state.fn_rep.items()) != str(self.fn_rep.items())): new_state._change_fn_rep(self.fn_rep) return new_state
def _change_fn_rep(self, fn_rep): """Change `fn_rep`, implementing a change of basis and applying this to :attr:`solver_data` if it supports it, otherwise obliterating it. This is called by the :meth:`__init__` when copy-constructing from a state with a different :attr:`fn_rep`. >>> from slda import fn_rep >>> fr1 = fn_rep.FnRep(x=np.array([0, 1, 4])) >>> fr2 = fn_rep.FnRep(x=np.array([0, 1, 2, 3, 4])) >>> class State1(State_x): ... _state_vars = [ ... 'a', 'b_x', 'c_x', 'd_x', 'e_x', 'fn_rep', ... ('parameters', ClassVar(['a', 'b_x', 'c_x', 'd_x'])), ... ('force_real', ClassVar(True))] ... process_vars() >>> s1 = State1(a=1, b_x=[0, 0.1, 0.2], c=np.sqrt, d=np.sin, ... e_x=fr1.x*0 + 1, fn_rep=fr1) >>> s2 = State1(a=1, b_x=[0, 0.1, 0, 0, 0.2], c=np.sqrt, d=np.sin, ... e_x=fr2.x*0 + 2, fn_rep=fr2) >>> s3 = s2.copy_from(s1) >>> s2._packing_profile == s3._packing_profile True >>> np.allclose(s3.e_x, s3.fn_rep.x*0 + 1.0) True """ # pylint: disable-msg=E0203 n0 = len(self.fn_rep.x) n1 = len(fn_rep.x) if False: n0 = len(fn_rep.x) I0 = np.eye(len(self.fn_rep.x)) I1 = np.eye(len(fn_rep.x)) f_mat = np.vstack([self.fn_rep.function(I0[n, :])(fn_rep.x) for n in xrange(n0)]) f_inv_t = np.vstack([fn_rep.function(I1[n, :])(self.fn_rep.x) for n in xrange(n1)]).T else: def f_x(fx): r"""We perform a manual vectorization over any extra indices in `fx` here. This is not efficient if there are many.""" fx = np.asarray(fx) shape = fx.shape if 1 < len(shape): fx = fx.reshape((shape[0], np.prod(shape[1:]))) res = np.vstack([ self.fn_rep.function(fx[:,n])(fn_rep.x) for n in xrange(fx.shape[1])]) res = res.T.reshape((res.shape[1],) + shape[1:]) else: res = self.fn_rep.function(fx)(fn_rep.x) return res # List of either numbers n (representing that n variables # should not be transformed) or instances of f_x for each # block that should be transformed. transformers = [] vars_to_transform = set(_v for _v in self._vars if _v.endswith('_x') and hasattr(self, _v) # could be NotImplemented ) # Compute transformation matrix, and update _packing_profile. shift = 0 total_shift = 0 for (_v, cmplx) in self._packing_profile['vars']: offset, n = self._packing_profile['indices'][_v] if _v.endswith('_x'): vars_to_transform.remove(_v) assert n == n0 n = n1 if cmplx: transformers.append(f_x) transformers.append(f_x) shift = 2*(n1 - n0) else: transformers.append(f_x) shift = n1 - n0 else: if cmplx: transformers.append(2*n) else: transformers.append(n) self._packing_profile['indices'][_v] = \ (offset + total_shift, n) total_shift += shift def transform(x): """Effect the transformation.""" y = [] n = 0 for t in transformers: if t is f_x: y.append(t(x[n:n+n0])) n += n0 else: y.append(x[n:n+t]) n += t return np.concatenate(y, axis=0) self.solver_set_x_(x_=transform(self.x_)) for _v in vars_to_transform: # Process remaining parameters setattr(self, _v, f_x(getattr(self, _v))) if hasattr(self, 'solver_data'): if hasattr(self.solver_data, 'change_basis'): self.solver_data.change_basis(transform) else: del self.solver_data self.fn_rep = fn_rep
class State2(State): """Example State class. We need to define this here so that the doctests for State work. """ _state_vars = [ 'a', 'b', 'c', ('parameters', ClassVar(['a', 'b', 'c'])), ('force_real', True), ] process_vars() def indent(s, amount=_INDENT_AMOUNT, fillchar=" "): """Returns string s left indented by specified number of fillchars. >>> s = "1\\n2\\n\\n3" >>> indent(s) ' 1\\n 2\\n \\n 3' >>> indent(s, 2, '.') '..1\\n..2\\n..\\n..3' """ return "\n".join([amount*fillchar + line for line in s.splitlines()])
[docs]def solve(f, x0, solver, store_history=False, **kwargs): """Solve `f(x) = 0` using specified `solver` and return the solution. Parameters ---------- f : function Function to find the root of. x0 : array Initial guess. solver : Solver Instance of a solver to solve the problem. store_history : bool If `True`, then keep a record of the history in the state. kwargs : dict, optional Additional parameters to pass to the Problem instance. Examples -------- >>> from broyden_solvers import Broyden >>> def f(x): ... return x**2 - np.arange(len(x)) >>> solver = Broyden(x_tol=1e-8, G_tol=1e-8) >>> sol = solve(f, x0=[1, 1, 1], solver=solver) >>> np.max(abs(sol.x_**2 - np.array([0, 1, 2]))) < 1e-8 True """ x0 = np.asarray(x0) if x0.shape is (): x0 = x0.ravel() def f(x, _f=f): r"""Wrapper for scalar functions.""" return np.array([_f(x[0])]) class P(Problem): _state_vars = [ ('state0', BareState(x_=1.0*x0), \ "Convert to a float if needed by `1.0*`"), ('has_F', ClassVar(True)), ('has_G', ClassVar(True)), ('has_dx', ClassVar(False)), ] process_vars() def get_initial_state(self): return self.state0.copy_from(store_history=store_history) def G(self, x, compute_dx=False, iter_data=None, abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS): return (f(x), None) p = P(**kwargs) s = p.get_initial_state() sol, niter = solver.solve(p, s) return sol
[docs]def fixedpoint(f, x0, solver, store_history=False, **kwargs): """Solve `f(x) = x` using specified `solver` and return the solution. Parameters ---------- f : function Function to find the fixedpoint of. x0 : array Initial guess. solver : Solver Instance of a solver to solve the problem. store_history : bool If `True`, then keep a record of the history in the state. kwargs : dict, optional Additional parameters to pass to the Problem instance. Examples -------- >>> from broyden_solvers import Broyden >>> def f(x): ... return x - x**2 + np.arange(len(x)) >>> solver = Broyden(x_tol=_ABS_TOL, G_tol=_ABS_TOL) >>> sol = fixedpoint(f, x0=[1, 1, 1], solver=solver) >>> np.max(abs(sol.x_**2 - np.array([0, 1, 2]))) < _ABS_TOL True """ x0 = np.asarray(x0) if x0.shape is (): x0 = x0.ravel() def f(x, _f=f): r"""Wrapper for scalar functions.""" return np.array([_f(x[0])]) class P(Problem): _state_vars = [ ('state0', BareState(x_=1.0*x0), \ "Convert to a float if needed by `1.0*`")] process_vars() def get_initial_state(self): return self.state0.copy_from(store_history=store_history) def F(self, x, iter_data=None, abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS): return f(x) p = P(**kwargs) s = p.get_initial_state() sol, niter = solver.solve(p, s) return sol
st = """ from mmf.objects import * from solver_interface import * class FnRep(StateVars): _state_vars = [('N', 25), ('x', Computed)] process_vars() def __init__(self, *v, **kw): self.x = np.linspace(-1, 1, self.N) def function(self, fx): c1 = np.cosh(1) def f(x): return np.interp(x, self.x.ravel(), fx.ravel()) x_ = np.hstack([[-1], self.x.ravel(), [1]]) fx_ = np.hstack([[c1], fx.ravel(), [c1]]) return np.interp(x, x_, fx_) return f class DEQState(State_x): _state_vars = ['f_x'] process_vars() force_real = True parameters = ['f_x'] class DEQProblem(Problem_x): _state_vars = [('fn_rep', Delegate(FnRep)), ('N=fn_rep.N', 25)] process_vars() def get_initial_state(self): state = DEQState(fn_rep=self.fn_rep, f_x=self.fn_rep.x*0) return state def G(self, x, abs_tol=0, rel_tol=0, mesgs=_MESGS): f = np.asarray(x).ravel() c1 = np.cosh(1) dx = np.diff(self.fn_rep.x[1:]) ddf = np.diff(np.diff(f))/dx/dx a = 1 + 2./dx/dx G0 = np.hstack([[f[0] - c1], f[1:-1] - ddf, [f[-1] - c1]]) G1 = np.hstack([[f[0] - c1], (f[1:-1] - ddf)/a, [f[-1] - c1]]) G = G1 #self._plot_iteration(self.fn_rep.x, f, G) return G def J(self, x): f = x dx = np.diff(self.fn_rep.x[1:]) a = 1 + 2./dx/dx J0 = (np.diag(np.hstack([[1], a, [1]])) - np.diag(np.hstack([1./dx/dx, [0]]), -1) - np.diag(np.hstack([[0], 1./dx/dx]), 1)) J1 = (np.diag(np.hstack([[1], a/a, [1]])) - np.diag(np.hstack([1./dx/dx/a, [0]]), -1) - np.diag(np.hstack([[0], 1./dx/dx/a]), 1)) J = J1 return J def plot_iter(self, x, f, G, plt): plt.plot(x, f) from broyden_solvers import Broyden problem = DEQProblem(N=10) solver0 = Broyden(max_step=10, dyadic=False, verbosity=0, bad_step_factor=10, abs_xtol=0, max_weighted_step=0.001, abs_tol=1e-12, schedule=[({'N':50}, {})]) solver1 = Broyden(solver0) solver1.schedule = [({'N':10}, {}), ({'N':50}, {'reset_Jinv': True})] solver2 = Broyden(solver0) solver2.schedule = [({'N':10}, {}), ({'N':50}, {})] solver3 = Broyden(solver0) solver3.schedule = [({'N':N}, {'abs_tol':1e-1}) for N in xrange(5, 50, 4)] + [({'N':50}, {'abs_tol':1e-12})] initial_state = problem.get_initial_state() s0, i0 = solver0.solve(problem, initial_state) s1, i1 = solver1.solve(problem, initial_state) s2, i2 = solver2.solve(problem, initial_state) s3, i3 = solver3.solve(problem, initial_state) print i0, i1, i2, i3 #del s2.solver_data #p2 = DEQProblem(N=100) #s2, i2 = solver.solve(p2, s2) #bx = s2.solver_data.x0 #for n in reversed(xrange(len(x))): # dx = np.zeros(len(x), dtype=float) # dx[n] = 0.00001 # g = p2.G(x+dx, 0, 0, []) # s2.solver_data.broyden._update_J(G=g, x=x+dx) #print np.max(abs(s2.solver_data.Jinv0 - Ji)) """ """ x = s2.solver_data.x0 for n in reversed(xrange(len(x))): dx = np.zeros(len(x), dtype=float) dx[n] = 0.00001 g = p2.G(x+dx, 0, 0, []) s2.solver_data.broyden._update_J(G=g, x=x+dx) #print np.max(abs(s2.solver_data.Jinv0 - Ji)) """