r"""IMT Integration method (:mod:`mmf.math.integrate.integrate_1d.imt`)
.. currentmodule:: mmf.math.integrate.integrate_1d.imt
.. autosummary::
IMT
"""
from __future__ import division
import warnings
import numpy as np
import scipy as sp
import scipy.integrate
from mmf.objects import StateVars, process_vars, Computed
__all__ = ['IMT']
_finfo = np.finfo(float)
_eps = _finfo.eps
_tiny = _finfo.tiny
[docs]class IMT(StateVars):
r"""Class for performing the integral
.. math::
\int_{a}^{b}\d{x} f(x)
using the IMT method for evaluating integrals over [0,1]:
.. math::
\int_{0}^{1} f(x) \d{x} = \int_{0}^{1} f[x(t)] x'(t) \d{t}
\approx \frac{1}{N}\sum_{k=1}^{N-1} f(x_k) x'_{k},\\
t_{k} = \frac{k}{N},\\
x_{k} = \int_{0}^{t_{k}} x'(t)\d{t},\\
x'(t) = \frac{1}{Q}\exp\left(-\frac{1}{t} - \frac{1}{1-t}\right),\\
Q = \int_{0}^{1}\exp\left(-\frac{1}{t} - \frac{1}{1-t}\right).
Examples
--------
>>> integrate = IMT()
>>> integrate(np.sqrt, 0, 1) # doctest: +ELLIPSIS
0.666666666...
"""
_state_vars = [
('n_max', 11,
"""Maximum `N = 2**n_max` intervals. The abscissa will
dynamically increase as required up to this maximum value
in order to satisfy the tolerances. Once this hard limit
is reached, then the tolerances may not be met with a
warning will be emitted."""),
('n', 9, """Current number of intervals."""),
('n_0', 5,
"""Start integration with this rule to ensure a minimum
number of points. Since the method is adaptive, this
should be used to prevent the routine from missing
features."""),
('abs_tol', _eps, "Desired absolute accuracy."),
('rel_tol', _eps, "Desired relative accuracy."),
('_x_k', Computed, "Abscissa"),
('_w_k', Computed, "Weights"),
('_Q', Computed, "Normalization"),
('verbosity', 0,
"If > 0, then print warnings. If > 9 then print termination level")
]
process_vars()
@staticmethod
def _dx_dt_Q(t):
return np.exp(-np.divide(1,t) - np.divide(1,1-t))
def __init__(self, *v, **kw):
if self.n_0 >= self.n:
raise ValueError("n_0=%i must be less than n=%i" %
(self.n_0, self.n))
Q = 0.0070298584066096556
self._Q = Q
#self.Q = sp.integrate.quad(self._dx_dt_Q, 0, 1,
# epsabs=_eps, epsrel=_eps)[0]
N = 2**self.n
k = np.arange(1,N, dtype=float)
t_k = k/N
self._w_k = self._dx_dt_Q(t_k)/Q/N
self._x_k = np.array([sp.integrate.quad(self._dx_dt_Q, 0, t,
epsabs=_eps,
epsrel=_eps)[0]
for t in t_k])/Q
def phi(self, f, a=0, b=1):
r"""Return the function `phi(t) = f(x(t))*w(t)`.
to which the trapezoidal rule is applied.
.. note:: This is slow. Not for production use.
"""
@np.vectorize
def phi(t):
x = sp.integrate.quad(self._dx_dt_Q, 0, t,
epsabs=_eps,
epsrel=_eps)[0]/self._Q
w = np.exp( -np.divide(1,t) - np.divide(1,1-t))/self._Q
return f(x*(b-a)+a)*w
return phi
def sum_m(self, f, m, a=0, b=1):
r"""Return the `n`'th term which is the sum of the function
`f` on the set of unique lattice points in the `n`'th
lattice."""
x_new, w_new = self.x_w_new(m)
return np.sum(f(x_new*(b-a)+a)*w_new,
axis=-1)*(b-a)
def x_w_new(self, m):
r"""Return the new abscissa and weights `(x,w)` at level `m`.
If needed, these are computed and cached."""
if m <= self.n:
stride = 2**(self.n - m)
offset = stride//2 - 1
x_new = self._x_k[offset::stride]
w_new = self._w_k[offset::stride]*(stride//2)
else:
raise NotImplementedError
return x_new, w_new
def _augment(self):
r"""Augment the abscissa and weights."""
# Compute new abscissa and weights
n = self.n
N = 2**n
M = 2*N
k = np.arange(1,M, dtype=float)
t_k = k/M
x_k = np.empty(t_k.shpe, dtype=float)
w_k = np.empty(t_k.sahpe, dtype=float)
x_k[1::2] = self._x_k
w_k[1::2] = self._w_k
dt = t_k[0]
for n in xrange(N):
m = 2*n+1
x = x_k[m]
t = t_k[m]
x_k[m-1] = x - sp.integrate.quad(self._dx_dt_Q,
t_k[m], t_k[m-1],
epsabs=_eps*dt,
epsrel=_eps)
x_k[m+1] = x - sp.integrate.quad(self._dx_dt_Q,
t_k[m], t_k[m+1],
epsabs=_eps*dt,
epsrel=_eps)
self._w_k = self._dx_dt_Q(t_k)/Q/N
self._x_k = np.array([[0]
for t in t_k])/Q
return (x_new, w_new)
def __call__(self, f, a=0, b=1,
n_max=np.inf,
abs_tol=None, rel_tol=None):
if abs_tol is None:
abs_tol = self.abs_tol
if rel_tol is None:
rel_tol = self.rel_tol
res = 0
for m in xrange(0, min(n_max + 1, self.n)):
term = self.sum_m(f, m=m, a=a, b=b)
res /= 2
corr = term - res
res += term
abs_err = abs(corr)
rel_err = abs_err/(abs(res) + _tiny)
err = np.minimum(abs_err/abs_tol,
rel_err/rel_tol).max()
if err <= 1 and m >= self.n_0:
if 9 < self.verbosity:
print("Terminating at level %i" % m)
break
if err > 1:
if self.verbosity > 0:
warnings.warn(
"Desired tolerance not met: " +
"abs_tol=%g, rel_tol=%g, err/tol=%g"
% (abs_tol, rel_tol, err))
return res