r"""
===================
Iterative solvers
===================
This module contains the interface for solving root-finding and/or
fixed-point problems of the forms::
0 == G(x)
x == F(x)
The two forms are related by `F(x) = x - G(x)`;
however, both forms have their uses:
1) The `0 == G(x)` form makes the objective function explicit and may
be useful for specifying tolerances.  One may also have an
expression in terms of `G` for calculating a good step (for
example, if the Jacobian can be calculated). Thus, the function
:meth:`Problem.G` returns a pair `(G, dx)` containing a suggested
step (if it can be calculated).
This form also implicitly suggests that a good fixed-point
iteration may not be known (which is why we always include a
suggested step `dx`).
2) The `x == F(x)` form naturally suggests an iteration with `dx =
F(x) - x` and so :meth:`Problem.F` returns only the next step.
If present, this form is used to form the objective function `G(x)
= x - F(x)` by memory-limited quasi-Newton methods such as the
Broyden iteration which stores the inverse Jacobian as a matrix of
the form `B = I + D` where `D` is rank deficient. The ideally
convergent iteration `F(x) = x_sol` (always converging in a single
step) would have $\partial F = 0$, hence `D = 0` and so this is
ideally captured by the rank deficient approximate Jacobian.
These interfaces address the following functionality goals:
- Provide a history of the state of a solution so that the
calculation can be reproduced.
- Allow for interaction with the solver, such as plotting/displaying
intermediate results, prematurely terminating the solver, and
possibly even adjusting the solution during the iteration.
- Error recovery and reproduction.
The interface is defined by three main interfaces:
.. autosummary::
:nosignatures:
IState
IProblem
ISolver
The general usage is something like::
problem = Problem(...)
solver = Solver(...)
state, niter = solver.solve(problem)
new_state, niter = solver.solve(problem, state)
# Archive
file = open(filename, 'w')
file.write(new_state.get_history())
file.close()
...
# Restore
env = {}
execfile(filename, env)
state = env['state']
Problems and solvers are considered independent with the state
information being stored in the return state object.
:class:`IState`
Specifies :attr:`IState.x_`, the current state of the solution. The
abstraction employed here is that a state may be the result of three
"transformations":
1. Initialization from a :meth:`IProblem.get_initial_state()` call.
2. Initialization from direct assignment of `x`.
3. Transformation from a previous state by a :class:`ISolver` object.
The state keeps track of its history through these
transformations/initializations so that the computation can be
repeated.
:class:`IProblem`
Specifies :meth:`IProblem.get_initial_state()`, which implicitly
defines the state class and problem size.  A problem must
implement the :class:`IProblem` interface and specify at least one
of the functions `G(x)` (to be set to zero) or `F(x)` (to find a
fixed-point) by defining :meth:`IProblem.step`.
- `state0 =` :meth:`IProblem.get_initial_state()`:
This provides the initial guess `x0` which must implement the
:class:`IState` interface.  This also defines the size of the
problem so the solver can allocate resources etc.
- `(x1, G1, dx1, F1) =` :meth:`IProblem.step()`:
Returns the new step `x1`, the objective function `G1 = G(x1)`,
the suggested step `dx1`, and the fixed-point iteration
`F1=F(x1)` based on the step `x` suggested by the solver.  If the
problem computes the Newton step ``dx1 = -np.dot(j_inv, G1)`` (as
indicated by setting :attr:`IProblem.newton` to `True`), then this
should also be returned; otherwise, one should set ``dx1 = -G(x)``
as a default step.
The new step `x1` need not be either of the natural steps `x0 +
dx` or `F1`, allowing the problem more control, for example, to
enforce constraints or to prevent the solver from wandering into
unphysical regions of solution space.
:class:`ISolver`
Provides a means for solving a problem. Once initialized, this
provides a method to transform an initial guess (provided either by
a Problem or by another State) into a new State.
.. todo:: Add and test support for a "global" dictionary used to allow the
solver and problem to maintain state during the solution of a
problem. Strictly, the calculation should not depend on this
dictionary, but it can be used to store useful diagnostic
information about the progress of the solution. Also,
the dictionary can be used to maintain flags that control the
iteration, thus allowing the user to alter or affect the iteration
(from another process or thread). This use of a global dictionary
has not been fleshed out yet but the idea is to allow one to use
the :mod:`mmf.async` module for remote control.
To facilitate usage, partial implementations are provided:
.. rubric:: States
.. autosummary::
BareState
State
State_x
.. rubric:: Problems
.. autosummary::
Problem
PlotProblem
Problem_x
.. rubric:: Solvers
.. autosummary::
Solver
As an example and to get started, look at the code for :class:`Scale`
and :class:`State2`.
Tolerances
----------
The general plan for determining if a parameter $x$ is close enough to
$x_0$ is to consider the condition `err <= 1` where::
abs_err = abs(x - x_0)
rel_err = abs_err/(abs(x_0) + _TINY)
err = min(abs_err/(abs_tol + _TINY), rel_err/(rel_tol + _TINY))
This is `True` if either the absolute error is less than `abs_tol` or
the relative error is less than `rel_tol`. This is what we mean below
by $\abs{x - x_0} < \epsilon$.
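For instance, a direct transcription of this check into a small helper
(a sketch only, not part of this module's API) could read::

    import numpy as np
    _TINY = np.finfo(float).tiny

    def close(x, x_0, abs_tol=1e-12, rel_tol=1e-12):
        abs_err = abs(x - x_0)
        rel_err = abs_err/(abs(x_0) + _TINY)
        err = min(abs_err/(abs_tol + _TINY), rel_err/(rel_tol + _TINY))
        return err <= 1
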
The real questions are:
1) What to use for $x$ and $x_0$?
There are two forms of solver here: Root finding $G(x) = 0$ and
fixed-point finding $x = F(x)$. In general these are related by $G(x)
= x - F(x)$ but some problems may define both. There is no
reference scale for $G(x)$ as it should be zero, so only the
absolute tolerance :attr:`ISolver.G_tol` makes sense here.
It would be ideal if we could also provide a tolerance on the step
$\abs{x-x_0} < \epsilon_x$, but this is only easy to implement with
bracketing algorithms. Instead, we may provide a heuristic based
on the Jacobian, but in general this is difficult to trust if the
Jacobian is not exact.
2) What to do when there is a vector of parameters?
The actual tolerance criterion used is:
.. math::
\Norm{\frac{\vect{G}(x)}{\vect{G}_{\text{typ}}}}_{p} \leq \epsilon_{G}
where the vector of typical scales $\vect{G}_{\text{typ}}$ can be
specified through :attr:`IProblem.G_scale` and the type of norm
through :attr:`ISolver.norm_p`.
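In code, this criterion might be checked as follows (a sketch only;
`G`, `G_scale`, `norm_p`, and `G_tol` are as described above)::

    converged = np.linalg.norm(G/G_scale, ord=norm_p) <= G_tol
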
Examples
--------
Another way to use this is when representing functions in a discrete
basis. In this case, one parametrizes the number of points used, and
would like to be able to continue an iterative solution while
increasing the resolution. Here is how it is done.
We will use, as an example, the solution of the differential equation
:math:`f''(x) = f(x)` with boundary conditions
:math:`f(-1)=f(1)=\cosh(1)`.
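The following sketch (plain NumPy, independent of the classes defined
below) shows one way to form the objective function `G` for this
problem using a second-order finite-difference discretization; a root
of `G` approximates `cosh(x)` at the interior points::

    import numpy as np

    def make_G(N=32):
        # N interior points on [-1, 1]; the two boundary values are fixed.
        x = np.linspace(-1, 1, N + 2)
        h = x[1] - x[0]
        f_bc = np.cosh(1.0)

        def G(f):
            # Pad the interior values with the fixed boundary values.
            f_full = np.concatenate(([f_bc], f, [f_bc]))
            d2f = (f_full[:-2] - 2*f_full[1:-1] + f_full[2:])/h**2
            return d2f - f          # zero when f'' == f
        return G, x[1:-1]

    G, xi = make_G()
    f0 = np.zeros_like(xi)          # crude initial guess

To continue such a solution while increasing the resolution, one would
interpolate the converged values onto the finer mesh and use them as
the next initial guess.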
"""
from __future__ import division
from __future__ import with_statement
__docformat__ = "restructuredtext en"
import sys
import time
from copy import copy, deepcopy
import UserDict
import warnings
try:
from warnings import catch_warnings # pylint: disable-msg=E0611
except ImportError:
from contextlib import contextmanager
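# Minimal fallback for Python versions whose `warnings` module does not
# provide `catch_warnings`: capture warnings by temporarily replacing
# `warnings.showwarning`.  The `record` argument is accepted only for
# interface compatibility; captured warnings are always collected in
# `warning_list`.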
@contextmanager
def catch_warnings(record=False):
warning_list = []
def catch_showwarning(message, category, filename, lineno,
file=None, line=None):
warning_list.append(
warnings.formatwarning(message, category, filename,
lineno))
try:
old_showwarning = warnings.showwarning
warnings.showwarning = catch_showwarning
yield warning_list
finally:
warnings.showwarning = old_showwarning
import numpy as np
from zope.interface import Interface, Attribute, implements
from mmf.objects import StateVars, Container, process_vars
from mmf.objects import ClassVar, Computed, Required, Excluded
from mmf.objects import Internal, Deleted
import mmf.archive
import mmf.utils
# Module defaults
__all__ = ['IterationError',
'IProblem', 'IPlotProblem', 'IState', 'ISolver',
'ISolverData', 'IJinv',
'Solver', 'SolverData', 'Problem', 'PlotProblem', 'Problem_x',
'BareState', 'State', 'State_x', 'IMesg', 'Mesg',
'StatusMesg',
'solve', 'fixedpoint']
_FINFO = np.finfo(float)
_TINY = _FINFO.tiny # pylint: disable-msg=E1101
_ABS_TOL = 1e-12
_REL_TOL = 1e-12
_ABS_XTOL = 1e-12
_REL_XTOL = 1e-12
_MESSAGE_INDENT = 2
_VERBOSITY = 0
_INDENT_AMOUNT = 4
_DEBUG = False
class IMesg(Interface):
r"""Interface to a messaging object."""
messages = Attribute("List of `(mesg, level)` pairs.")
show = Attribute(""":class:`Container` of booleans controlling
which messages should be displayed.""")
store = Attribute(""":class:`Container` of booleans controlling
which messages should be stored.""")
def new(self):
"""Return a new IMesg instance for displaying sub messages."""
def msg(self, msg, type):
"""Add `msg` to the list of messages with specified `type`
and display if `type` is `True` in `show`.
Parameters
----------
msg : str
Message text.
type : str, optional
Message type.
"""
def error(self, msg):
"""Add an error message."""
def warn(self, msg):
"""Add a warning message."""
def status(self, msg):
"""Display a status message."""
def info(self, msg):
"""Add an information message."""
def iter(self, msg):
"""Display an iteration message."""
def delete(self):
"""Delete the messenger."""
def delete_sub(self):
"""Clear all message sub queues."""
def clear(self):
"""Clear the status bar and all message subqueues."""
class Mesg(StateVars):
"""Print the displayed messages immediately. This is the default
implementation.
Examples
--------
>>> mesgs = Mesg()
>>> mesgs.error('An error occurred!')
-Error: An error occurred!
>>> mesgs.status('You are here.')
-Status: You are here.
Info and Warnings are not displayed by default, but they are stored:
>>> mesgs.info('3+3 = 6')
>>> mesgs.warn('Hang on!')
>>> mesgs.show
Container(info=False, status=True, warn=False, error=True, iter=False)
>>> mesgs.store
Container(info=True, status=True, warn=True, error=True, iter=False)
Let's turn on warnings:
>>> mesgs.show.warn = True
>>> mesgs.warn('Hang on again!')
-Warning: Hang on again!
and also turn on iteration messages. These are still not stored:
>>> mesgs.show.iter = True
>>> for n in xrange(3):
... mesgs.iter('%i of 3' % n)
-Iter: 0 of 3
-Iter: 1 of 3
-Iter: 2 of 3
Here are all of the messages so far.  Note that both warning
messages were stored, but the iteration messages were not:
>>> mesgs.messages # doctest: +NORMALIZE_WHITESPACE
[('-Error: An error occurred!', 'error'),
('-Status: You are here.', 'status'),
('-Info: 3+3 = 6', 'info'),
('-Warning: Hang on!', 'warn'),
('-Warning: Hang on again!', 'warn')]
"""
implements(IMesg)
_state_vars = [
('show', Container(error=True, warn=False, status=True,
iter=False, info=False),
"""The `True` entries should be displayed."""),
('store', Container(error=True, warn=True, status=True,
iter=False, info=True),
"""The `True` entries should be stored in
:attr:`messages`"""),
('messages', [], "List of stored messages")]
process_vars()
def new(self):
return self.__class__(show=self.show, store=self.store)
def msg(self, msg, level='info'):
if level not in self.store:
raise ValueError(
"Message type '%s' not recognized" % (level, ))
if getattr(self.store, level, False):
self.messages.append((msg, level))
if getattr(self.show, level, False):
print(msg)
def error(self, msg):
"""Add an error message."""
self.msg("-Error: %s" % (msg,), level='error')
def warn(self, msg):
"""Add a warning message."""
self.msg("-Warning: %s" % (msg,), level='warn')
def status(self, msg):
"""Display a status message."""
self.msg("-Status: %s" % (msg,), level='status')
def info(self, msg):
"""Add an informational message (stored but not displayed by
default)."""
self.msg("-Info: %s" % (msg,), level='info')
def iter(self, msg):
"""Add an iteration message (neither stored nor displayed by
default)."""
self.msg("-Iter: %s" % (msg,), level='iter')
def _inactive(self, *v, **kw):
raise ValueError("Messenger is not active.")
def delete(self):
self.__dict__['msg'] = self._inactive
def delete_sub(self):
"""Clear all message sub queues."""
def clear(self):
"""Clear all message subqueues."""
pass
_MESGS = Mesg() # Default object. Should generally not be used.
class StatusMesg(Mesg):
"""This class acts like a message list, but prints the current
message in a status bar without using up screen space.
Use an instance as the default argument for `mesgs`.
"""
"""
Examples
--------
>>> mesgs = StatusMesg()
>>> mesgs.error('An error occurred!')
-Error: An error occurred!
<BLANKLINE>
>>> mesgs.status('You are here.')
\x1b[A-Status: You are here.
<BLANKLINE>
Iterations are displayed in a status bar:
>>> mesgs.show.iter = True
>>> for n in xrange(3):
... mesgs.iter('%i of 3' % n)
-Iter: 0 of 3
\x1b[A\r\x1b[K-Iter: 1 of 3
\x1b[A\r\x1b[K-Iter: 2 of 3
Note that anything that is stored but not displayed also appears
in the status bar as an iteration message.
>>> mesgs.info('3+3 = 6')
\x1b[A\r\x1b[K-Info: 3+3 = 6
Regular messages will appear and remain, but unless you clear the
status bar, the status message will still be displayed (in this
case the last info statement).
>>> mesgs.error('Still showing status bar')
\x1b[A\r\x1b[K\x1b[A-Error: Still showing status bar
<BLANKLINE>
-Info: 3+3 = 6
Here we clear it.
>>> mesgs.clear()
\x1b[A\r\x1b[K
>>> mesgs.error('Status bar gone.')
\x1b[A\r\x1b[K\x1b[A-Error: Status bar gone.
Here are the stored messages:
>>> mesgs.messages # doctest: +NORMALIZE_WHITESPACE
[('-Error: An error occurred!', 'error'),
('-Status: You are here.', 'status'),
('-Info: 3+3 = 6', 'info'),
('-Error: Still showing status bar', 'error'),
('-Error: Status bar gone.', 'error')]
"""
_state_vars = [
('keep_last', True,
r"""If `True`, then the last iteration message will remain
on the display, otherwise it will be deleted."""),
('_sb', Computed,
r"""Reference to the StatusBar instance. When all of these
are deleted, then the output will revert to normal
behaviour"""),
('_bar', Computed, ":class:`mmf.utils.StatusBar` instance."),
('_bars', Computed, "List of all subbars."),
]
process_vars()
def __init__(self, *v, **kw):
Mesg.__init__(self, *v, **kw)
self._sb = mmf.utils.StatusBar()
self._bar = self._sb.new_bar(keep_last=self.keep_last)
self._bars = []
def new(self):
new = self.__class__(show=self.show, store=self.store,
keep_last=self.keep_last)
self._bars.append(new._bar)
return new
def msg(self, msg, level='info'):
"""This prints stored messages and adds displayed messages to
the status bar."""
if level not in self.store:
raise ValueError(
"Message type '%s' not recognized" % (level, ))
if getattr(self.store, level, False):
self.messages.append((msg, level))
if getattr(self.show, level, False):
self._bar.print_(msg)
else:
self._bar.msg(msg)
elif getattr(self.show, level, False):
self._bar.msg(msg)
def delete(self):
"""Delete the status bar. It cannot be used after this"""
self._bar.delete()
self.clear()
def delete_sub(self):
"""Clear all message sub queues."""
while self._bars:
b = self._bars.pop()
b.clear()
b.delete()
def clear(self):
"""Clear all message queues, including self."""
self.delete_sub()
self._bar.clear()
def __del__(self):
del self._sb
class IterationError(Exception):
"""Iteration has failed for some reason."""
class IState(Interface):
r"""Interface for the state of a solution.
The state keeps track of:
- The current set of parameters x.
- The history of the state.
One can perform arbitrary manipulations of the state by housing
them in an appropriate "Solver" object.
Usage::
state = State(x_=...)
x = state.x_
x1 = F(x)
state.x_=x1 # Without using a solver
"""
x_ = Attribute(
r"""This is a flat representation of the state of the
object. It should be implemented as a property with get and
set functionality. Setting :attr:`x_` should clear the history
and initialize it with an archived version of :attr:`x_`. We
use the name :attr:`x_` to avoid clashes with meaningful
parameters which might be called `x`.""")
sensitive = Attribute(
r"""List of indices of sensitive parameters. These are
parameters that must be adjusted slowly or only once a degree
of convergence has been achieved. For example, the chemical
potentials in a grand canonical ensemble. Solvers may treat
these specially.""")
solver_data = Attribute(
r"""Solver specific data representing the state of the solver
(for example, gradient info). This may be used by subsequent
solvers if they are related, but is typically ignored.""")
converged = Attribute(
r"""Boolean like object signifying if state is from a converged
calculation.""")
messages = Attribute(
r"""Messages accumulated during the iteration, such as reasons
for termination.""")
store_history = Attribute(
r"""If `True`, then a record of the solver history will be
saved. This can be somewhat slow and is designed for large
scale solver applications where the number of iterations is
small, but the cost of each iteration is large. When using the
solver in inner loops, this should be set to `False` to speed
the evaluation.""")
history = Attribute(
r"""History of the state as a sequence of strings to be executed
in order to reproduce the current state.
`history[0]`:
should be an archive that can be evaluated to define the
variable 'state' (which is the initial state). This is
either the result of a
:meth:`IProblem.get_initial_state()` or a direct
assignment to :attr:`x_`.
`history[1:]`:
should be a series of ``(solver, problem, iterations)``
tuples that specify which solver and problems were used
(and how many iterations) to obtain the n'th state from
the previous state. By specifying the number of
iterations, one allows for calculations that were
interrupted before they reached convergence. (The actual
format is not a requirement for the interface, only that
the function :meth:`IState.get_history()` can return the
appropriate archived string.)
The references to solvers and problems should be kept in a
careful manner: if the solver or problem instances are
modified, then somehow these objects should be archived in the
state prior to the modification so that the representation is
faithful. However, archiving each copy could lead to a history
bloat. This represents a bit of a design problem.""")
def record_history(self, problem, solver=None, iterations=None):
"""Add `problem`, `solver` and `iterations` to :attr:`history`.  If
`solver` and `iterations` are `None`, then record an
initialization from `problem`.
This information should be used by :meth:`get_history` to reproduce
the state.
Should respect :attr:`store_history`.
"""
raise NotImplementedError
def get_history(self):
"""Return a string that can be evaluated to define the state::
env = {}
exec(self.get_history(), env)
state = env['state']
"""
raise NotImplementedError
def solver_set_x_(self, x_):
"""This should be called by the solver during iterations in
order to set `x` without resetting the history.  As such it
should not directly set `self.x_`, which would
trigger a reset of the history.
Should respect :attr:`store_history`.
"""
raise NotImplementedError
def copy_from(self, *v, **kwargs):
r"""Return a copy::
state.copy_from(a=..., b=...)
state.copy_from(state, a=..., b=...)
This allows one to convert between two slightly
incompatible forms.  See :class:`State_x` for an example.
This should be a deep copy of the object: no data should be
held in common.
Parameters
----------
*v : IState, optional
State to copy. If not provided, then uses `state=self`.
**kwargs : dict, optional
Any additional parameters should be used to initialize
those values.
"""
# The following is a skeleton:
if 0 == len(v):
state = self
elif 1 == len(v):
state = v[0]
else:
raise ValueError("copy_from accepts only one positional "+
"arg: `state`")
return self.__class__(deepcopy(state), **kwargs)
def copy_from_x_(self, x_):
"""Return a copy of `self` with a newly initialized `x`.
"""
class IProblem(Interface):
r"""Interface for an iterative problem: the solver's goal is to
find a solution to the problem ``G(x) = 0``.
The function :meth:`G()`, specified through :meth:`step()`, must
accept a real vector `x` of the same dimension as returned by
:meth:`get_initial_state()` and return the same.
If possible, the problem should be organized so that it converges
on its own. The solvers will try to accelerate this convergence.
"""
globals = Attribute(r""""Global" dictionary for storing
intermediate results. These must not be used for
modifying the calculation, but provide access to
mid-iteration calculational details for user feedback.""")
state0 = Attribute(r"""Dummy instance of the state class used to
encapsulate the solution.""")
has_dx = Attribute(r"""`bool`. If `True`, then the problem can
provide a downhill step. This can be computed by passing the
`compute_dx=True` flag to :meth:`step`.""")
has_F = Attribute(r"""`bool`. If `True`, then the problem can
compute a reasonable fixed-point iteration `F`. This can be
computed by passing the `compute_F=True` flag to
:meth:`step`.""")
has_G = Attribute(r"""`bool`. If `True`, then the problem can
compute an objective function `G` that will be used to
determine convergence. This can be computed by passing the
`compute_G=True` flag to :meth:`step`.""")
G_scale = Attribute(r"""Typical scale for `G(x)`. The scaled vector
`G/G_scale` is used for convergence testing and in some
solvers to scale the approximate Jacobian.""")
x_scale = Attribute(r"""Typical scale for `x`. The scaled vector
`x/x_scale` is used for convergence testing and in some
solvers to scale the approximate Jacobian.""")
j0_diag = Attribute(r"""Some solvers assume that the approximate
Jacobian $\partial\vect{G}/\partial\vect{x} = \diag(j_0)$.
The default implementations will approximate this as
`j0_diag = G_scale/x_scale` but if you know that the function
has a drastically different behaviour, then you should override
this.""")
active = Attribute(
r"""This should be a boolean array of indices such that
`x_[active]` returns an array of all the active parameters.
Note that if `active` is `()`, then all the parameters are
active. In this way, only a subset of the parameters may be
worked on, but other fixed parameters are still maintained.""")
def get_initial_state(self):
"""Return a state implementing :class:`IState` representing
the initial state for the problem."""
def solver_update(self, state, p_dict):
"""This is called by the solver to implement the schedule (see
:attr:`ISolver.schedule`). It should update the problem
and `state` according to the parameters in `p_dict`, returning
the updated state. It should be careful not to obliterate the
state history.
Parameters
----------
state : :interface:`IState`
This is the current state object.
p_dict : dict
Dictionary of updated parameters. If the problem is an
instance of :class:`StateVars`, then this could be passed
to :meth:`StateVars.initialize`.
"""
def step(self, x, x0=None, G0=None, dx0=None, F0=None,
compute_G=False, compute_dx=False, compute_F=False,
extrapolate=None, iter_data=None,
abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS):
r"""Return `(x1, G1, dx1, F1)` where
`x1` is the new solution estimate.
Parameters
----------
x : array
Step suggested by the solver. Real 1-d array of the same
length as returned by :meth:`__len__`
x0 : array
Previous step and the point at which `G0`, `dx0`, and `F0`
are evaluated. If this is `None`, then this is the first
step of a new solver cycle.
G0 : array
Objective function `G0(x0)` evaluated at `x0`.
dx0 : array
Suggested step from `x0` based on `G0`.
F0 : array
Fixed-point iteration `F(x0)` evaluated at `x0`.
compute_G, compute_dx, compute_F : bool
These flags are set depending on what is needed. A problem
could always compute `G1`, `dx1`, and `F1` to be safe, but
if these are expensive, then this allows :meth:`step` to compute
only the required values.
extrapolate : function
The extrapolator is a function ``x_new =
extrapolate(x_controls)`` that returns the solver's
extrapolated step based on the specified control
parameters. `x_controls` should be a pair `(inds, vals)`
of indices into the array `x` and values such that the step
should preserve ``x[inds] = c*vals + (1-c)*x0[inds]`` for
some ``0 < c <= 1`` (i.e. this is a convex extrapolation
from `x0` to `x`).
.. note:: Extrapolation typically requires some linear
algebra (such as solving a linear system) on the
submatrix of the Jacobian in the basis spanned by the
elements in `x_controls`. If the problem is large, this
may be prohibitive unless the number of entries in
`x_controls` is sufficiently small.
If the solver contains information about the function `G`
such as gradient information, then the extrapolated state
`x_new` will incorporate this information extrapolating the
step in such a way as to satisfy the controls.
iter_data : object
This is the object returned by :meth:`iter_data`. It is
passed in here so that properties of the current state that
depend on the full iteration can be included here. Note,
this function may be called several times during a single
iteration depending on the solver, whereas
:meth:`iter_data` is only called once.
abs_tol, rel_tol : float
If possible, `G(x)` should be calculated to the specified
absolute and relative tolerances. This feature is used by
some solvers to allow faster computations when far from the
solution.
mesgs : IMesg
If there are any problems, then an exception should be
raised and/or a message sent through this.
Returns
-------
x1 : array (real and same shape as `x`).
This is the new solution step. It need not be either of
the natural steps `x0 + dx0` or `F0`, allowing the problem
more control, for example, to enforce constraints or
prevent the solver from wandering into unphysical regions
of solution space.
G1 : array (real and same shape as `x`) or None
Objective function `G(x1)` evaluated at the suggested point
`x1`. Compute if :attr:`has_G` and `compute_G` are both
`True`. This objective function is generally used for
convergence checking.
dx1 : array (real and same shape as `x`) or None
Suggested step from `x1` based on the value of `G1`.
Compute if :attr:`has_dx` and `compute_dx` are both
`True`. If possible, this should be a downhill direction
such as the full Newton step ``dx = -j_inv*G(x1)``.
F1 : array (real and same shape as `x`) or None
Fixed-point function `F(x1)` evaluated at the suggested
point `x1`. Compute if :attr:`has_F` and `compute_F` are
both `True`. This is used as both a step $x \mapsto F(x)$
and for convergence $\norm{x - F(x)}$ testing when no other
objective function is provided.
Notes
-----
This provides a mechanism for the problem to control the
iterations, for example, enforcing constraints. A typical
algorithm would proceed as follows:
1) Inspect `x`. If any small number of parameters needs to be
adjusted to satisfy constraints etc. then these should be
specified in a dictionary `x_controls`.
2) Call `x_new = extrapolate(x_controls)` to get the solver's
suggestion including the controls.
3) Adjust `x_new` if needed.
4) Compute `G1`, `dx1`, and `F1`.
5) Return `(x_new, G1, dx1, F1)`.
Here is some pseudo-code::
x1 = x
if not okay(x1):
<Determine where we need x[i_n] = v_n>
x_controls = ([i_1, i_2, ...], [v_1, v_2, ...])
x1 = extrapolate(x_controls)
if not okay(x1):
<Fix x1 to make it okay>
x1 = ...
G1 = dx1 = F1 = None
if self.has_G and compute_G:
G1 = ...
if self.has_dx and compute_dx:
dx1 = ...
if self.has_F and compute_F:
F1 = ...
return (x1, G1, dx1, F1)
"""
def iter_data(self, x, x0, G0, dx0, F0, solver, data=None):
r"""Return data from the iteration for plotting etc. This optional
method is called by the solver after each iteration with
the current solution `x`. The object it returns will be
passed to :meth:`plot_iter` as the keyword argument `data`.
This allows the problem to calculate and accumulate results
for plotting or for recording the evolution/convergence
history.
This can be retrieved from `state.solver_data.iter_data`.
.. note:: The problem can set `state.solver_data.iter_data`
directly if it likes during the iteration, but since
solvers might perform processing, one cannot be guaranteed
that this will be in sync with `x` etc. One may do this
for speed if needed though.
Parameters
----------
x : array
Current guess.
x0 : array, None
Previous guess.
G0 : array, None
`G0 = G(x0)`.
dx0 : array, None
Previous recommended step. If :attr:`newton`, then ``dx =
-np.dot(j_inv, G0)`` is the full Newton step.
F0 : array, None
`F0 = F(x0)`.
solver : ISolver
Current solver. This can be queried for convergence
requirements etc.
data : object or None
Previous data object from last iteration. The first
iteration this is `None`. If you modify the `iter_data`
object in :meth:`G` or :meth:`step`, then this modified
object will be passed in here, so be sure to include this
information in the return value. Typically one just
mutates `data` and returns it.
"""
def __len__(self):
"""Return the length of the solution vector."""
def pre_solve_hook(self, state):
r"""If this is defined, it will be called by the solver before
solving."""
class IPlotProblem(IProblem):
r"""Generalization of :class:`IProblem` that provides methods for
plotting results during the iterations."""
def plot_iteration(self, data):
r"""Plot the current iteration.
Parameters
----------
data : object
This is the object returned by :meth:`iter_data`.
"""
class IJinv(Interface):
r"""Interface of a matrix-like object that represents the inverse
Jacobian.
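A toy illustration of the role such an object plays (a fixed diagonal
approximation; not a conforming implementation of this interface) is::

    import numpy as np

    class DiagJinv(object):
        # Toy fixed diagonal inverse Jacobian B = diag(1/j0_diag).  Only
        # __mul__ and get_x are sketched; a real IJinv would also provide
        # update, add_point, set_point, incompatibility, reset, etc.
        def __init__(self, j0_diag, x0, g0):
            self.b_diag = 1.0/np.asarray(j0_diag, dtype=float)
            self.x0, self.g0 = x0, g0

        def __mul__(self, dg):
            return self.b_diag*dg

        def get_x(self, x_controls=None):
            # Newton step from (x0, g0); x_controls are ignored here.
            return self.x0 - self.b_diag*self.g0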
"""
x = Attribute("""Current point. Should be read only. (Use :meth:`set_point`
to change.)""")
g = Attribute("""Objective at the current point. Should
be read only. (Use :meth:`set_point` to change.)""")
x0 = Attribute("""Point where the Jacobian is valid. Should be read
only. (Use :meth:`update_point` to change.)""")
g0 = Attribute("""Objective at :attr:`x0`. Should
be read only. (Use :meth:`update_point` to change.)""")
def __mul__(self, dg):
r"""Return the matrix multiplication `j_inv*dg`."""
def __rmul__(self, dxt):
r"""Return the matrix multiplication `dxt*j_inv`."""
def update(self, x0=None, g0=None):
r"""Update the inverse Jacobian so that it is valid at the
point `(x0, g0)` (use :attr:`x` and :attr:`g` by default). Does not
necessarily add `x0` and `g0` (use :meth:`add_point`) or set the current
point (use :meth:`set_point`) but may do either or both depending on the
algorithm.
"""
def add_point(self, x, g):
r"""Add the point `(x, g)` to the Jacobian information. Does not
necessarily update the Jacobian (use :meth:`update`) but may. Does not
change the current point (use :meth:`set_point`)."""
def set_point(self, x, g):
r"""Add `x` and `g` and make this the current point `(x, g)` and update
the Jacobian to be valid here.
Return a measure of the incompatibility of the new data with
the current approximation. This need not "add" the point to
the current data set (the functionality provided by
:meth:`add_point`).
"""
def incompatibility(self, x, g, x0, g0):
r"""Return a dimensionless measure of the incompatibility of a step
from `x0` to `x` with the current Jacobian.  An incompatibility of
0 means perfectly compatible (this is what should happen if
the function is linear and the Jacobian is correct along the
specified step).
"""
def reset(self):
r"""Reset the Jacobian to a diagonal form."""
def get_x(self, x_controls=None):
r"""Return `x`: the Broyden step to get to the root of `G(x)`.
This is the downhill Newton step from the point specified by
(:attr:`x0`, :attr:`g0`) that satisfies the constraints `x_controls`.
Parameters
----------
x_controls : (array, array), optional
Tuple `(inds, vals)` of indices and the respective values of
the control parameters.
"""
class ISolverData(Interface):
"""Interface for solver data objects."""
iter_data = Attribute("""This is the object returned by
:meth:`IProblem.iter_data`.""")
iteration_number = Attribute(
"This is the current iteration number.")
def new(self, solver_data=None):
r"""Return a new instance of the appropriate `solver_data`. We
allow the possibility of copying data here.
Parameters
----------
solver_data : ISolverData
Previous solver data.  Provided if the solution was
computed by a previous solver.  If the solvers are
compatible, this data should be reused; otherwise a new
instance should be created.
"""
def change_basis(self, f, f_inv_t=None):
"""Use the function `x_new = f(x)` to update the data
corresponding to a change of basis.  Raise an
:exc:`AttributeError` if the change cannot be implemented
(or do not define this method).
Parameters
----------
f : function
Change of basis transformation (should be essentially
matrix multiplication, but for large bases one might want
to prevent forming this matrix explicitly.)
f_inv_t : function
Transpose of the inverse of the function. If this is
provided, then it should be used to transform the bra's,
otherwise, use `f`.
"""
class ISolver(Interface):
"""Interface for solver objects."""
G_tol = Attribute("""Tolerance for each component of
`G(x)/G_scale`. This may be a vector or a scalar.
See also :attr:`IProblem.G_scale`.  Convergence is assumed if
all components satisfy both this and (:attr:`x_tol` or
:attr:`x_tol_rel`).""")
x_tol = Attribute("""Tolerance for each component of
`x/x_scale`. This may be a vector or a scalar. See also
:attr:`IProblem.x_scale`.  Convergence is assumed if all
components satisfy both (:attr:`x_tol` or
:attr:`x_tol_rel`) and :attr:`G_tol`.""")
x_tol_rel = Attribute("""Relative tolerance for each component of
`x`. This may be a vector or a scalar. Convergence is assumed if all
components satisfy both (:attr:`x_tol` or
:attr:`x_tol_rel`) and :attr:`G_tol`.""")
norm_p = Attribute(r"""Norm to use for tolerances
$\norm{\cdot}_{p}$. Passed to the `ord` parameter of
:func:`numpy.linalg.norm`.""")
last_state = Attribute("""The last good state.
This may be used after an exception is raised to obtain
the state that caused the exception or the last good
state before the exception occurred (in the case of an
external interrupt, for example).
To resume the computation, one should be able to call
:meth:`ISolver.solve()` with this state as an input (i.e. this
state should be complete with the history set etc.; to
ensure this, use :meth:`IState.solver_set_x_()`).
""")
schedule = Attribute("""List of pairs of dictionaries `(p_dict,
s_dict)` determining the schedule.  The solution will be
run through the standard solver with the solver and state
updated with these arguments at each stage of the
schedule.  The problem will be updated through
:meth:`IProblem.solver_update` to allow the state to be
properly updated and to have its history modified.  For
example, one might start solving with low resolution and
low accuracy, slowly increasing these.""")
mesgs = Attribute("""Object implementing the :interface:`IMesg`
interface. This is used for communicating messages with
the user.""")
def __init__(self, state0):
"""Must provide copy-construction facilities from another
object `state0`."""
def solve(self, problem, state, iterations=None):
"""Return `(new_state, niter)` where `new_state` is the new
state representing the solution to the specified problem
starting from the specified state.
Requirements:
1) If `iterations` is specified, exactly this number of steps
should be executed.
2) Convergence should check at least the :attr:`G_tol` and
(:attr:`x_tol` or :attr:`x_tol_rel`) criteria using
:attr:`IProblem.G_scale` and :attr:`IProblem.x_scale`.
Convergence should occur only if both of these are
satisfied (although other criteria may also be implemented.)
3) The new state should have `state.solver_set_x_()` called to set
the solution and set `state.solver_data`.
4) The argument `state` is not to be mutated unless expressly
advertised, so generally a copy should be made.
5) :meth:`IProblem.iter_data` must be called each iteration.
6) :meth:`IProblem.step` must be used to query the function
values (check the flags :attr:`IProblem.has_F`,
:attr:`IProblem.has_G`, and :attr:`IProblem.has_dx` for
capabilities).
7) The schedule :attr:`schedule` should be followed.
Parameters
----------
problem : IProblem
Instance of the problem to solve.
state : IState
Initial state. (Usually obtained by calling
`problem.get_initial_state()`).
iterations : int
Number of iterations to perform. This is intended to be
used to reproduce a state from a history by executing an
exact number of iterations.
See Also
--------
:meth:`Solver.solve` :
This is the reference implementation. Please follow this
example if you are going to implement this method to make
sure that a step is not forgotten.
"""
def print_iteration_info(self, iteration_number, state,
start_time, mesg=None):
"""Print iteration information to :interface:`IMesg` object
`mesg` (if not `None`)."""
class IConverged(Interface):
"""Boolean like object that represents convergence information."""
def __nonzero__(self):
"""Return True if converged, otherwise False."""
raise NotImplementedError
############################################################
# Specializations and partial implementations
class Problem(StateVars):
"""Partial implementation of the :class:`IProblem` interface.  Only
:attr:`state0` and one of :meth:`G` or :meth:`F` need to be
defined (though both can be).
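For instance, an objective for finding the square root of 2 could be
provided by overriding :meth:`G` with the signature documented below
(a sketch only; the usual :class:`StateVars` boilerplate of a real
subclass is omitted)::

    def G(self, x, iter_data=None, compute_dx=False,
          abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS):
        G = x**2 - 2.0              # zero at x = sqrt(2)
        dx = None
        if compute_dx:
            dx = -G/(2.0*x)         # Newton step
        return G, dx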
"""
implements(IProblem)
_state_vars = [
('state0', Required,
"""Instance of the state class used to encapsulate
the solution. If :meth:`get_initial_state()` is not
overloaded, then it returns a copy of this.
.. note:: If calculating a valid/reasonable initial state
is costly, then this should be done in a customized
:meth:`get_initial_state` so that initialization is
not slowed down."""),
('globals', {},
r""""Global" dictionary for storing intermediate results.
These must not be used for modifying the calculation, but
provide access to mid-iteration calculational details for
user feedback."""),
('has_G', ClassVar(False),
r"""If the problem has an objective function, then set
this to `True` and return this from :meth:`G` or
:meth:`step`"""),
('has_dx', ClassVar(False),
r"""If the problem can provide a downhill step, then set
this to `True` and return this from :meth:`G` or
:meth:`step`"""),
('has_F', ClassVar(False),
r"""If the problem has a fixed-point iteration, then set
this to `True` and return this from :meth:`F` or
:meth:`step`"""),
('x_scale', 1,
r"""Typical scale for `x`.  The scaled vector
`x/x_scale` is used for convergence testing.
"""),
('G_scale', ClassVar(1),
r"""Typical scale for `G(x)`.  The scaled vector
`G/G_scale` is used for convergence testing.  The problem
must compute this based on some user-friendly way of
specifying the scales.
.. note:: See :attr:`G_scale_parameters` for a user
friendly way of setting this that will work
if the state is an instance of :class:`State`. To use
this, redefine this as a :class:`Computed` attribute and call
:meth:`define_G_scale` at the end of the constructor (after
:attr:`state0` has been initialized).
"""),
('G_scale_parameters', {},
r"""Dictionary of parameter names with associated
`G_scales`.
"""),
('active', ClassVar(()),
r"""This should be a boolean array of indices such that
`x_[active]` returns an array of all the active
parameters. The problem should provide some user-friendly
way of specifying which parameters are active and then
populate this.
.. note:: The default version just sets this to `()`.
See :attr:`active_parameters` and :attr:`inactive_parameters` for
defaults that will work if the state is an instance of
:class:`State`. In this case, redefine this as a
:class:`Computed` attribute and call :meth:`define_active` at the
end of the constructor (after :attr:`state0` has been
initialized) to define this.
"""),
('active_parameters', set(),
r"""This should be a set of the names of all the active
parameters. It is used to define :attr:`active`. This
will be updated by :meth:`__init__` to include all active
parameters (any inactive parameters will be subtracted).
See :attr:`active`.
"""),
('inactive_parameters', set(),
r"""This should be a set of the names of all the inactive
parameters (those held fixed). It is used to define
:attr:`active`. This will modify
:attr:`active_parameters`.
See :attr:`active`.
"""),
]
process_vars()
##########################################################
# Required methods: At least one of G or F needs to be defined by
# the subclass.
def G(self, x, iter_data=None, compute_dx=False,
abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS):
r"""Return `(G, dx)` where `G=G(x)` and `x + dx` is the
recommended step.
The solver's goal is to find a solution ``G(x) == 0``.
Parameters
----------
x : array
Real 1-d array of the same length as returned by
:meth:`__len__`
iter_data : object
This is the object returned by :meth:`iter_data`. It is
passed in here so that properties of the current state that
depend on the full iteration can be included here. Note,
this function may be called several times during a single
iteration depending on the solver, whereas
:meth:`iter_data` is only called once.
The solvers will always pass this, but a default should be
provided so users can easily call it.
compute_dx : bool
If :attr:`newton` is `True` and this is `True`, then the
method must return a tuple `(G_x, dx)` where ``dx =
-j_inv*G_x`` is the full Newton step.
abs_tol, rel_tol : float
If possible, `G(x)` should be calculated to the specified
absolute and relative tolerances. This feature is used by
some solvers to allow faster computations when far from the
solution but may be ignored.
mesgs : IMesg
If there are any problems, then an exception should be
raised and/or a message sent through this.
Returns
-------
G : array
Result of ``G = G(x)`` (must be a real 1-d array of the same
length as `x`).
dx : array
Suggested step based on `G`. If :attr:`newton`, then this
should be the full Newton step ``dx = -j_inv*G(x)``. If
no reasonable step is known, then this can be `None`.
Raises
------
NotImplementedError
Raised if the subclass does not define :meth:`F`.
Notes
-----
This default version returns `G(x) = x - F(x)` and `dx = F(x)
- x`. This is based on the assumption that $x \mapsto F(x)$
is almost a convergent iteration.
Despite providing both `G` and `dx`, this will not
set the flags :attr:`has_G` or :attr:`has_dx`, which must be
set by the user if this default is okay.
"""
if self.__class__.F.im_func is Problem.F.im_func:
raise NotImplementedError(
"Class %s defined neither F nor G" %
(self.__class__.__name__, ))
else:
F = self.F(x, iter_data=iter_data,
abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs)
G = x - F
dx = None
if compute_dx:
# Although this is simple, it makes a copy which could
# be expensive if x is large.
dx = -G
return G, dx
def F(self, x, iter_data=None,
abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS):
r"""Return `F = F(x)`.
The solver's goal is to find a solution ``F(x) == x``. Some
solvers assume that the iteration $x \mapsto F(x)$ is almost
convergent.
Parameters
----------
x : array
Real 1-d array of the same length as returned by
:meth:`__len__`
iter_data : object
See :meth:`G`.
abs_tol, rel_tol : float
See :meth:`G`
mesgs : IMesgs
See :meth:`G`.
Returns
-------
F : array
Result of ``F = F(x)`` (must be a real 1-d array of the same
length as `x`).
Raises
------
NotImplementedError
Raised if the subclass does not define :meth:`G`.
Notes
-----
This default version returns `F(x) = x + dx` if :meth:`G`
returns `dx`, otherwise it returns `F(x) = x - G(x)`.
Despite providing `F`, this will not set the flag
:attr:`has_F`, which must be set by the user if this default is
okay.
"""
if self.__class__.G.im_func is Problem.G.im_func:
raise NotImplementedError(
"Class %s defined neither F nor G" %
(self.__class__.__name__, ))
else:
G, dx = self.G(x, iter_data=iter_data, compute_dx=True,
abs_tol=abs_tol, rel_tol=rel_tol,
mesgs=mesgs)
if dx is None:
F = x - G
else:
F = x + dx
return F
##########################################################
#@property
#def state0(self):
# """Return the State class by extracting it from
# get_initial_state()
# """
# return self.get_initial_state()
def get_initial_state(self):
"""Return a state implementing :class:`IState` (a copy of
:attr:`state0`) representing the initial state for the problem."""
return self.state0.copy_from()
def iter_data(self, x, x0, G0, dx0, F0, solver, data=None):
r"""Return data from the iteration for plotting etc. This optional
method is called by the solver after each iteration with
the current solution `x`. The object it returns will be
passed to :meth:`plot_iter` as the keyword argument `data`.
This allows the problem to calculate and accumulate results
for plotting or for recording the evolution/convergence
history.
This can be retrieved from `state.solver_data.iter_data`.
Parameters
----------
x : array
Current guess.
x0 : array, None
Previous guess.
G0 : array or tuple or None
`G0 = G(x0)` was the previous step. If :attr:`newton`,
then this is the tuple `(G0, dx0)` where ``dx0 =
-np.dot(j_inv, G0)`` is the full Newton step.
solver : ISolver
Current solver. This can be queried for convergence
requirements etc.
data : object or None
Previous data object from last iteration. The first
iteration this is `None`.
"""
pass
def step(self, x, x0=None, G0=None, dx0=None, F0=None,
compute_G=False, compute_dx=False, compute_F=False,
extrapolate=None, iter_data=None,
abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS):
r"""Return `(x1, G1, dx1, F1)` where `x1` is the new solution
estimate.
Parameters
----------
x : array
Step suggested by the solver. Real 1-d array of the same
length as returned by :meth:`__len__`
x0 : array
Previous step and the point at which `G0`, `dx0`, and `F0`
are evaluated. If this is `None`, then this is the first
step of a new solver cycle.
G0 : array
Objective function `G0(x0)` evaluated at `x0`.
dx0 : array
Suggested step from `x0` based on `G0`.
F0 : array
Fixed-point iteration `F(x0)` evaluated at `x0`.
compute_G, compute_dx, compute_F : bool
These flags are set depending on what is needed. A problem
could always compute `G1`, `dx1`, and `F1` to be safe, but
if these are expensive, then this allows :meth:`step` to compute
only the required values.
extrapolate : function
The extrapolator is a function ``x_new =
extrapolate(x_controls)`` that returns the solver's
extrapolated step based on the specified control
parameters. `x_controls` should be a pair `(inds, vals)`
of indices into the array `x` and values such that the step
should preserve ``x[inds] = c*vals + (1-c)*x0[inds]`` for
some ``0 < c <= 1`` (i.e. this is a convex extrapolation
from `x0` to `x`).
.. note:: Extrapolation typically requires some linear
algebra (such as solving a linear system) on the
submatrix of the Jacobian in the basis spanned by the
elements in `x_controls`. If the problem is large, this
may be prohibitive unless the number of entries in
`x_controls` is sufficiently small.
If the solver contains information about the function `G`
such as gradient information, then the extrapolated state
`x_new` will incorporate this information extrapolating the
step in such a way as to satisfy the controls.
iter_data : object
This is the object returned by :meth:`iter_data`. It is
passed in here so that properties of the current state that
depend on the full iteration can be included here. Note,
this function may be called several times during a single
iteration depending on the solver, whereas
:meth:`iter_data` is only called once.
abs_tol, rel_tol : float
If possible, `G(x)` should be calculated to the specified
absolute and relative tolerances. This feature is used by
some solvers to allow faster computations when far from the
solution.
mesgs : IMesg
If there are any problems, then an exception should be
raised and/or a message sent through this.
Returns
-------
x1 : array (real and same shape as `x`).
This is the new solution step. It need not be either of
the natural steps `x0 + dx0` or `F0`, allowing the problem
more control, for example, to enforce constraints or
prevent the solver from wandering into unphysical regions
of solution space.
G1 : array (real and same shape as `x`) or None
Objective function `G(x1)` evaluated at the suggested point
`x1`. Compute if :attr:`has_G` and `compute_G` are both
`True`. This objective function is generally used for
convergence checking.
dx1 : array (real and same shape as `x`) or None
Suggested step from `x1` based on the value of `G1`.
Compute if :attr:`has_dx` and `compute_dx` are both
`True`. If possible, this should be a downhill direction
such as the full Newton step ``dx = -j_inv*G(x1)``.
F1 : array (real and same shape as `x`) or None
Fixed-point function `F(x1)` evaluated at the suggested
point `x1`. Compute if :attr:`has_F` and `compute_F` are
both `True`. This is used as both a step $x \mapsto F(x)$
and for convergence $\norm{x - F(x)}$ testing when no other
objective function is provided.
Notes
-----
This provides a mechanism for the problem to control the
iterations, for example, enforcing constraints. A typical
algorithm would proceed as follows:
1) Inspect `x`. If any small number of parameters needs to be
adjusted to satisfy constraints etc. then these should be
specified in a dictionary `x_controls`.
2) Call `x_new = extrapolate(x_controls)` to get the solver's
suggestion including the controls.
3) Adjust `x_new` if needed.
4) Compute `G1`, `dx1`, and `F1`.
5) Return `(x_new, G1, dx1, F1)`.
Here is some pseudo-code::
x1 = x
if not okay(x1):
<Determine where we need x[i_n] = v_n>
x_controls = ([i_1, i_2, ...], [v_1, v_2, ...])
x1 = extrapolate(x_controls)
if not okay(x1):
<Fix x1 to make it okay>
x1 = ...
G1 = dx1 = F1 = None
if self.has_G and compute_G:
G1 = ...
if self.has_dx and compute_dx:
dx1 = ...
if self.has_F and compute_F:
F1 = ...
return (x1, G1, dx1, F1)
"""
x1 = x
G1 = dx1 = F1 = None
if ((self.has_G and compute_G) or
(self.has_dx and compute_dx)):
G1, dx1 = self.G(x, iter_data=iter_data,
compute_dx=compute_dx,
abs_tol=abs_tol, rel_tol=rel_tol,
mesgs=mesgs)
if self.has_F and compute_F:
if compute_G and (self.__class__.F.im_func is Problem.F.im_func or
self.__class__.G.im_func is Problem.G.im_func):
# This is an optimization to prevent two calls to F or
# G.  It duplicates the code in the default F and G.
# (Compare im_func: unbound methods are re-created on each
# attribute access, so comparing the methods with `is` fails.)
if self.__class__.F.im_func is Problem.F.im_func and dx1 is not None:
F1 = x1 + dx1
else:
F1 = x1 - G1
else:
F1 = self.F(x, iter_data=iter_data,
abs_tol=abs_tol, rel_tol=rel_tol, mesgs=mesgs)
return (x1, G1, dx1, F1)
def solver_update(self, state, p_dict):
"""This is called by the solver to implement the schedule (see
:attr:`ISolver.schedule`). It should update the problem
and `state` according to the parameters in `p_dict`, returning
the updated state. It should be careful not to obliterate the
state history.
Parameters
----------
state : IState
This is the current state object.
p_dict : dict
Dictionary of updated parameters. If the problem is an
instance of :class:`StateVars`, then this could be passed
to :meth:`StateVars.initialize`.
"""
self.initialize(**p_dict)
return state
def __len__(self):
"""Return the length of the solution vector."""
return len(self.get_initial_state().x_)
@classmethod
def _post_hook_process_vars(cls, results,
_G=G, _F=F):
"""We set default values of :attr:`has_G` and :attr:`has_F`
based on whether or not the new class redefines :meth:`F` and
:meth:`G`.
Notes
-----
Members of the class block scope are not accessible in
contained blocks so we must pass the functions explicitly as
defaults here.
See Also
--------
:func:`process_vars`
"""
if cls.F.im_func is not _F:
results['has_F'] = True
if cls.G.im_func is not _G:
results['has_G'] = True
def define_active(self):
r"""Define :attr:`active` from :attr:`active_parameters` and
:attr:`inactive_parameters` using :meth:`State.indices` of
:attr:`state0`.
"""
if 'active' in self._class_vars:
raise AttributeError(
"To use define_active() you must make 'active' Computed:\n" +
" (Add ('active', Computed) to your %s's _state_vars list)"
% (self.__class__.__name__,))
parameters = self.state0.parameters
if not self.active_parameters:
self.active_parameters = set(parameters)
else:
self.active_parameters.intersection_update(parameters)
if (set(parameters) == self.active_parameters and
not self.inactive_parameters):
self.active = ()
else:
self.active_parameters = (self.active_parameters
- self.inactive_parameters)
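# Build a boolean mask over the flat state vector with `True`
# for every component of every active parameter.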
self.active = np.zeros(len(self.state0.x_), dtype=bool)
for p in self.active_parameters:
offset, n = self.state0.indices(p)
self.active[offset:offset+n] = True
def define_G_scale(self):
r"""Define :attr:`G_scale` from :attr:`G_scale_parameters`
using :meth:`State.indices` of :attr:`state0`.
"""
if 'G_scale' in self._class_vars:
raise AttributeError(
"To use define_G_scale() you must make G_scale Computed:\n" +
" (Add ('G_scale', Computed) to your %s's _state_vars list)"
% (self.__class__.__name__,))
parameters = self.state0.parameters
if not self.G_scale_parameters:
self.G_scale = 1
else:
self.G_scale = np.ones(self.state0.x_.shape, dtype=float)
for p in parameters:
offset, n = self.state0.indices(p)
if p in self.G_scale_parameters:
self.G_scale[offset:offset+n] = self.G_scale_parameters[p]
else:
self.G_scale[offset:offset+n] = 1
@property
def j0_diag(self):
r"""Scale of diagonals of Jacobian `dG/dx`.
This default is computed as a property `G_scale/x_scale`.
"""
return np.divide(self.G_scale, self.x_scale)
class PlotProblem(Problem):
r"""Partial implementation of the :class:`IPlotProblem` interface.  As
with :class:`Problem`, only :attr:`state0` and one of :meth:`G` or
:meth:`F` need to be defined.
"""
implements(IPlotProblem)
_state_vars = [
('n_prev_plots', 5, \
"""Number of previous plots to show (as light yellow)"""),
('_plot_key', Internal((111, )), \
"""Current key formed from the args to subplot. Used to store
previous plotted points."""),
('_plot_data', Internal({}), \
"""Dictionary with lists of plot arguments for previous
plot data."""),
('_prev_plots', Internal([]), \
"""List of previous plots."""),
('_no_plot', Internal(False),
"""If `True`, then don't draw, just store data."""),
]
process_vars()
##########################################################
# Optional methods: These need to be defined by subclasses.
def plot_iter(self, plt, data):
r"""Called by :meth:`plot_iteration` to plot the current
iteration. If the methods of `plt` are used to plot the
results, then the results will be stored so that the previous
:attr:`n_prev_plots` can also be plotted in light yellow for
comparison.
If this is not desired, then call :mod:`matplotlib` methods
directly. Presently `plt = self`, so inspect the current
object for the methods available to support this feature like
:meth:`plot` and :meth:`contour`.
.. note:: This method should not clear the figure etc. as this
is done by :meth:`plot_iteration`.
Parameters
----------
plt : object
Object with methods like `plot` and `contour` which should
be used to allow for automatic plotting of previous
iterations.
data : object
Object returned by :meth:`iter_data`. This should be used
to compute and store results to be plotted.
"""
pass
def iter_data(self, x, x0, G0, dx0, F0, solver, data=None):
r"""Return data from the iteration for plotting etc. This optional
method is called by the solver after each iteration with
the current solution `x`. The object it returns will be
passed to :meth:`plot_iter` as the keyword argument `data`.
This allows the problem to calculate and accumulate results
for plotting or for recording the evolution/convergence
history.
Parameters
----------
x : array
Current guess.
x0 : array, None
Previous guess.
G0 : array or tuple or None
`G0 = G(x0)` was the previous step. If :attr:`newton`,
then this is the tuple `(G0, dx)` where ``dx =
-np.dot(j_inv, G0)`` is the full Newton step.
solver : ISolver
Current solver. This can be queried for convergence
requirements etc.
data : object or None
Previous data object from last iteration. The first
iteration this is `None`.
"""
return None
def gca(self):
import pylab
return pylab.gca()
def xlabel(self, *v, **kw):
import pylab
if not self._no_plot:
return pylab.xlabel(*v, **kw)
else:
return None
def ylabel(self, *v, **kw):
import pylab
if not self._no_plot:
return pylab.ylabel(*v, **kw)
else:
return None
def title(self, *v, **kw):
import pylab
if not self._no_plot:
return pylab.title(*v, **kw)
else:
return None
def subplot(self, *v):
import pylab
self._plot_key = v
if not self._no_plot:
pylab.subplot(*v)
def plot(self, *v, **kw):
import pylab
if 'no_hist' not in kw:
self._plot_data.setdefault(self._plot_key, []).append(
(pylab.plot, deepcopy(v), kw))
if not self._no_plot:
pylab.plot(*v, **kw)
def semilogy(self, *v, **kw):
import pylab
if 'no_hist' not in kw:
self._plot_data.setdefault(self._plot_key, []).append(
(pylab.semilogy, deepcopy(v), kw))
if not self._no_plot:
pylab.semilogy(*v, **kw)
def contour(self, *v, **kw):
import pylab
if 'no_hist' not in kw:
self._plot_data.setdefault(self._plot_key, []).append(
(pylab.contour, deepcopy(v), kw))
if not self._no_plot:
pylab.contour(*v, **kw)
def plot_iteration(self, x, x0, G0, dx0, F0, problem,
solver_data, trial=False, plot=False, mesgs=_MESGS):
r"""Plot the current iteration. This version calls
:meth:`plot_iter` to plot the current iteration and plots the
previous :attr:`n_prev_plots` iterations in lighter yellow.
Parameters
----------
x : array
New attempted step
x0, G0, dx0, F0 : array
Old step and objective functions etc.
problem : IProblem
solver_data : ISolverData
This will be used to set `solver_data.iter_data`.
trial : bool
If `True`, then the present step will not be coloured in
black (it will be red instead) and it will not be added to
the history. This should be set for trial steps.
plot : bool
Pass the `solver.plot` flag here.
"""
solver_data.iter_data = problem.iter_data(
x=x, x0=x0, G0=G0, dx0=dx0, F0=F0, solver=self,
data=solver_data.iter_data)
# pylint: disable-msg=E1101
if plot and IPlotProblem.providedBy(problem):
try: # Don't fail on plotting!
import pylab
self._prev_plots = getattr(
self, '_prev_plots', [])[-(self.n_prev_plots):]
pylab.ioff()
if trial:
# Save trial data
self._plot_data = {}
self._no_plot = True
try:
self.plot_iter(plt=self,
data=solver_data.iter_data)
except:
raise
finally:
self._no_plot = False
if trial:
c = 'red'
for v in self._plot_data:
pylab.subplot(*v)
for cmd, v_, kw_ in self._plot_data[v]:
if cmd is pylab.contour:
kw_['colors'] = [c]
else:
kw_['color'] = c
kw_['label'] = None
cmd(*v_, **kw_)
else:
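# Full (non-trial) redraw: clear the figure, replot the
# previous iterations in progressively stronger yellow,
# then plot the current iteration and remember its data.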
pylab.clf()
n_c = self.n_prev_plots
c0 = np.array((1.0, 1.0, 1.0))
c1 = np.array((0.9, 0.9, 0.0))
for n, prev in enumerate(self._prev_plots):
c = (n + 1.0)/n_c*c1 + (1 - (n + 1.0)/n_c)*c0
for v in prev:
pylab.subplot(*v)
for cmd, v_, kw_ in prev[v]:
if cmd is pylab.contour:
kw_['colors'] = [c]
else:
kw_['color'] = c
kw_['label'] = None
cmd(*v_, **kw_)
self._plot_data = {}
self.plot_iter(plt=self,
data=solver_data.iter_data)
self._prev_plots.append(self._plot_data)
pylab.ion()
pylab.draw()
except Exception, e:
mesgs.warn("Plotting error %s:" % (str(e), ))
class Problem_x(PlotProblem):
"""Partial implementation of :class:`IProblem` interface. Same as
:class:`Problem` but with an additional attribute :attr:`fn_rep`
representing a discretization. To be used with :class:`State_x`,
and provides an updated :meth:`solver_update` utilizing this.
"""
_state_vars = [
('fn_rep', Required, \
"""Object for converting between functions and data.
Must implement the :class:`IFnRep` interface."""),
]
process_vars()
def solver_update(self, state, p_dict):
"""This is called by the solver to implement the schedule (see
:attr:`ISolver.schedule`). It should update the problem
and `state` according to the parameters in `p_dict`, returning
the updated state. It should be careful not to obliterate the
state history.
Parameters
----------
state : IState
This is the current state object.
p_dict : dict
Dictionary of updated parameters. If the problem is an
instance of :class:`StateVars`, then this could be passed
to :meth:`StateVars.initialize`.
"""
if id(self.fn_rep) == id(state.fn_rep):
# We need to keep the old fn_rep in the state so it can do
# an update
state.fn_rep = deepcopy(state.fn_rep)
self.initialize(**p_dict)
state._change_fn_rep(self.fn_rep)
return state
class Converged(StateVars):
"""Boolean like object that represents convergence information.
Convergence is True if any condition is met.
>>> bool(Converged(Step=False, Func=False))
False
>>> bool(Converged(Step=True, Func=False))
True
>>> bool(Converged(Step=False, Func=True))
True
>>> bool(Converged(Step=np.bool_(True), Func=True))
True
"""
implements(IConverged)
_state_vars = [
('Step', False, \
"The step size is smaller than specified."),
('Func', False, \
"G(x) is smaller than specified."),
('err', None, \
"Maximum absolute error (for reporting purposes)"),
]
process_vars()
def __nonzero__(self):
"""Return True if converged, otherwise False."""
return bool(self.Step or self.Func)
def __str__(self):
"""Single-line string rep for nice reporting."""
return ("Step: %s, Func: %s, err: %g" %
(self.Step, self.Func, self.err))
class SolverData(StateVars):
"""Default implementation of solver data objects."""
implements(ISolverData)
_state_vars = [
('iter_data', None, \
r"""This is the object returned by
:meth:`IProblem.iter_data`."""),
('iteration_number', 0),
('j_inv_G', NotImplemented,
"""An object implementing :interface:`IJinv` that acts like a
matrix approximating the inverse of the Jacobian for `G` at
the current step."""),
('j_inv_F', NotImplemented,
"""An object implementing :interface:`IJinv` that acts like a
matrix approximating the inverse of the Jacobian for `G=x-F` at
the current step."""),
]
process_vars()
def new(self, solver_data=None, with_self=False):
"""If `type(solver_data)` is a subclass of `type(self), then
return a copy constructed instance of of `solver_data`,
otherwise return a new blank copy.
Parameters
----------
solver_data : ISolverData
with_self : bool
If `True`, then include all the data from `self` with
appropriate entries overwritten from `solver_data`.
Examples
--------
>>> from broyden import JinvBroyden
>>> s0 = SolverData()
>>> s1 = SolverData(j_inv_F=JinvBroyden())
>>> class SD(SolverData):
... _state_vars = [('b', None)]
... process_vars()
>>> s2 = SD()
>>> s3 = s2.new(solver_data=s1)
>>> s3.b = 3
>>> s3.j_inv_F
JinvBroyden()
>>> s4 = s1.new(s3)
>>> s4.j_inv_F
JinvBroyden()
>>> s5 = s3.new(s0)
>>> s5.j_inv_F
Traceback (most recent call last):
...
AttributeError: 'SD' object has no attribute 'j_inv_F'
>>> s5 = s3.new(s0, with_self=True)
>>> s5.j_inv_F
JinvBroyden()
"""
if with_self:
res = self.__class__(self)
else:
res = self.__class__()
if issubclass(type(self), type(solver_data)):
args = dict(solver_data.items())
else:
args = dict([(key, getattr(solver_data, key))
for key in self._vars
if hasattr(solver_data, key)])
res.initialize(**args)
return res
class Solver(StateVars):
"""Default partial implementation for solver objects.
The user only needs to overload iterate() in order to provide a
useful subclass."""
implements(ISolver)
_state_vars = [
('solver_data0', SolverData(),
"Blank SolverData instance."),
('G_tol', _ABS_TOL, "See :attr:`ISolver.G_tol`"),
('x_tol', _ABS_TOL, "See :attr:`ISolver.x_tol`"),
('x_tol_rel', _REL_TOL, "See :attr:`ISolver.x_tol_rel`"),
('tol_safety_factor', 10, \
""""Increase the tolerances when computing function by this
amount so that roundoff error will not affect derivative
computations etc."""),
('norm_p', None, "See :attr:`ISolver.norm_p`"),
('min_abs_step', _ABS_TOL,
"Fail if step size falls below this"),
('min_rel_step', _REL_TOL,
"Fail if relative step size falls below this"),
('max_iter', np.inf, "Maximum number of iterations."),
('max_time', np.inf, "Maximum time."),
('max_step', np.inf,
r"""To prevent too large steps (mainly to prevent
converging to an undesired solution) the step size of any
component will never be larger than this."""),
('schedule', [],
r"""List of pairs of dictionaries `(fn_rep_dict,
solver_dict)` determining the schedule. The solution will be
run through the standard solver with the problem/state updated
from `fn_rep_dict` and the solver updated from `solver_dict` at each
stage of the schedule. One might like to start solving
with low-resolution and low accuracy, slowly increasing
these for example."""),
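# For example (a sketch; the keys 'N' and 'reset_Jinv' are those
# used by the scratch example at the end of this module):
#     schedule=[({'N': 10}, {}),
#               ({'N': 50}, {'reset_Jinv': True})]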
('verbosity', _VERBOSITY, \
"Level of verbosity: 0 means quiet"),
('message_indent', _MESSAGE_INDENT),
('mesgs', Mesg(), "Messenger object"),
('last_state', Excluded, \
"""The last good state.
This may be used after an exception is raised to obtain
the state that caused the exception or the last good
state before the exception occurred (in the case of an
external interrupt, for example).
To resume the computation, one should be able to call
:meth:`ISolver.solve()` with this state as an input (i.e. this
state should be complete with the history set etc. To
ensure this, use :meth:`IState.solver_set_x_()`).
"""),
('plot', False, \
"""If the problem implements :class:`IPlotProblem` and
this is `True`, then the results are plotted after each
iteration.
"""),
('debug', False,
"""If `True`, then store some debugging information in
:attr:`_debug`."""),
('_debug', Excluded(Container()),
"""Debugging data."""),
('_f_evals', Internal(0),
r"""Total number of function evaluations. Used by
:meth:`solve` and :meth:`problem_step`."""),
]
process_vars()
##########################################################
# Required methods: These need to be defined by subclasses.
def iterate(self, x, problem, solver_data,
x0=None, G0=None, dx0=None, F0=None,
mesgs=None):
r"""Perform a single iteration and return the triplet
`(x_new, (x, G, dx, F))`.
Mutate `solver_data` if used.
Parameters
----------
x : array
Step suggested by the solver. Real 1-d array of the same
length as returned by :meth:`__len__`
x0 : array
Previous step and the point at which `G0`, `dx0`, and `F0`
are evaluated. If this is `None`, then this is the first
step of a new solver cycle.
G0 : array
Objective function `G0(x0)` evaluated at `x0`.
dx0 : array
Suggested step from `x0` based on `G0`.
F0 : array
Fixed-point iteration `F(x0)` evaluated at `x0`.
problem : IProblem
Problem instance implementing :class:`IProblem`.
solver_data : ISolverData
Solver data instance implementing :class:`ISolverData`.
mesgs : IMesgs
Messaging should be sent through this instance. It will be
a new status bar in the current messaging window.
Returns
-------
x_new : array
Proposed step. The solver guarantees that this will be the
next argument `x` to iterate. This should not have been
tested yet (i.e. `G(x_new)` has not yet been calculated)
and may not be a very good approximation (for example, a
line search may still need to be performed.)
x, G, dx, F : array (real and same shape as `x`).
Objective function, `G(x)`, recommended step `dx`, and
fixed-point objective `F(x)` evaluated at `x`. See
:meth:`IProblem.step` for details.
.. note:: These must have been validated by the problem,
being returned at some point by :meth:`IProblem.step`.
The solver must decide which of these is needed and set the
appropriate `compute_*` flags when calling
:meth:`IProblem.step`.
Raises
------
IterationError
This may be raised if the algorithm is forced to terminate
because a root cannot be found. The current state will be
saved, messages printed, and then the exception raised
again.
Notes
-----
Here is a sample default implementation::
mesgs = self.mesgs.new()
(x1, G1, dx1, F1) = problem.step(
x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
compute_G=True, compute_dx=True, compute_F=True,
extrapolate=lambda _:x,
abs_tol=self.abs_tol, rel_tol=self.rel_tol,
iter_data=solver_data.iter_data, mesgs=mesgs)
if dx1 is None:
if F1 is None:
# No information is known about the step here! One
# should probably be careful and take a small step
# until one learns about the function.
x_new = x1 - G1
else:
# Assume that x -> F(x) provides a good iteration.
x_new = F1
else:
x_new = x1 + dx1
return (x_new, (x1, G1, dx1, F1))
"""
raise NotImplementedError()
##########################################################
def print_iteration_info(self, iteration_number, state,
start_time):
"""Print iteration information."""
elapsed_time = time.time() - start_time
message = ("Iteration: %5i Elapsed time: %f Converged: %s"
% (iteration_number,
elapsed_time,
state.converged))
self.mesgs.iter(message)
def _G(self, problem, x, iter_data, mesgs):
"""Simple call to problem.G with current tolerances."""
return problem.G(x, iter_data=iter_data,
abs_tol=self.abs_tol,
rel_tol=self.rel_tol,
mesgs=mesgs)
def _step(self, problem, iter_data, x, x0, G0, dx0, F0,
compute_G, compute_dx, compute_F, mesgs):
r"""Simple call to :meth:`step` with current tolerances and
extrapolate."""
return problem.step(x, x0=x0, G0=G0, dx0=dx0, F0=F0,
compute_G=compute_G,
compute_dx=compute_dx,
compute_F=compute_F,
extrapolate=\
lambda xc:self._extrapolate(x, x0, xc),
iter_data=iter_data,
abs_tol=self.abs_tol,
rel_tol=self.rel_tol,
mesgs=mesgs)
def _extrapolate(self, x, x0, x_controls):
r"""Redefine this to return a corrected step if information
exists."""
x = copy(x)
inds, vals = x_controls
inds = np.asarray(inds, dtype=int)
vals = np.asarray(vals)
x[inds] = vals
return x
def solve(self, problem, state, iterations=None):
"""Return the new state representing the solution to the
specified problem starting from the specified state. If
iterations is specified, exactly this number of steps should
be executed.
The new state should have :meth:`State.solver_set_x_` called to set
the solution (and optionally :attr:`State.solver_data` set).
Note: state may be mutated.
Parameters
----------
problem : IProblem
Instance of the problem to solve.
state : IState
Initial state. (Usually obtained by calling
`problem.get_initial_state()`).
iterations : int
Number of iterations to perform. This is intended to be
used to reproduce a state from a history by executing an
exact number of iterations.
"""
state = problem.state0.copy_from(state)
self.last_state = state
if self.debug:
self._debug.steps = []
# Preserve clean copies for the archival process. We
# won't modify the originals.
solver_ = deepcopy(self)
problem_ = deepcopy(problem)
if self.schedule:
schedule = self.schedule
else:
schedule = [({}, {})]
x = state.x_
if problem.active is not ():
# There are specific active variables, the
# rest should be held fixed.
inactive = ~problem.active
x_inactive = x[inactive]
start_time = time.time()
messages = {}
self._f_evals = 0
state.solver_data = solver_.solver_data0.new(
getattr(state, 'solver_data', None),
with_self = True)
state.solver_data.iteration_number = 0
try:
if hasattr(problem_, 'pre_solve_hook'):
problem_.pre_solve_hook(state=state)
# Invariant:
# Either iteration_number = 0 and state history is
# valid or calling state.record_history() will make it
# valid. Note if an exception is thrown, the state
# itself may not be valid, but the last history will
# be.
#
# Caveat: if an exception is thrown before
# iteration_number is incremented, then the state
# history may be one step behind.
for (p_dict, s_dict) in schedule:
if p_dict:
state = problem_.solver_update(state, p_dict)
if s_dict:
solver_.initialize(**s_dict)
x = state.x_
x0 = G0 = dx0 = F0 = None
state.converged = False
if iterations:
solver_.max_iter = min(
solver_.max_iter,
iterations - state.solver_data.iteration_number)
failed = False
while (not state.converged
and not failed
and solver_._has_resources(
state,
start_time,
state.solver_data.iteration_number)):
if IPlotProblem.providedBy(problem):
problem.plot_iteration(
x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
problem=problem,
solver_data=state.solver_data,
plot=self.plot,
trial=False)
mesgs = self.mesgs.new()
try:
#######################################
# Here is the solver step. Don't blink!
#######################################
with catch_warnings(record=True) as ws:
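# Warnings raised during the step are captured here and
# re-emitted through the messenger once the step completes.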
if _DEBUG and x0 is not None:
x0_ = 1*x0; F0_ = 1*F0
if (_DEBUG and
abs(F0 - problem_.F(x0)).max() > 1e-12):
import pdb;pdb.set_trace()
(x_, (x1, G1, dx1, F1)) = solver_.iterate(
x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
problem=problem_,
solver_data=state.solver_data,
mesgs=mesgs)
if (_DEBUG and
abs(F1 - problem_.F(x1)).max() > 1e-12):
import pdb;pdb.set_trace()
if self.debug:
self._debug.steps.append((x1, G1, dx1, F1))
except IterationError, err:
exc_info = sys.exc_info()
failed = True
reason = err
finally:
# Process messages and delete messenger.
self.mesgs.messages.extend(mesgs.messages)
self.mesgs.delete_sub()
if problem.active is not ():
# We can force x_ to have the correct values,
# but cannot force x1, etc. as this might
# affect active G1, F1 fields. Instead, we
# raise an exception if the solver does not
# respect the active behaviours.
x_[inactive] = x_inactive
if (x1 is not None and not
np.allclose(x1[inactive], x_inactive)):
raise ValueError(
"Inactive parts of x1 changed by solver.")
if (G1 is not None and not
np.allclose(G1[inactive], 0)):
raise ValueError(
"Inactive parts of G1 not zero from solver.")
if (F1 is not None and not
np.allclose(F1[inactive], x_inactive)):
raise ValueError(
"Inactive parts of F1 changed by solver.")
for w in ws:
self.mesgs.warn(str(w))
if failed:
self.mesgs.error("Failed: %r" % str(reason))
else:
# Update Jacobians
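# j_inv_G approximates the inverse Jacobian of G while
# j_inv_F approximates the inverse Jacobian of G = x - F(x),
# so the latter is fed g0 = x_update - f_update below.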
if x0 is not None:
if x1 is not None:
# Add new step.
x_update = x1
g_update = G1
f_update = F1
else:
# Initial step, add base point.
x_update = x0
g_update = G0
f_update = F0
if (hasattr(state.solver_data, 'j_inv_G') and
state.solver_data.j_inv_G is not NotImplemented
and g_update is not None):
state.solver_data.j_inv_G.update(
x0=x_update, g0=g_update)
if (hasattr(state.solver_data, 'j_inv_F') and
state.solver_data.j_inv_F
is not NotImplemented
and f_update is not None):
state.solver_data.j_inv_F.update(
x0=x_update, g0=(x_update - f_update))
if self.mesgs.messages:
messages[state.solver_data
.iteration_number] = self.mesgs.messages
self.mesgs.messages = []
state.solver_data.iteration_number += 1
if not failed:
# Originally we had x_=x1 here. Why not x_?
state.solver_set_x_(x_=x_)
try:
state.converged = solver_.converged(
x=x_,
x1=x1, G1=G1, dx1=dx1, F1=F1,
x0=x0, G0=G0, dx0=dx0, F0=F0,
solver_data=state.solver_data,
problem=problem)
except IterationError, err:
exc_info = sys.exc_info()
failed = True
reason = err
# Enforce maximum step size here
dx = x_ - x1
scale = (np.linalg.norm(dx, ord=self.norm_p)
/self.max_step)
if scale > 1:
dx /= scale
x_ = x1 + dx
solver_.print_iteration_info(
state.solver_data.iteration_number,
state=state,
start_time=start_time)
if failed:
raise exc_info[1], None, exc_info[2]
x, x0, G0, dx0, F0 = x_, x1, G1, dx1, F1
except:
raise
finally:
# If there are any exceptions we catch them, set the state
# history so it is complete, and then throw the exception.
self.mesgs.clear()
if 0 < state.solver_data.iteration_number:
state.messages.append(messages)
state.record_history(
problem=problem,
solver=self,
iterations=state.solver_data.iteration_number)
return state, state.solver_data.iteration_number
def _has_resources(self, state, start_time, iteration_number):
"""Return True if there are resources to proceed with the
calculation. (I.e. iterations or time)."""
has_resources = True
if iteration_number >= self.max_iter:
has_resources = False
state.messages.append(
"No more iterations left: max_iter = %i"%self.max_iter)
elif (time.time() - start_time) >= self.max_time:
has_resources = False
state.messages.append(
"No more time left: max_time = %f"%self.max_time)
return has_resources
def _converged(self, x, x1, G1, dx1, F1, x0, G0, dx0, F0,
problem):
"""Return Converged instance specifying if convergence
criteria are met.
This uses the basic :class:`Converged` class.
Parameters
----------
x : array
Suggested step.
x1, G1, dx1, F1 : array
Last point at which `G1 = G(x1)` etc. were evaluated.
x0, G0, dx0, F0 : array
Previous point at which `G0 = G(x0)` etc. were evaluated.
"""
if G1 is None:
if F1 is None:
raise ValueError(
"Problem must define at least one of F or G")
if x1 is None:
# Possible initial step
return False
G1 = x1 - F1
if F1 is None:
F1 = x1 - G1
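# err_G <= 1 corresponds to ||G1/(G_scale*G_tol)|| <= 1, i.e. the
# objective function is within the scaled tolerance of zero.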
err_G = np.linalg.norm(G1/(problem.G_scale*self.G_tol + _TINY),
ord=self.norm_p)
if dx1 is None:
if x0 is None or x1 is None:
# Possible initial step.
return False
dx1 = np.abs(x1 - x0)
#dx1 = np.abs(J1*(x1 - x0))/(np.abs(G1 - G0) + _TINY)
err_x_abs = np.abs(dx1/(problem.x_scale*self.x_tol + _TINY))
err_x_rel = np.abs(dx1/(np.abs(x1)*self.x_tol_rel + _TINY))
err_x = np.minimum(err_x_abs, err_x_rel)
converged = Converged()
#converged.Step = err_x.max() <= 1
converged.err = np.minimum(err_x, err_G).max()
converged.err = err_G.max()
converged.Func = err_G <= 1
if converged.Step and not converged.Func:
# Possibly raise a warning that the steps are becoming
# small.
#raise IterationError("Failure: Min step size reached.")
pass
return converged
def converged(self, x, x1, G1, dx1, F1, x0, G0, dx0, F0,
solver_data, problem):
r"""This should return a :class:`Converged` instance with the
convergence state. You can overload this. The default method
is :meth:`_converged` which does the basic tolerance
checking.
If there is another reason for termination because of an
error, then an :exc:`IterationError` may be raised.
"""
return self._converged(x=x,
x1=x1, G1=G1, dx1=dx1, F1=F1,
x0=x0, G0=G0, dx0=dx0, F0=F0,
problem=problem)
def problem_step(self, problem, solver_data,
x, x0=None, G0=None, dx0=None, F0=None,
compute_G=False, compute_dx=False, compute_F=False,
extrapolate=None, iter_data=None,
abs_tol=None, rel_tol=_REL_TOL, mesgs=_MESGS,
trial=True):
r"""This should be used instead of calling `problem.step()` as
it does the following:
0) Calls `problem.step()`.
1) Adjusts `x`, `G`, and `F` to exclude the inactive
parameters.
2) Calls `problem.plot_iteration` with `trial=True`.
3) Increments :attr:`_f_evals`.
4) Tightens (divides) the tolerances by :attr:`tol_safety_factor`.
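A minimal usage sketch from inside :meth:`iterate` (names as in
the signatures above)::
    x1, G1, dx1, F1 = self.problem_step(
        problem=problem, solver_data=solver_data,
        x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
        compute_G=True, compute_dx=True, compute_F=False,
        mesgs=mesgs)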
"""
if abs_tol is None:
abs_tol = self.G_tol
abs_tol /= self.tol_safety_factor
rel_tol /= self.tol_safety_factor
inactive = ()
if problem.active is not ():
inactive = np.logical_not(problem.active)
if x0 is None:
x_inactive = x[inactive]
else:
x_inactive = x0[inactive]
x, G, dx, F = problem.step(x=x,
x0=x0, G0=G0, dx0=dx0, F0=F0,
compute_G=compute_G,
compute_dx=compute_dx,
compute_F=compute_F,
extrapolate=extrapolate,
iter_data=solver_data.iter_data,
abs_tol=abs_tol,
rel_tol=rel_tol,
mesgs=mesgs)
if inactive is not ():
x[inactive] = x_inactive
if dx is not None:
dx[inactive] = 0
if G is not None:
G[inactive] = 0
if F is not None:
F[inactive] = x_inactive
self._f_evals += 1
if IPlotProblem.providedBy(problem):
problem.plot_iteration(
x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
problem=problem,
solver_data=solver_data,
plot=self.plot,
trial=trial)
return x, G, dx, F
def problem_F(self, problem, solver_data,
x, x0=None, G0=None, dx0=None, F0=None,
extrapolate=None, iter_data=None,
abs_tol=None, rel_tol=_REL_TOL, mesgs=_MESGS,
trial=True):
r"""This should be used instead of calling `problem.F()` as
it does the following:
0) Calls `problem.F()` or `problem.step()`.
1) Adjusts `F` to fix the inactive parameters.
2) Calls `problem.plot_iteration` with `trial=True`.
3) Increments :attr:`_f_evals`.
4) Tightens (divides) the tolerances by :attr:`tol_safety_factor`.
"""
if IPlotProblem.providedBy(problem):
problem.plot_iteration(x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
problem=problem,
solver_data=solver_data,
plot=self.plot,
trial=trial)
if abs_tol is None:
abs_tol = self.G_tol
abs_tol /= self.tol_safety_factor
rel_tol /= self.tol_safety_factor
if hasattr(problem, 'F'):
F = problem.F(x=x,
iter_data=solver_data.iter_data,
abs_tol=abs_tol,
rel_tol=rel_tol,
mesgs=mesgs)
else:
_x, _G, _dx, F = problem.step(x=x, x0=x0, G0=G0, dx0=dx0,
F0=F0,
compute_G=False,
compute_dx=False,
compute_F=True,
extrapolate=extrapolate,
iter_data=solver_data.iter_data,
abs_tol=abs_tol,
rel_tol=rel_tol,
mesgs=mesgs)
if problem.active is not ():
inactive = np.logical_not(problem.active)
F[inactive] = x[inactive]
self._f_evals += 1
return F
def problem_G(self, problem, solver_data,
x, x0=None, G0=None, dx0=None, F0=None,
compute_dx=False,
extrapolate=None, iter_data=None,
abs_tol=None, rel_tol=_REL_TOL, mesgs=_MESGS,
trial=True):
r"""This should be used instead of calling `problem.F()` as
it does the following:
0) Calls `problem.F()` or `problem.step()`.
1) Adjusts `F` to fix the inactive parameters.
2) Calls `problem.plot_iterations` with `trial=True`.
3) Increments :attr:`_f_evals`.
4) Increases the tolerances by :attr:`tol_safety_factor`.
"""
if abs_tol is None:
abs_tol = self.G_tol
abs_tol /= self.tol_safety_factor
rel_tol /= self.tol_safety_factor
if IPlotProblem.providedBy(problem):
problem.plot_iteration(x=x, x0=x0, G0=G0, dx0=dx0, F0=F0,
problem=problem,
solver_data=solver_data,
plot=self.plot,
trial=trial)
if hasattr(problem, 'G'):
G, dx = problem.G(x=x,
iter_data=solver_data.iter_data,
compute_dx=compute_dx,
abs_tol=abs_tol,
rel_tol=rel_tol,
mesgs=mesgs)
else:
_x, G, dx, _F = problem.step(x=x, x0=x0, G0=G0, dx0=dx0,
F0=F0,
compute_G=True,
compute_dx=compute_dx,
compute_F=False,
extrapolate=extrapolate,
iter_data=solver_data.iter_data,
abs_tol=abs_tol,
rel_tol=rel_tol,
mesgs=mesgs)
if problem.active is not ():
inactive = np.logical_not(problem.active)
G[inactive] = 0
if dx is not None:
dx[inactive] = 0
self._f_evals += 1
return G, dx
class Scale(Solver):
"""Example Solver class that multiplies the solution by c.
We need to define this here so that the doctests for State work.
"""
_state_vars = [('c', Required, "Multiplicative factor")]
process_vars()
def solve(self, problem, state, iterations=None):
state = deepcopy(state)
state.solver_set_x_(x_=state.x_*self.c)
state.record_history(solver=self, problem=problem,
iterations=1)
return state, 1
class BareState(StateVars):
"""Default implementation of a bare :class:`IState`. This only
uses the flat representation of the state provided by :attr:`x_`.
>>> s = BareState(x_=[1, 2, 3])
>>> s # doctest: +ELLIPSIS
BareState(x_=[1, 2, 3])
>>> s1 = BareState(s)
>>> print s1 # doctest: +ELLIPSIS
x_=[1, 2, 3]
converged=False
messages=[]
sensitive=[]
store_history=False
history=[]
Excluded:
_solver_setting_x_=False
>>> class Scale(Solver):
... "Example Solver class that multiplies the solution by c."
... _state_vars = [('c', Required, "Multiplicative factor")]
... process_vars()
... def solve(self, problem, state,
... iterations=None):
... state = deepcopy(state)
... state.solver_set_x_(x_=state.x_*self.c)
... state.record_history(problem=problem, solver=self,
... iterations=1)
... return state, 1
>>> times = Scale(c=2)
>>> state0 = BareState(x_=np.array([1, 2, 3]), store_history=True)
>>> state1, its = times.solve(problem=None, state=state0)
>>> times.c = 3
>>> state2, its = times.solve(problem=None, state=state1)
>>> state2.x_
array([ 6, 12, 18])
>>> s = state2.get_history()
>>> d = {}
>>> exec(s, d)
>>> d['state'].x_
array([ 6, 12, 18])
"""
implements(IState)
_state_vars = [
('x_', Required,
"""This is a flat representation of the state of the
object. It should be implemented as a property with get
and set functionality. Setting :attr:`x_` should clear the
history and initialize it with an archived version of :attr:`x_`.
We use the name :attr:`x_` to avoid clashes with meaningful
parameters which might be called ``x``."""),
('solver_data', NotImplemented, \
"""Solver specific data representing the state of the
solver (for example, gradient info). This may be used by
subsequent solvers if they are related, but is typically
ignored."""),
('converged', False, \
"""Boolean like object signifying if state is from a
converged calculation."""),
('messages', [], \
"""Messages accumulated during the iteration, such as
reasons for termination."""),
('sensitive', [], \
"""List of indices of sensitive parameters. These are
parameters that must be adjusted slowly or only once a
degree of convergence has been achieved. For example,
the chemical potentials in a grand canonical ensemble.
Solvers may treat these specially."""),
('store_history', False,
"""If `True`, then a record of the solver history will be
saved. This can be somewhat slow and is designed for
large scale solver applications where the number of
iterations is small, but the cost of each iteration is
large. When using the solver in inner loops, this should
be set to `False` to speed the evaluation.
This is typically set by the solver when
:meth:`ISolver.solve` is called, so the default here is
`False` for efficiency."""),
('history', [], \
"""History of the state as a sequence of strings to be
executed in order to reproduce the current state.
``history[0]``:
should be an archive that can be evaluated to define
the variable 'state' (which is the initial state). This
is either the result of a
:meth:`IProblem.get_initial_state()` or a direct
assignment to :attr:`x_`.
``history[1:]``:
should be a series of ``(solver, problem, iterations)``
tuples that specify which solver and problems were used
(and how many iterations) to obtain the n'th state from
the previous state. By specifying the number of
iterations, one allows for calculations that were
interrupted before they reached convergence. (The
actual format is not a requirement for the interface,
only that the function :meth:`IState.get_history()` can
return the appropriate archived string.)
The references to solvers and problems should be kept in
a careful manner: if the solver or problem instances are
modified, then somehow these objects should be archived
in the state prior to the modification so that the
representation is faithful. However, archiving each copy
could lead to a history bloat. This represents a bit of
a design problem."""),
('_solver_setting_x_', Excluded(False), \
"""Flag that a solver is setting x, and hence not to
reset history"""),
]
process_vars()
def __init__(self, *varargin, **kwargs):
"""This version of BareState is implemented as a
:class:`mmf.objects.StateVars` subclass, thus, __init__ is
called each time `self.x_` is directly set. We use this
functionality to reset the history.
Thus:
***Subclasses must call BareState.__init__() explicitly, or else
reproduce this functionality.***
"""
if self.store_history and 'x_' in kwargs:
self._x_set()
def _reset(self):
"""Reset the state, clearing out solver_data, history etc."""
try:
del self.solver_data
except AttributeError:
pass
self.converged = False
self.history = []
self.messages = []
def _x_set(self):
"""Record the setting of x_ in history."""
if self.store_history and not self._solver_setting_x_:
self._reset()
self.history.append(self.archive('state'))
def record_history(self, problem, solver=None, iterations=None):
"""Add problem, solver and iterations to self.history.
This information is used by :meth:`get_history` to reproduce
the state.
"""
if self.store_history:
a = mmf.archive.Archive()
if solver:
a.insert(problem=problem, solver=solver)
self.history.append((str(a), iterations))
else:
a.insert(problem=problem)
self.history.append((str(a), ))
def get_history(self):
"""Return a string that can be evaluated to define the state::
env = {}
exec(self.get_history(), env)
state = env['state']
Raises
------
ValueError
If there is no history (i.e. if :attr:`store_history` is `False`).
"""
if not self.store_history:
raise ValueError("get_history() called on a state with " +
"store_history=False")
a = mmf.archive.Archive()
a.insert(history=self.history)
hist_lines = [self.history[0]]
for a_i in self.history[1:]:
if 2 == len(a_i):
archive, iterations = a_i
hist_lines.append(archive)
hist_lines.append("state, niter = solver.solve("
"problem=problem, "
"state=state, "
"iterations=%i)"%iterations)
else:
archive = a_i[0]
hist_lines.append(archive)
hist_lines.append("state = problem.state0.copy_from(state)")
return "\n".join(hist_lines)
def solver_set_x_(self, x_):
"""This should set x and include an entry in the history log
indicating that the present values were the result of a call
to (or could be reproduced by a call to)
`solver.solve(problem, self, iterations)`.
This must be careful not to directly set `self.x_` which would
trigger reset of the history.
"""
self._solver_setting_x_ = True
self.x_ = x_
self._solver_setting_x_ = False
def copy_from(self, *v, **kwargs):
r"""Return a copy::
state.copy_from(a=..., b=...)
state.copy_from(state, a=..., b=...)
This allows one to convert between two slightly
incompatible forms. See :class:`State_x` for an example.
This should be a deep copy of the object: no data should be
held in common.
Parameters
----------
*v : IState , optional
State to copy. If not provided, then uses `state=self`.
**kwargs : dict, optional
Any additional parameters should be used to initialize
those values.
Notes
-----
This version just uses the copy constructor.
"""
if 0 == len(v):
state = self
elif 1 == len(v):
state = v[0]
else:
raise ValueError("copy_from accepts only one positional "+
"arg: `state`")
return self.__class__(deepcopy(state), **kwargs)
def copy_from_x_(self, x_):
"""Return a copy of `self` initialized from `x`.
"""
new = deepcopy(self)
new.x_ = copy(x_)
return new
class State(BareState, UserDict.DictMixin):
"""Default implementation of :class:`IState` with parameters.
Here :attr:`x_` is defined through a list of parameters that
provide specialized access to the state vector :attr:`x_`. To do
this, the user must define :attr:`parameters` which is a list of
the parameter names.
These parameters are stored independently and the actual :attr:`x_` is
assembled as needed from these (a cache may be used to speed this up if
performance is a problem).
The vector :attr:`x_` will be defined by packing these together in the
order defined by the :attr:`parameters` list, where each element is
converted to a one-dimensional array using the following
rules:
1) If the object has a method called 'ravel' then this is used,
2) otherwise :func:`numpy.ravel()` is used.
3) If :attr:`force_real` then any complex parameters are flattened
with the real array first followed by the imaginary array.
The default or initial values supplied define the shape of the
vector :attr:`x_`.
.. note:: In order for the setting of attributes to work properly,
"array" attributes (i.e. those having a length) must return a
reference so that they can be assigned efficiently without
reallocation using the `flat` attribute or using slicing.
(In particular, they must not return a copy).
Examples
--------
>>> class State2(State):
... _state_vars = [
... 'a', 'b', 'c',
... ('parameters', ClassVar(['a', 'b', 'c'])),
... ('force_real', ClassVar(True))]
... process_vars()
>>> s = State2(a=1, b=[2.1, 2.2], c=3+3j)
>>> s.x_ # doctest: +NORMALIZE_WHITESPACE
array([ 1. , 2.1, 2.2, 3. , 3. ])
Note that we can define parameters dynamically if required:
>>> class State3(State):
... _ignore_name_clash = ['parameters']
... _state_vars = ['a', 'b', 'c',
... ('parameters', Required),
... ('force_real', True)]
... process_vars()
>>> s = State3(a=1, b=[2.1, 2.2], c=3+3j,
... parameters=['a', 'b', 'c'])
>>> s.x_ # doctest: +NORMALIZE_WHITESPACE
array([ 1. , 2.1, 2.2, 3. , 3. ])
>>> s.parameters = ['a', 'b']
>>> s.x_ # doctest: +NORMALIZE_WHITESPACE
array([ 1. , 2.1, 2.2])
>>> class Scale(Solver):
... "Example Solver class that multiplies the solution by c."
... _state_vars = [('c', Required, "Multiplicative factor")]
... process_vars()
... def solve(self, problem, state, iterations=None):
... state = deepcopy(state)
... state.solver_set_x_(x_=state.x_*self.c),
... state.record_history(solver=self, problem=problem,
... iterations=1)
... return state, 1
>>> times = Scale(c=2)
>>> state0 = State2(a=1, b=[2, 3], c=4+5j, store_history=True)
>>> state1, niter = times.solve(problem=None, state=state0)
>>> times.c = 3
>>> state2, niter = times.solve(problem=None, state=state1)
>>> [state2.a, state2.b, state2.c]
[6.0, [12.0, 18.0], (24+30j)]
>>> state2.x_ # doctest: +NORMALIZE_WHITESPACE
array([ 6., 12., 18., 24., 30.])
>>> s = state2.get_history()
>>> d = {}
>>> exec(s, d)
>>> d['state'].x_ # doctest: +NORMALIZE_WHITESPACE
array([ 6., 12., 18., 24., 30.])
"""
_state_vars = [
('x_', Deleted, "Now this is a property"),
('_x_', Excluded, "Temporary cache for parameter `x_`"),
('parameters', ClassVar(Required), \
"""Parameter name list. The vector :attr:`x_` is a
sequence of these, flattened. One can directly access the
appropriate part of :attr:`x_` using these attribute
names"""),
('sensitive_parameters', ClassVar([]), \
"""Name list of sensitive parameters."""),
('force_real', True, \
"""Force :attr:`x_` to be real if using
:attr:`parameter_names`. Complex parameters are
flattened, first reals, followed by imaginaries."""),
('sensitive', Computed, \
"""List of indices of sensitive parameters."""),
('_packing_profile', Excluded, \
"""Information used to pack and unpack :attr:`x_` into
the parameters. This is a dictionary with the following
keys:
`'vars'`: list of tuples `(name, cmplx)` where `name`
is the name of the variable (as in
:attr:`parameters`) and `cmplx` is a flag
indicating whether or not the parameter is complex.
`'packers'`: dict indicating which method to use to
pack each parameter. The keys are the :attr:`parameters`
and the values are method names (strings). Presently
the packers are either :meth:`_pack_array`
for arrays or :meth:`_pack_num` for numbers.
`'indices'`: dict of tuples `(offset, n)` where the
`n` values for the parameter start at offset `offset` in
:attr:`x_`. If the parameter is complex, then
there are `2n` data points: first the real values,
and then the imaginary values (if
:attr:`force_real` is `True`).
"""),
('use_cache', False, \
r"""If `True`, then use a cache to speed the forming of :attr:`x_`
when requested. If `False`, then :attr:`x_` will be assembled each
time. The default is `False` because that is safer (there may be
ways of changing the object so that this gets out of sync)."""),
]
process_vars()
def __str__(self):
"""Return a pretty string showing the state."""
lines = [" %s = %r" % (k, getattr(self, k))
for k in (self.parameters + ['converged'])]
return "\n".join(lines)
def __init__(self, *v, **kw):
"""This version of State is implemented as a
:class:`mmf.objects.StateVars` subclass, thus,
:meth:`__init__` is called each time :attr:`x_` is directly
set. We use this functionality to reset the history.
Thus:
***Subclasses must call State.__init__() explicitly, or else
reproduce this functionality.***
"""
if (1 == len(v)
and hasattr(v[0], 'items')):
# Called as a copy constructor. State has been copied from
# other by __new__. You do not need to copy the
# attributes here, but may need to copy calculated
# parameters.
other = v[0]
if self.use_cache:
self.__dict__['_x_'] = other.x_
self.sensitive = other.sensitive
for k in kw:
# Set any auxiliary values. These override x_
setattr(self, k, kw[k])
elif 0 < len(v):
# Another argument passed without kw. Error here.
raise ValueError(
"State accepts only keyword arguments.")
else:
if 'parameters' in kw or (set(self.parameters) - set(kw)):
self._define_packing()
if self.use_cache:
self.__dict__['_x_'] = self._compute_x_()
self.sensitive = []
for v in self.sensitive_parameters:
if v in self.parameters:
offset, n = self._packing_profile['indices'][v]
self.sensitive.extend(np.arange(offset,
offset + n))
if 0 < len(set(self.parameters).intersection(kw)):
# Some parameter being set.
if self.use_cache:
self.__dict__['_x_'] = self._compute_x_()
if self.store_history:
self._x_set()
if 'converged' in kw:
# Keep convergence information
self.converged = kw['converged']
def _compute_x_(self):
"""Compute and return the vector representation of :attr:`x_`. Called
in `__init__` to ensure that `self.x_` is up to date.
"""
x_s = []
for (v, cmplx) in self._packing_profile['vars']:
val = np.ravel(getattr(self, v))
if cmplx:
x_s.append(val.real)
x_s.append(val.imag)
else:
assert np.isreal(val).all()
x_s.append(np.real(val))
return np.hstack(x_s)
def get_x_(self):
r"""Return `x_`"""
if self.use_cache:
if '_x_' in self.__dict__:
return self.__dict__['_x_']
else:
return self.__dict__.setdefault('_x_', self._compute_x_())
else:
return self._compute_x_()
def set_x_(self, x_):
"""Set the vector x_ directly."""
self.suspend() # Don't recompute until all parameters are set.
for (v, cmplx) in self._packing_profile['vars']:
offset, n = self._packing_profile['indices'][v]
pack = getattr(self, self._packing_profile['packers'][v])
if cmplx:
val = x_[offset:offset+n] + 1j*x_[offset+n:offset+2*n]
else:
val = x_[offset:offset+n]
pack(v, val)
self.resume() # Okay, now reinitialize
if self.use_cache:
self.__dict__['_x_'][::] = x_[::]
x_ = property(get_x_, set_x_,
doc="""`x_` is no longer required because data is
specified by parameters. We still need to access
it though.""")
def indices(self, parameter):
"""Return `(offset, n)` where `self.x_[np.arange(offset,
offset+n)]` are the values corresponding to `parameter` in the
array :attr:`x_`."""
return self._packing_profile['indices'][parameter]
def _pack_array(self, name, val):
"""Set property `name` of `self` from `val` assuming it is an
array. If `len(val) == len(name)`, then the item is mutated,
otherwise it is copied."""
a = getattr(self, name)
try:
mutate = (len(a) == len(val))
except:
mutate = False
if mutate:
try:
a.flat = val
except AttributeError:
# Allows one to use lists.
a[:] = val
else:
setattr(self, name, deepcopy(val))
def _pack_num(self, name, val):
"""Set property name of self from val as a scalar."""
setattr(self, name, val[0])
def _define_packing(self):
"""This defines the packing structure based on the definition
of :attr:`parameters`. It should be called whenever
:attr:`parameters` is changed."""
# By this point, all of the parameter values must have
# been set, so we can construct the packing profile.
vars = []
packers = {}
indices = {}
cmplx = {}
offset = 0
for v in self.parameters:
val = getattr(self, v)
try:
len(val)
packers[v] = '_pack_array'
except TypeError:
packers[v] = '_pack_num'
val = np.ravel(val)
cmplx = (self.force_real and np.iscomplexobj(val))
vars.append((v, cmplx))
n = len(val)
indices[v] = (offset, n)
offset += n
if cmplx:
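# A complex parameter occupies 2n slots in x_ (n real values
# followed by n imaginary values), so advance the offset again.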
offset += n
self._packing_profile = dict(vars=vars,
packers=packers,
indices=indices)
def _post_hook__new__(self, *varargin, **kwargs):
"""We break the default :class:`mmf.objects.StateVars`
behaviour a bit here because we allow the user to specify
:attr:`x_` via the named parameters upon construction.
We do this here so that these calculations are only performed
upon construction rather than when recalculating.
"""
if hasattr(self, 'parameters'):
self._define_packing()
return self
def __getitem__(self, name):
r"""Allows the state to be used as a dictionary."""
return getattr(self, name)
def __setitem__(self, name, value):
r"""Allows the state to be used as a dictionary."""
return setattr(self, name, value)
def __delitem__(self, name):
r"""Allows the state to be used as a dictionary."""
raise NotImplementedError("Cannot delete State items: %s" %
(repr(name),))
def keys(self):
return [v for v in self._vars if v not in set(State._vars)]
class State_x(State):
"""Another default implementation of :class:`IState` with
parameters. This is the same as :class:`State` except that it
allows parameters with names that end with `_x` to be accessed or
set as functions via the name without the `_x` using
:attr:`fn_rep` -- which must implement :class:`IFnRep` -- to
convert between the two.
In order to properly implement assignment, the underlying data is
always mutated (thus, even if a parameter is set using a function,
this function is converted to a discrete representation in the
basis for storage.)
.. note:: There is preliminary support for a change of basis through
:meth:`_change_fn_rep`. This is called by :meth:`copy_from` if the
:attr:`fn_rep` changes. Note that *all* attributes whose name ends in
`'_x'` are transformed to the new basis, not just those listed in
:attr:`parameters`.
Examples
--------
>>> from slda import fn_rep
>>> fn_rep = fn_rep.FnRep(x=np.array([0, 1, 4]))
>>> class State1(State_x):
... _state_vars = [
... 'a', 'b_x', 'c_x',
... ('parameters', ClassVar(['a', 'b_x', 'c_x'])),
... ('force_real', ClassVar(True)),
... ('fn_rep', ClassVar(fn_rep))]
... process_vars()
>>> s = State1(a=1, b_x=[0.0, 0.1, 0.2], c=np.sqrt)
>>> s.x_ # doctest: +NORMALIZE_WHITESPACE
array([ 1. , 0. , 0.1, 0.2, 0. , 1. , 2. ])
"""
_state_vars = [
('fn_rep', Required, \
"""Object for converting between functions and data.
Must implement the :class:`IFnRep` interface."""),
]
process_vars()
def __init__(self, *v, **kw):
State.__init__(self, *v, **kw)
def __getattr__(self, name):
"""Provide access to functions for state variables applied to
basis points."""
try:
ans = State.__getattribute__(self, name)
except AttributeError:
ans = None
if ans is None:
try:
ans = State.__getattribute__(self,
'fn_rep').function(
State.__getattribute__(self,
name + "_x"))
except AttributeError:
ans = None
if ans is None:
raise AttributeError("%s object has no attribute %s" % (
repr(self.__class__), repr(name)))
return ans
def __setattr__(self, name, value):
try:
State.__setattr__(self, name, value)
except AttributeError, err:
if name + "_x" in self._vars:
State.__setattr__(self, name + "_x",
value(self.fn_rep.x))
else:
err.message = "Cannot set %s object attribute %s" % (
repr(self.__class__), repr(name))
raise
def _pre_hook__new__(self, varargin, kwargs):
"""We break the default :class:`mmf.objects.StateVars`
behaviour a bit here because we allow the user to provide
arguments as functions without the '_x'.
"""
# First make sure that :attr:`fn_rep` is set.
if 'fn_rep' in kwargs:
setattr(self, 'fn_rep', kwargs.pop('fn_rep'))
# Now set all attributes without `_x` if the attribute with an
# appended `_x` exists.
for k in kwargs.keys():
k_x = "_".join([k, "x"])
if k_x in self._vars and k not in self._vars:
setattr(self, k, kwargs.pop(k))
return self
def copy_from(self, *v, **kwargs):
r"""Return a copy::
state.copy_from(a=..., b=...)
state.copy_from(state, a=..., b=...)
This allows one to convert between two slightly
incompatible forms. See :class:`State_x` for an example.
This should be a deep copy of the object: no data should be
held in common.
Parameters
----------
*v : IState , optional
State to copy. If not provided, then uses `state=self`.
**kwargs : dict, optional
Any additional parameters should be used to initialize
those values.
This version checks to see if :attr:`fn_rep` is the same in
both states. If not, then the representation is changed using
the `function` method of :attr:`fn_rep`.
"""
new_state = State.copy_from(self, *v, **kwargs)
if ((new_state.fn_rep.__class__ is not self.fn_rep.__class__)
or str(new_state.fn_rep.items()) != str(self.fn_rep.items())):
new_state._change_fn_rep(self.fn_rep)
return new_state
def _change_fn_rep(self, fn_rep):
"""Change `fn_rep`, implementing a change of basis and
applying this to :attr:`solver_data` if it supports it,
otherwise obliterating it.
This is called by the :meth:`__init__` when copy-constructing
from a state with a different :attr:`fn_rep`.
>>> from slda import fn_rep
>>> fr1 = fn_rep.FnRep(x=np.array([0, 1, 4]))
>>> fr2 = fn_rep.FnRep(x=np.array([0, 1, 2, 3, 4]))
>>> class State1(State_x):
... _state_vars = [
... 'a', 'b_x', 'c_x', 'd_x', 'e_x', 'fn_rep',
... ('parameters', ClassVar(['a', 'b_x', 'c_x', 'd_x'])),
... ('force_real', ClassVar(True))]
... process_vars()
>>> s1 = State1(a=1, b_x=[0, 0.1, 0.2], c=np.sqrt, d=np.sin,
... e_x=fr1.x*0 + 1, fn_rep=fr1)
>>> s2 = State1(a=1, b_x=[0, 0.1, 0, 0, 0.2], c=np.sqrt, d=np.sin,
... e_x=fr2.x*0 + 2, fn_rep=fr2)
>>> s3 = s2.copy_from(s1)
>>> s2._packing_profile == s3._packing_profile
True
>>> np.allclose(s3.e_x, s3.fn_rep.x*0 + 1.0)
True
"""
# pylint: disable-msg=E0203
n0 = len(self.fn_rep.x)
n1 = len(fn_rep.x)
if False:
n0 = len(fn_rep.x)
I0 = np.eye(len(self.fn_rep.x))
I1 = np.eye(len(fn_rep.x))
f_mat = np.vstack([self.fn_rep.function(I0[n, :])(fn_rep.x)
for n in xrange(n0)])
f_inv_t = np.vstack([fn_rep.function(I1[n, :])(self.fn_rep.x)
for n in xrange(n1)]).T
else:
def f_x(fx):
r"""We perform a manual vectorization over any extra
indices in `fx` here. This is not efficient if there
are many."""
fx = np.asarray(fx)
shape = fx.shape
if 1 < len(shape):
fx = fx.reshape((shape[0], np.prod(shape[1:])))
res = np.vstack([
self.fn_rep.function(fx[:,n])(fn_rep.x)
for n in xrange(fx.shape[1])])
res = res.T.reshape((res.shape[1],) + shape[1:])
else:
res = self.fn_rep.function(fx)(fn_rep.x)
return res
# List of either numbers n (representing that n variables
# should not be transformed) or instances of f_x for each
# block that should be transformed.
transformers = []
vars_to_transform = set(_v for _v in self._vars
if _v.endswith('_x')
and hasattr(self, _v) # could be NotImplemented
)
# Compute transformation matrix, and update _packing_profile.
shift = 0
total_shift = 0
for (_v, cmplx) in self._packing_profile['vars']:
offset, n = self._packing_profile['indices'][_v]
if _v.endswith('_x'):
vars_to_transform.remove(_v)
assert n == n0
n = n1
if cmplx:
transformers.append(f_x)
transformers.append(f_x)
shift = 2*(n1 - n0)
else:
transformers.append(f_x)
shift = n1 - n0
else:
if cmplx:
transformers.append(2*n)
else:
transformers.append(n)
self._packing_profile['indices'][_v] = \
(offset + total_shift, n)
total_shift += shift
def transform(x):
"""Effect the transformation."""
y = []
n = 0
for t in transformers:
if t is f_x:
y.append(t(x[n:n+n0]))
n += n0
else:
y.append(x[n:n+t])
n += t
return np.concatenate(y, axis=0)
self.solver_set_x_(x_=transform(self.x_))
for _v in vars_to_transform:
# Process remaining parameters
setattr(self, _v, f_x(getattr(self, _v)))
if hasattr(self, 'solver_data'):
if hasattr(self.solver_data, 'change_basis'):
self.solver_data.change_basis(transform)
else:
del self.solver_data
self.fn_rep = fn_rep
class State2(State):
"""Example State class.
We need to define this here so that the doctests for State work.
"""
_state_vars = [
'a',
'b',
'c',
('parameters', ClassVar(['a', 'b', 'c'])),
('force_real', True),
]
process_vars()
def indent(s, amount=_INDENT_AMOUNT, fillchar=" "):
"""Returns string s left indented by specified number of
fillchars.
>>> s = "1\\n2\\n\\n3"
>>> indent(s)
' 1\\n 2\\n \\n 3'
>>> indent(s, 2, '.')
'..1\\n..2\\n..\\n..3'
"""
return "\n".join([amount*fillchar + line
for line in s.splitlines()])
def solve(f, x0, solver, store_history=False, **kwargs):
"""Solve `f(x) = 0` using specified `solver` and return the
solution.
Parameters
----------
f : function
Function to find the root of.
x0 : array
Initial guess.
solver : Solver
Instance of a solver to solve the problem.
store_history : bool
If `True`, then keep a record of the history in the state.
kwargs : dict, optional
Additional parameters to pass to the Problem instance.
Examples
--------
>>> from broyden_solvers import Broyden
>>> def f(x):
... return x**2 - np.arange(len(x))
>>> solver = Broyden(x_tol=1e-8, G_tol=1e-8)
>>> sol = solve(f, x0=[1, 1, 1], solver=solver)
>>> np.max(abs(sol.x_**2 - np.array([0, 1, 2]))) < 1e-8
True
"""
x0 = np.asarray(x0)
if x0.shape is ():
x0 = x0.ravel()
def f(x, _f=f):
r"""Wrapper for scalar functions."""
return np.array([_f(x[0])])
class P(Problem):
_state_vars = [
('state0', BareState(x_=1.0*x0), \
"Convert to a float if needed by `1.0*`"),
('has_F', ClassVar(True)),
('has_G', ClassVar(True)),
('has_dx', ClassVar(False)),
]
process_vars()
def get_initial_state(self):
return self.state0.copy_from(store_history=store_history)
def G(self, x, compute_dx=False, iter_data=None,
abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS):
return (f(x), None)
p = P(**kwargs)
s = p.get_initial_state()
sol, niter = solver.solve(p, s)
return sol
def fixedpoint(f, x0, solver, store_history=False, **kwargs):
"""Solve `f(x) = x` using specified `solver` and return the
solution.
Parameters
----------
f : function
Function to find the fixedpoint of.
x0 : array
Initial guess.
solver : Solver
Instance of a solver to solve the problem.
store_history : bool
If `True`, then keep a record of the history in the state.
kwargs : dict, optional
Additional parameters to pass to the Problem instance.
Examples
--------
>>> from broyden_solvers import Broyden
>>> def f(x):
... return x - x**2 + np.arange(len(x))
>>> solver = Broyden(x_tol=_ABS_TOL, G_tol=_ABS_TOL)
>>> sol = fixedpoint(f, x0=[1, 1, 1], solver=solver)
>>> np.max(abs(sol.x_**2 - np.array([0, 1, 2]))) < _ABS_TOL
True
"""
x0 = np.asarray(x0)
if x0.shape is ():
x0 = x0.ravel()
def f(x, _f=f):
r"""Wrapper for scalar functions."""
return np.array([_f(x[0])])
class P(Problem):
_state_vars = [
('state0', BareState(x_=1.0*x0), \
"Convert to a float if needed by `1.0*`")]
process_vars()
def get_initial_state(self):
return self.state0.copy_from(store_history=store_history)
def F(self, x, iter_data=None,
abs_tol=_ABS_TOL, rel_tol=_REL_TOL, mesgs=_MESGS):
return f(x)
p = P(**kwargs)
s = p.get_initial_state()
sol, niter = solver.solve(p, s)
return sol
st = """
from mmf.objects import *
from solver_interface import *
class FnRep(StateVars):
_state_vars = [('N', 25), ('x', Computed)]
process_vars()
def __init__(self, *v, **kw):
self.x = np.linspace(-1, 1, self.N)
def function(self, fx):
c1 = np.cosh(1)
def f(x):
return np.interp(x, self.x.ravel(), fx.ravel())
x_ = np.hstack([[-1], self.x.ravel(), [1]])
fx_ = np.hstack([[c1], fx.ravel(), [c1]])
return np.interp(x, x_, fx_)
return f
class DEQState(State_x):
_state_vars = ['f_x']
process_vars()
force_real = True
parameters = ['f_x']
class DEQProblem(Problem_x):
_state_vars = [('fn_rep', Delegate(FnRep)),
('N=fn_rep.N', 25)]
process_vars()
def get_initial_state(self):
state = DEQState(fn_rep=self.fn_rep,
f_x=self.fn_rep.x*0)
return state
def G(self, x, abs_tol=0, rel_tol=0, mesgs=_MESGS):
f = np.asarray(x).ravel()
c1 = np.cosh(1)
dx = np.diff(self.fn_rep.x[1:])
ddf = np.diff(np.diff(f))/dx/dx
a = 1 + 2./dx/dx
G0 = np.hstack([[f[0] - c1], f[1:-1] - ddf, [f[-1] - c1]])
G1 = np.hstack([[f[0] - c1], (f[1:-1] - ddf)/a, [f[-1] - c1]])
G = G1
#self._plot_iteration(self.fn_rep.x, f, G)
return G
def J(self, x):
f = x
dx = np.diff(self.fn_rep.x[1:])
a = 1 + 2./dx/dx
J0 = (np.diag(np.hstack([[1], a, [1]]))
- np.diag(np.hstack([1./dx/dx, [0]]), -1)
- np.diag(np.hstack([[0], 1./dx/dx]), 1))
J1 = (np.diag(np.hstack([[1], a/a, [1]]))
- np.diag(np.hstack([1./dx/dx/a, [0]]), -1)
- np.diag(np.hstack([[0], 1./dx/dx/a]), 1))
J = J1
return J
def plot_iter(self, x, f, G, plt):
plt.plot(x, f)
from broyden_solvers import Broyden
problem = DEQProblem(N=10)
solver0 = Broyden(max_step=10, dyadic=False, verbosity=0,
bad_step_factor=10, abs_xtol=0,
max_weighted_step=0.001, abs_tol=1e-12,
schedule=[({'N':50}, {})])
solver1 = Broyden(solver0)
solver1.schedule = [({'N':10}, {}),
({'N':50}, {'reset_Jinv': True})]
solver2 = Broyden(solver0)
solver2.schedule = [({'N':10}, {}),
({'N':50}, {})]
solver3 = Broyden(solver0)
solver3.schedule = [({'N':N}, {'abs_tol':1e-1})
for N in xrange(5, 50, 4)] + [({'N':50}, {'abs_tol':1e-12})]
initial_state = problem.get_initial_state()
s0, i0 = solver0.solve(problem, initial_state)
s1, i1 = solver1.solve(problem, initial_state)
s2, i2 = solver2.solve(problem, initial_state)
s3, i3 = solver3.solve(problem, initial_state)
print i0, i1, i2, i3
#del s2.solver_data
#p2 = DEQProblem(N=100)
#s2, i2 = solver.solve(p2, s2)
#bx = s2.solver_data.x0
#for n in reversed(xrange(len(x))):
# dx = np.zeros(len(x), dtype=float)
# dx[n] = 0.00001
# g = p2.G(x+dx, 0, 0, [])
# s2.solver_data.broyden._update_J(G=g, x=x+dx)
#print np.max(abs(s2.solver_data.Jinv0 - Ji))
"""
"""
x = s2.solver_data.x0
for n in reversed(xrange(len(x))):
dx = np.zeros(len(x), dtype=float)
dx[n] = 0.00001
g = p2.G(x+dx, 0, 0, [])
s2.solver_data.broyden._update_J(G=g, x=x+dx)
#print np.max(abs(s2.solver_data.Jinv0 - Ji))
"""