Source code for mmf.utils.mmf_theano

r"""Tools for working with :mod:`theano`.

The main goal is to enable as seamless integration of custom functions as
possible.  In particular, addressing the following issues:

1) Some functions may be purely numeric due to a complicated or adaptive
   algorithm.  These functions need to be wrapped by :class:`theano.Op` or
   :class:`theano.scalar.ScalarOp` so that they can occupy a place in the
   tree.  Once a function tree is generated, we can apply
   :class:`theano.tensor.Elemwise` to allow vectorization, and then use
   :func:`theano.function` to compile this
2) Compiling functions can be slow and should only be done when needed (maybe on
   import or at most once per session).  An issue is how to use automatic
   differentiation of member functions that depend on parameters.  Just passing
   symbolics through the function often works, but generates a tree that has the
   current values hard-coded: The function will need to be recompiled whenever
   the parameters change.
"""
from __future__ import division

__all__ = ['Polynomial', 'Elemwise', 'UnaryScalarOp', 'InverseUnaryScalarOp',
           'ScalarOpFromExpr', 'UnaryScalarOpFromExpr'] 

import os.path
import inspect
import subprocess

import numpy as np

import theano.scalar
import theano.compile.mode
import theano.gof.compiledir

from mmf.objects import StateVars, process_vars, Computed, Excluded
from mmf.archive import DataSet

_FAST_PY = theano.compile.mode.Mode('py', 'fast_run')

class CompileCache(StateVars):
    r"""Cache of compiled functions.

    One can use :meth:`function` as a replacement for :func:`theano.function`
    for compiling.  If the archive exists and has an appropriate entry, and if
    the mercurial checksum is unchanged, then the compiled function will be
    restored from disk rather than being recompiled.

    Note: we use the thano hash as a key in the actual dataset, but map the
    given name to this using the `lookup` dictionary so that users can use any
    function names they like (for instance `'f(x,y)'` which is not a valid
    identifier).
    """
    _state_vars = [
        ('prefix', "", "Prefix for archive name.  Module name for example."),
        ('force_compile', False, "Set to `True` to force compilation."),
        ('use_if_modified', False, "If `True`, then cache will be used " +
         "even if the mercurial repository is modified."),
        ('module', None, r"""Specify the base module for your project here.
            This will be used to compute the cache name, and also to determine
            the directory in which to run `hg` in order to determine the
            checksum.  If `None`, then `hg` will be run in the current working
            directory (wherever :func:`subprocess.Popen` runs commands.)"""),
        ('use_dataset', True),
        ('dir', None, """Output directory for cache.  Computed if `None` and set
            to reside in the theano compile directory."""),
        ('mode', _FAST_PY, "Theano compilation mode."),
        ('hg_checksum', None, "Current mercurial checksum. Computed if `None`"),
        ('filename', Computed, "Name of the :class:`mmf.archive.Dataset`"),
        ('modified', Computed, "`True` if the hg repo has been modified."),
        ('dataset', Computed, "The actual dataset."),
        ('_cache', Excluded, """The cache.  Items compiled in the current
            session are stored here current to prevent having to load the entire
            dataset.""")
        ]
    process_vars()
        
    def __init__(self, *v, **kw):
        if not self.dir:
            self.dir = theano.gof.compiledir.filter_compiledir(
                theano.config.compiledir)
        self.hg_checksum, self.modified = self.get_hg_checksum(self.hg_checksum)
        if self.hg_checksum and self.use_dataset:
            self.filename = os.path.join(
                self.dir, 
                "compile_cache_%s_%s" % (self.prefix, self.hg_checksum))
        else:
            self.filename = None
        if self.filename:
            self.dataset = DataSet(self.filename, 'w')
            if 'lookup' not in self.dataset:
                self.dataset['lookup'] = {}
        else:
            self.dataset = None
        self._cache = {}

    def get_hg_checksum(self, hg_checksum=None):
        r"""Return `(checksum, modified)`, the mercurial checksum of the current
        directory and a boolean flag indicating if there are uncommitted
        modifications.  The checksum is `None` if the current directory is not
        under version control."""
        modified = True
        cmd = ["hg","-q","id"]
        working_dir = None
        if self.module:
            working_dir = os.path.dirname(self.module.__file__)
        try:
            if not hg_checksum:
                hg_checksum = subprocess.check_output(
                    cmd, cwd=working_dir).strip()
            if len(hg_checksum) not in [12, 13]:
                hg_checksum = None
            elif hg_checksum.endswith('+'):
                hg_checksum = hg_checksum[:12]
            else:
                modified = False
        except subprocess.CalledProcessError:
            hg_checksum = None
        return hg_checksum, modified

    def compile(self, name, outputs, inputs, mode=None, *v, **kw):
        r"""Return the compiled object and store it in the cache as `name`.
        This always forces a compile."""
        if mode is None:
            mode = self.mode
        function = theano.function(outputs, inputs, mode=mode, *v, **kw)
        if self.dataset:
            _hash = abs(hash(name))
            key = "theano_function_%i" % (_hash,)
            lookup = self.dataset['lookup']
            lookup[name] = key
            self.dataset['lookup'] = lookup
            setattr(self.dataset, key, function)
        self._cache[name] = function
        return function

    def function(self, name, outputs, inputs, mode=None, *v, **kw):
        r"""Return the compiled object and store it in the cache as `name`.
        Only compiles if the object is not in the cache or dataset."""
        function = self[name]
        if not function:
            function = self.compile(name, outputs, inputs, mode=mode, *v, **kw)
        return function

    def __getitem__(self, name):
        r"""Return the function `name` if it exists, otherwise return `False`.
        The recommend use of this is to quickly check if a function exists
        before proceeding to perform a lengthy calculation to generate it."""
        if name not in self._cache:
            if self.dataset and name in self.dataset['lookup']:
                key = self.dataset['lookup'][name]
                self._cache[name] = getattr(self.dataset, key)
        return self._cache.get(name, False)

    def __contains__(self, name):
        r"""Return `True` if the compiled function `name` is cached."""
        return ((name in self._cache) 
                or (self.dataset and name in self.dataset['lookup']))
    
[docs]class Polynomial(theano.Op): r"""Theano wrapper for polynomials. Examples -------- >>> import theano.tensor as T >>> x = T.scalar('x') >>> c = T.vector('c') >>> p = Polynomial()(c, x) >>> dp = T.grad(p, x) >>> ddp = T.grad(dp, x) >>> f = theano.function([c, x], [p, dp, ddp], mode='FAST_COMPILE') >>> _c = [1.0,2.0,-3.0,4.0] >>> f(_c, 2.0) [25.0, array(38.0), array(42.0)] We can also vectorize the polynomial over `x`. Note the use of `sum()` when constructing the derivatives. See https://groups.google.com/d/topic/theano-users/81hG-LzvxOY/discussion >>> import theano.tensor as T >>> x = T.matrix('x') >>> c = T.vector('c') >>> p = Polynomial()(c, x) >>> dp = T.grad(p.sum(), x) >>> ddp = T.grad(dp.sum(), x) >>> f = theano.function([c, x], [p, dp, ddp], mode='FAST_COMPILE') >>> _c = [1.0,2.0,-3.0,4.0] >>> _x = [[1.0,2.0],[3.0,4.0]] >>> f(_c, _x) [array([[ 4., 25.], [ 88., 217.]]), array([[ 8., 38.], [ 92., 170.]]), array([[ 18., 42.], [ 66., 90.]])] """
[docs] def __init__(self, d=0): self.d = d
def __eq__(self, other): return type(self) == type(other) and self.d == other.d def __hash__(self): return hash(type(self)) + hash(self.d)
[docs] def make_node(self, c, x): x_ = theano.tensor.as_tensor_variable(x) c_ = theano.tensor.as_tensor_variable(c) return theano.Apply(self, inputs=[c_, x_], outputs=[x_.type()])
[docs] def perform(self, node, inputs, output_storage): c, x = inputs output_storage[0][0] = np.polynomial.Polynomial(c).deriv(self.d)(x)
[docs] def grad(self, inputs, output_gradients): r"""We do not yet support differentiation wrt `c` though this should be easy (except for broadcasting issues).""" c, x = inputs dC_dp = output_gradients[0] dp_dx = Polynomial(d=self.d+1)(c, x) return [None, dC_dp * dp_dx]
def _polynomial(c=theano.tensor.vector("c"), x=theano.tensor.scalar("x")): r"""Return an Op representing a polynomial defined by the coefficients `c` acting on `x`.""" max_coefficients_supported = 10000 # Generate the components of the polynomial full_range = theano.tensor.arange(max_coefficients_supported) components, updates = theano.scan(fn=lambda _coeff, power, free_var: _coeff*(free_var ** power), outputs_info=None, sequences=[c, full_range], non_sequences=x) polynomial = components.sum() return polynomial
[docs]class ScalarOpFromExpr(theano.scalar.ScalarOp): r"""This class allows you to construct a :class:`theano.scalar.ScalarOp` from a tensor expression. This allows for automatic derivative calculations."""
[docs] def __init__(self, inputs, outputs, d=None, name=None, fn=None, mode='FAST_COMPILE'): self.name = name self.mode = mode while d: outputs = [theano.tensor.grad(_o, d.pop()) for _o in outputs] self.inputs = inputs self.outputs = outputs self.nin = len(inputs) self.nout = len(outputs) if not fn: fn = theano.function(outputs, inputs, mode=mode) self.fn = fn
def __hash__(self): return hash(self.inputs) ^ hash(self.outputs) ^ hash(type(self)) def __eq__(self, other): return hash(self) == hash(other)
[docs] def output_types(self): return [theano.tensor.scalar_from_tensor(_o).type for _o in self.outputs]
[docs] def impl(self, inputs): return self.fn(*inputs)
[docs] def grad(self, inputs, grads): res = [] for _i in self.inputs: res.append(0) for _g in grads: res[-1] += _g*ScalarOpFromExpr(inputs=self.inputs, outputs=self.outputs, d=[_i], name=self._get_d_name(self.name, _i.name), mode=self.mode)(inputs) return res
def _get_d_name(self, name, dname): r"""Return a new name of the form `'name_dname'` representing the derivative of `name` wrt `dname`. If either is `None` then the `name` will be returned unchanged.""" if name is not None and dname is not None: name = name + "_" + dname return name
[docs]class UnaryScalarOpFromExpr(ScalarOpFromExpr): r"""Unary version of :class:`ScalarOpFromExpr`. Takes a single variable `x` and a single expression `f_x` as inputs instead of lists. The derivatives are also specified as an integer `d` now.""" nin = 1 nout = 1
[docs] def __init__(self, x, f_x, d=0, name=None, fn=None, mode='FAST_COMPILE'): self.mode = mode self.name = name while d > 0: f_x = theano.tensor.grad(f_x, x) d -= 1 self.x = x self.f_x = f_x if not fn: fn = theano.function([x], f_x, mode=mode) self.fn = fn
def __hash__(self): return hash(self.x) ^ hash(self.f_x) ^ hash(type(self))
[docs] def output_types(self, _dtypes): return [theano.tensor.scalar_from_tensor(self.f_x).type]
[docs] def impl(self, x): return self.fn(x)
[docs] def grad(self, (x,), (gf,)): return [gf*UnaryScalarOpFromExpr( x=self.x, f_x=self.f_x, d=1, mode=self.mode, name=self._get_d_name(self.name, self.x.name))(x)]
[docs]class UnaryScalarOp(theano.scalar.UnaryScalarOp): r"""Theano wrapper for functions of a single variable that can compute their own derivative via a flag `d`. To use this, simply subclass and define :meth:`impl`. Several things happen automatically: 1) If not provided, the name will be set to the lowercase version of the class name with primes or '^{(d)}' used to denote derivatives. 2) The :attr:`output_types_preference` is set to :attr:`theano.scalar.upgrade_to_float_no_complex` (can override by defining as a class variable.) 3) :meth:`__call__` is overloaded to call :meth:`impl` if none of the inputs is a theano variable. This will use variables names as listed in the arguments of :meth:`impl` as the theano variables, and will either use the class variables of the same name or will use :attr:`_variable_type` to create the variables. 4) Provides a convenience method :meth:`get_function` that returns the compiled function, inspecting :meth:`impl` to determine the arguments and using :attr:`_variable_type` to define the types. .. note:: Be sure to wrap the generated Op with :class:`theano.tensor.Elemwise` if you want to apply this to arrays. """ _max_primes = 3 _variable_type = theano.tensor.dscalar _compile_mode = _FAST_PY def __hash__(self): return hash(type(self)) ^ hash(self.d) def __eq__(self, other): return hash(self) == hash(other)
[docs] def __init__(self, d=0, name=None, _proxy=None): self.d = d if name is None: name = self.__class__.__name__.lower() if d < self._max_primes: self.name = "%s%s" % (name, "'"*d) else: self.name = "%s^(%i)" % (name, d) if not hasattr(self, 'output_types_preference'): self.output_types_preference = theano.scalar.upgrade_to_float self._proxy = _proxy # This provides the implementation.
[docs] def impl(self, *v, **kw): r"""Required. Compute and return the `self.d`th derivative numerically.""" if self._proxy: return self._proxy.impl(*v, **kw) else: raise NotImplementedError
[docs] def grad(self, inputs, output_gradients): x, = inputs gx, = output_gradients return [gx * self.__class__(d=self.d + 1, _proxy=self._proxy)(x)]
def __call__(self, *v, **kw): if _is_symbolic(v, kw): return theano.scalar.UnaryScalarOp.__call__(self, *v, **kw) else: return self.impl(*v, **kw)
[docs] def get_function(self, vectorize=True): r"""Return the op as a compiled function. Inspects :meth:`impl` to determine the names of the arguments. This will of course fail if `_proxy` is used.""" op = self if vectorize: op = theano.tensor.Elemwise(op) names = inspect.getargs(self.impl.im_func.func_code).args[1:] vars = [getattr(self.__class__, _n, self._variable_type(_n)) for _n in names] return theano.function(vars, op(*vars), mode=self._compile_mode)
@property
[docs] def elemwise(self): r"""Return as a vectorized op.""" return theano.tensor.Elemwise(self)
[docs]class InverseUnaryScalarOp(UnaryScalarOp): r"""Class skeleton to represent the inverse of an operator. You must provide the implementation (presumably using a root-finder of some sort. Presently you need to provide an operator (not applied to its arguments) for both `f` and its derivative `df`. This could be relaxed if we figure out how to "unapply" an operation. Notation, `y=f(x)`, this function thus has argument `y`. """ f = NotImplemented df = NotImplemented def __hash__(self): return hash(type(self)) ^ hash(self.f) ^ hash(self.df)
[docs] def __init__(self, name=None, _proxy=None): if name is None: name = self.__class__.__name__.lower() if not hasattr(self, 'output_types_preference'): self.output_types_preference = \ theano.scalar.upgrade_to_float_no_complex self._proxy = _proxy # This provides the implementation.
[docs] def impl(self, *v, **kw): r"""Required. Compute and return the inverse function numerically.""" if self._proxy: return self._proxy.impl(*v, **kw) else: raise NotImplementedError
[docs] def grad(self, (y,), (gz,)): r"""This is the key: we use `self` to compute `x` then return `1/df`.""" x = self(y) return [self.df(x).__rdiv__(gz)]
def _is_symbolic(v, kw): r"""Return `True` if any of the arguments are symbolic.""" symbolic = False v = list(v) for _container, _iter in [(v, xrange(len(v))), (kw, kw)]: for _k in _iter: _v = _container[_k] if isinstance(_v, theano.gof.Variable): symbolic = True return symbolic
[docs]class Elemwise(theano.tensor.Elemwise): r"""Wrapper for :class:`theano.tensor.Elemwise` that overloads :meth:`__call__` to directly call the implementation for numerical arguments.""" def __call__(self, *v, **kw): if _is_symbolic(v, kw): return theano.tensor.Elemwise.__call__(self, *v, **kw) else: return self.scalar_op.impl(*v, **kw)
def test_1(): class Sq(UnaryScalarOp): def impl(self, x): if 0 == self.d: return x*x elif 1 == self.d: return 2.0*x elif 2 == self.d: return 2.0 else: return 0.0 sq = Elemwise(Sq()) def f(x): r"""Example of a function using sq.""" return 2.0*sq(3.0 - sq(x)) assert np.allclose(72.0, f(3.0)) x = theano.tensor.dscalar('x') df = theano.function([x], theano.tensor.grad(f(x), x)) _df = lambda x: -8.0*x*(3.0 - sq(x)) assert np.allclose(df(2.0), _df(2.0)) def test_inv(): class Sq(UnaryScalarOp): def impl(self, x): if 0 == self.d: return x*x elif 1 == self.d: return 2.0*x elif 2 == self.d: return 2.0 else: return 0.0 class Sqrt(InverseUnaryScalarOp): f = Sq(d=0) df = Sq(d=1) def impl(self, y): return np.sqrt(y) sqrt = Elemwise(Sqrt()) def f(x, sqrt=sqrt): r"""Example of a function using sq.""" return 2.0*sqrt(x + x**2) def _f(_x, d=0): import sympy x = sympy.var('x') y = f(x, sqrt=sympy.sqrt) res = [] while 0 <= d: res.append(float(y.n(subs=dict(x=_x)))) y = y.diff(x) d -= 1 return res assert np.allclose(2.0, sqrt(4.0)) x = theano.tensor.dscalar('x') fs = [f(x)] d = 3 for _n in xrange(d): fs.append(theano.tensor.grad(fs[-1], x)) theano_f = theano.function([x], fs, mode='FAST_COMPILE') assert np.allclose(theano_f(2.0), _f(2.0, d=d))