Source code for mmf.math.integrate.integrate_1d.imt

r"""IMT Integration method (:mod:`mmf.math.integrate.integrate_1d.imt`)

.. currentmodule:: mmf.math.integrate.integrate_1d.imt

.. autosummary::

   IMT

"""

from __future__ import division

import warnings

import numpy as np
import scipy as sp

import scipy.integrate

from mmf.objects import StateVars, process_vars, Computed

__all__ = ['IMT']

_finfo = np.finfo(float)
_eps = _finfo.eps
_tiny = _finfo.tiny


[docs]class IMT(StateVars): r"""Class for performing the integral .. math:: \int_{a}^{b}\d{x} f(x) using the IMT method for evaluating integrals over [0,1]: .. math:: \int_{0}^{1} f(x) \d{x} = \int_{0}^{1} f[x(t)] x'(t) \d{t} \approx \frac{1}{N}\sum_{k=1}^{N-1} f(x_k) x'_{k},\\ t_{k} = \frac{k}{N},\\ x_{k} = \int_{0}^{t_{k}} x'(t)\d{t},\\ x'(t) = \frac{1}{Q}\exp\left(-\frac{1}{t} - \frac{1}{1-t}\right),\\ Q = \int_{0}^{1}\exp\left(-\frac{1}{t} - \frac{1}{1-t}\right). Examples -------- >>> integrate = IMT() >>> integrate(np.sqrt, 0, 1) # doctest: +ELLIPSIS 0.666666666... """ _state_vars = [ ('n_max', 11, """Maximum `N = 2**n_max` intervals. The abscissa will dynamically increase as required up to this maximum value in order to satisfy the tolerances. Once this hard limit is reached, then the tolerances may not be met with a warning will be emitted."""), ('n', 9, """Current number of intervals."""), ('n_0', 5, """Start integration with this rule to ensure a minimum number of points. Since the method is adaptive, this should be used to prevent the routine from missing features."""), ('abs_tol', _eps, "Desired absolute accuracy."), ('rel_tol', _eps, "Desired relative accuracy."), ('_x_k', Computed, "Abscissa"), ('_w_k', Computed, "Weights"), ('_Q', Computed, "Normalization"), ('verbosity', 0, "If > 0, then print warnings. If > 9 then print termination level") ] process_vars() @staticmethod def _dx_dt_Q(t): return np.exp(-np.divide(1,t) - np.divide(1,1-t)) def __init__(self, *v, **kw): if self.n_0 >= self.n: raise ValueError("n_0=%i must be less than n=%i" % (self.n_0, self.n)) Q = 0.0070298584066096556 self._Q = Q #self.Q = sp.integrate.quad(self._dx_dt_Q, 0, 1, # epsabs=_eps, epsrel=_eps)[0] N = 2**self.n k = np.arange(1,N, dtype=float) t_k = k/N self._w_k = self._dx_dt_Q(t_k)/Q/N self._x_k = np.array([sp.integrate.quad(self._dx_dt_Q, 0, t, epsabs=_eps, epsrel=_eps)[0] for t in t_k])/Q def phi(self, f, a=0, b=1): r"""Return the function `phi(t) = f(x(t))*w(t)`. to which the trapezoidal rule is applied. .. note:: This is slow. Not for production use. """ @np.vectorize def phi(t): x = sp.integrate.quad(self._dx_dt_Q, 0, t, epsabs=_eps, epsrel=_eps)[0]/self._Q w = np.exp( -np.divide(1,t) - np.divide(1,1-t))/self._Q return f(x*(b-a)+a)*w return phi def sum_m(self, f, m, a=0, b=1): r"""Return the `n`'th term which is the sum of the function `f` on the set of unique lattice points in the `n`'th lattice.""" x_new, w_new = self.x_w_new(m) return np.sum(f(x_new*(b-a)+a)*w_new, axis=-1)*(b-a) def x_w_new(self, m): r"""Return the new abscissa and weights `(x,w)` at level `m`. If needed, these are computed and cached.""" if m <= self.n: stride = 2**(self.n - m) offset = stride//2 - 1 x_new = self._x_k[offset::stride] w_new = self._w_k[offset::stride]*(stride//2) else: raise NotImplementedError return x_new, w_new def _augment(self): r"""Augment the abscissa and weights.""" # Compute new abscissa and weights n = self.n N = 2**n M = 2*N k = np.arange(1,M, dtype=float) t_k = k/M x_k = np.empty(t_k.shpe, dtype=float) w_k = np.empty(t_k.sahpe, dtype=float) x_k[1::2] = self._x_k w_k[1::2] = self._w_k dt = t_k[0] for n in xrange(N): m = 2*n+1 x = x_k[m] t = t_k[m] x_k[m-1] = x - sp.integrate.quad(self._dx_dt_Q, t_k[m], t_k[m-1], epsabs=_eps*dt, epsrel=_eps) x_k[m+1] = x - sp.integrate.quad(self._dx_dt_Q, t_k[m], t_k[m+1], epsabs=_eps*dt, epsrel=_eps) self._w_k = self._dx_dt_Q(t_k)/Q/N self._x_k = np.array([[0] for t in t_k])/Q return (x_new, w_new) def __call__(self, f, a=0, b=1, n_max=np.inf, abs_tol=None, rel_tol=None): if abs_tol is None: abs_tol = self.abs_tol if rel_tol is None: rel_tol = self.rel_tol res = 0 for m in xrange(0, min(n_max + 1, self.n)): term = self.sum_m(f, m=m, a=a, b=b) res /= 2 corr = term - res res += term abs_err = abs(corr) rel_err = abs_err/(abs(res) + _tiny) err = np.minimum(abs_err/abs_tol, rel_err/rel_tol).max() if err <= 1 and m >= self.n_0: if 9 < self.verbosity: print("Terminating at level %i" % m) break if err > 1: if self.verbosity > 0: warnings.warn( "Desired tolerance not met: " + "abs_tol=%g, rel_tol=%g, err/tol=%g" % (abs_tol, rel_tol, err)) return res