r"""Tools for working with :mod:`theano`.
The main goal is to integrate custom functions as seamlessly as possible,
in particular addressing the following issues:
1) Some functions may be purely numeric, implemented with a complicated or
   adaptive algorithm. These functions need to be wrapped by
   :class:`theano.Op` or :class:`theano.scalar.ScalarOp` so that they can
   occupy a place in the expression tree. Once a function tree is
   generated, we can apply :class:`theano.tensor.Elemwise` to allow
   vectorization, and then use :func:`theano.function` to compile the
   result.
2) Compiling functions can be slow and should only be done when needed
   (maybe on import, or at most once per session). One issue is how to
   apply automatic differentiation to member functions that depend on
   parameters. Just passing symbolic variables through the function often
   works, but it generates a tree with the current parameter values
   hard-coded: the function must then be recompiled whenever the
   parameters change.
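
A minimal sketch of the intended workflow using the classes defined below
(the hypothetical `Sq` op mirrors :func:`test_1`)::

    >>> class Sq(UnaryScalarOp):
    ...     def impl(self, x):
    ...         # Numerically compute the `self.d`th derivative of x**2.
    ...         return [x*x, 2.0*x, 2.0, 0.0][min(self.d, 3)]
    >>> sq = Elemwise(Sq())                  # vectorize over arrays
    >>> x = theano.tensor.dscalar('x')
    >>> f = theano.function([x], sq(x), mode='FAST_COMPILE')
    >>> float(f(3.0))
    9.0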
"""
from __future__ import division
__all__ = ['Polynomial', 'Elemwise', 'UnaryScalarOp', 'InverseUnaryScalarOp',
'ScalarOpFromExpr', 'UnaryScalarOpFromExpr']
import os.path
import inspect
import subprocess
import numpy as np
import theano.scalar
import theano.compile.mode
import theano.gof.compiledir
from mmf.objects import StateVars, process_vars, Computed, Excluded
from mmf.archive import DataSet
_FAST_PY = theano.compile.mode.Mode('py', 'fast_run')
class CompileCache(StateVars):
r"""Cache of compiled functions.
One can use :meth:`function` as a replacement for :func:`theano.function`
for compiling. If the archive exists and has an appropriate entry, and if
the mercurial checksum is unchanged, then the compiled function will be
restored from disk rather than being recompiled.
    Note: we use a hash of the function name as the key in the actual
    dataset, but map the given name to this key using the `lookup`
    dictionary so that users can use any function name they like (for
    instance `'f(x,y)'`, which is not a valid identifier).
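
    A hypothetical usage sketch (skipped since it writes to the theano
    compile directory)::

        >>> cache = CompileCache(prefix='mymodule')   # doctest: +SKIP
        >>> x = theano.tensor.dscalar('x')
        >>> f = cache.function('f(x)', [x], x**2)     # doctest: +SKIP
        >>> f(3.0)                                    # doctest: +SKIP
        array(9.0)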
"""
_state_vars = [
('prefix', "", "Prefix for archive name. Module name for example."),
('force_compile', False, "Set to `True` to force compilation."),
('use_if_modified', False, "If `True`, then cache will be used " +
"even if the mercurial repository is modified."),
('module', None, r"""Specify the base module for your project here.
This will be used to compute the cache name, and also to determine
the directory in which to run `hg` in order to determine the
checksum. If `None`, then `hg` will be run in the current working
directory (wherever :func:`subprocess.Popen` runs commands.)"""),
        ('use_dataset', True,
         "If `True`, store compiled functions in the on-disk dataset."),
('dir', None, """Output directory for cache. Computed if `None` and set
to reside in the theano compile directory."""),
('mode', _FAST_PY, "Theano compilation mode."),
('hg_checksum', None, "Current mercurial checksum. Computed if `None`"),
('filename', Computed, "Name of the :class:`mmf.archive.Dataset`"),
('modified', Computed, "`True` if the hg repo has been modified."),
('dataset', Computed, "The actual dataset."),
        ('_cache', Excluded, """The cache. Items compiled in the current
            session are stored here to avoid having to load the entire
            dataset.""")
]
process_vars()
def __init__(self, *v, **kw):
if not self.dir:
self.dir = theano.gof.compiledir.filter_compiledir(
theano.config.compiledir)
self.hg_checksum, self.modified = self.get_hg_checksum(self.hg_checksum)
if self.hg_checksum and self.use_dataset:
self.filename = os.path.join(
self.dir,
"compile_cache_%s_%s" % (self.prefix, self.hg_checksum))
else:
self.filename = None
if self.filename:
self.dataset = DataSet(self.filename, 'w')
if 'lookup' not in self.dataset:
self.dataset['lookup'] = {}
else:
self.dataset = None
self._cache = {}
def get_hg_checksum(self, hg_checksum=None):
r"""Return `(checksum, modified)`, the mercurial checksum of the current
directory and a boolean flag indicating if there are uncommitted
modifications. The checksum is `None` if the current directory is not
under version control."""
modified = True
cmd = ["hg","-q","id"]
working_dir = None
if self.module:
working_dir = os.path.dirname(self.module.__file__)
try:
if not hg_checksum:
hg_checksum = subprocess.check_output(
cmd, cwd=working_dir).strip()
if len(hg_checksum) not in [12, 13]:
hg_checksum = None
elif hg_checksum.endswith('+'):
hg_checksum = hg_checksum[:12]
else:
modified = False
except subprocess.CalledProcessError:
hg_checksum = None
return hg_checksum, modified
    def compile(self, name, inputs, outputs, mode=None, *v, **kw):
        r"""Return the compiled object and store it in the cache as `name`.
        This always forces a compile."""
        if mode is None:
            mode = self.mode
        # Note: theano.function takes the inputs first, then the outputs.
        function = theano.function(inputs, outputs, mode=mode, *v, **kw)
if self.dataset:
_hash = abs(hash(name))
key = "theano_function_%i" % (_hash,)
lookup = self.dataset['lookup']
lookup[name] = key
self.dataset['lookup'] = lookup
setattr(self.dataset, key, function)
self._cache[name] = function
return function
    def function(self, name, inputs, outputs, mode=None, *v, **kw):
        r"""Return the compiled object and store it in the cache as `name`.
        Only compiles if the object is not already in the cache or
        dataset."""
        function = self[name]
        if not function:
            function = self.compile(name, inputs, outputs,
                                    mode=mode, *v, **kw)
return function
def __getitem__(self, name):
r"""Return the function `name` if it exists, otherwise return `False`.
The recommend use of this is to quickly check if a function exists
before proceeding to perform a lengthy calculation to generate it."""
if name not in self._cache:
if self.dataset and name in self.dataset['lookup']:
key = self.dataset['lookup'][name]
self._cache[name] = getattr(self.dataset, key)
return self._cache.get(name, False)
def __contains__(self, name):
r"""Return `True` if the compiled function `name` is cached."""
return ((name in self._cache)
or (self.dataset and name in self.dataset['lookup']))
class Polynomial(theano.Op):
r"""Theano wrapper for polynomials.
Examples
--------
>>> import theano.tensor as T
>>> x = T.scalar('x')
>>> c = T.vector('c')
>>> p = Polynomial()(c, x)
>>> dp = T.grad(p, x)
>>> ddp = T.grad(dp, x)
>>> f = theano.function([c, x], [p, dp, ddp], mode='FAST_COMPILE')
>>> _c = [1.0,2.0,-3.0,4.0]
>>> f(_c, 2.0)
[25.0, array(38.0), array(42.0)]
We can also vectorize the polynomial over `x`. Note the use of `sum()` when
constructing the derivatives. See
https://groups.google.com/d/topic/theano-users/81hG-LzvxOY/discussion
>>> import theano.tensor as T
>>> x = T.matrix('x')
>>> c = T.vector('c')
>>> p = Polynomial()(c, x)
>>> dp = T.grad(p.sum(), x)
>>> ddp = T.grad(dp.sum(), x)
>>> f = theano.function([c, x], [p, dp, ddp], mode='FAST_COMPILE')
>>> _c = [1.0,2.0,-3.0,4.0]
>>> _x = [[1.0,2.0],[3.0,4.0]]
>>> f(_c, _x)
[array([[ 4., 25.],
[ 88., 217.]]),
array([[ 8., 38.],
[ 92., 170.]]),
array([[ 18., 42.],
[ 66., 90.]])]
"""
    def __init__(self, d=0):
        # `d` is the order of the derivative that this Op represents.
        self.d = d
def __eq__(self, other):
return type(self) == type(other) and self.d == other.d
def __hash__(self):
return hash(type(self)) + hash(self.d)
    def make_node(self, c, x):
x_ = theano.tensor.as_tensor_variable(x)
c_ = theano.tensor.as_tensor_variable(c)
return theano.Apply(self,
inputs=[c_, x_],
outputs=[x_.type()])
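    # NOTE: theano needs a `perform` method to evaluate this Op numerically.
    # The listing does not include one; the following is a minimal sketch
    # (an assumption, not the original implementation) that is consistent
    # with the doctests above, using numpy's polynomial helpers.
    def perform(self, node, inputs, output_storage):
        c, x = inputs
        # Evaluate the `self.d`th derivative of the polynomial at `x`.
        output_storage[0][0] = np.asarray(
            np.polynomial.polynomial.polyval(
                x, np.polynomial.polynomial.polyder(c, self.d)))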
    def grad(self, inputs, output_gradients):
r"""We do not yet support differentiation wrt `c` though this should be
easy (except for broadcasting issues)."""
c, x = inputs
dC_dp = output_gradients[0]
dp_dx = Polynomial(d=self.d+1)(c, x)
return [None, dC_dp * dp_dx]
def _polynomial(c=theano.tensor.vector("c"), x=theano.tensor.scalar("x")):
r"""Return an Op representing a polynomial defined by the coefficients `c`
acting on `x`."""
max_coefficients_supported = 10000
# Generate the components of the polynomial
full_range = theano.tensor.arange(max_coefficients_supported)
components, updates = theano.scan(fn=lambda _coeff, power, free_var:
_coeff*(free_var ** power),
outputs_info=None,
sequences=[c, full_range],
non_sequences=x)
polynomial = components.sum()
return polynomial
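# A hypothetical usage sketch for `_polynomial` (not in the original
# source): it builds the symbolic sum c[0] + c[1]*x + c[2]*x**2 + ... via
# theano.scan, truncated to len(c) terms:
#
#   c = theano.tensor.vector('c')
#   x = theano.tensor.scalar('x')
#   f = theano.function([c, x], _polynomial(c, x), mode='FAST_COMPILE')
#   f([1.0, 2.0, -3.0, 4.0], 2.0)   # -> 25.0, cf. the Polynomial doctest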
class ScalarOpFromExpr(theano.scalar.ScalarOp):
    r"""Construct a :class:`theano.scalar.ScalarOp` from a tensor
    expression. This allows for automatic derivative calculations."""
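    # A hypothetical sketch (not from the original source): wrap a tensor
    # expression in a scalar op, then vectorize it with `Elemwise`:
    #
    #   x = theano.tensor.dscalar('x')
    #   y = theano.tensor.dscalar('y')
    #   op = Elemwise(ScalarOpFromExpr([x, y], [x*y + x**2], name='f'))
    #   f = theano.function([x, y], op(x, y), mode='FAST_COMPILE')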
    def __init__(self, inputs, outputs, d=None, name=None,
                 fn=None, mode='FAST_COMPILE'):
        self.name = name
        self.mode = mode
        d = list(d) if d else []    # copy: we pop entries below
        while d:
            outputs = [theano.tensor.grad(_o, d.pop()) for _o in outputs]
        self.inputs = inputs
        self.outputs = outputs
        self.nin = len(inputs)
        self.nout = len(outputs)
        if not fn:
            # Note: theano.function takes the inputs first, then the outputs.
            fn = theano.function(inputs, outputs, mode=mode)
        self.fn = fn
    def __hash__(self):
        # Lists are not hashable, so hash tuples of their contents.
        return (hash(tuple(self.inputs)) ^ hash(tuple(self.outputs))
                ^ hash(type(self)))
def __eq__(self, other):
return hash(self) == hash(other)
    def output_types(self, _dtypes):
        return [theano.tensor.scalar_from_tensor(_o).type
                for _o in self.outputs]
    def impl(self, *inputs):
        return self.fn(*inputs)
    def grad(self, inputs, grads):
        res = []
        for _i in self.inputs:
            res.append(0)
            for _g in grads:
                res[-1] += _g*ScalarOpFromExpr(
                    inputs=self.inputs,
                    outputs=self.outputs,
                    d=[_i],
                    name=self._get_d_name(self.name, _i.name),
                    mode=self.mode)(*inputs)
        return res
def _get_d_name(self, name, dname):
r"""Return a new name of the form `'name_dname'` representing the
derivative of `name` wrt `dname`. If either is `None` then the `name`
will be returned unchanged."""
if name is not None and dname is not None:
name = name + "_" + dname
return name
class UnaryScalarOpFromExpr(ScalarOpFromExpr):
    r"""Unary version of :class:`ScalarOpFromExpr`. Takes a single variable
    `x` and a single expression `f_x` as inputs instead of lists. The
    derivative order is now specified as an integer `d`."""
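    # A hypothetical sketch (not from the original source):
    #
    #   x = theano.tensor.dscalar('x')
    #   cube = Elemwise(UnaryScalarOpFromExpr(x, x**3, name='cube'))
    #   f = theano.function([x], cube(x), mode='FAST_COMPILE')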
nin = 1
nout = 1
    def __init__(self, x, f_x, d=0, name=None,
                 fn=None, mode='FAST_COMPILE'):
self.mode = mode
self.name = name
while d > 0:
f_x = theano.tensor.grad(f_x, x)
d -= 1
self.x = x
self.f_x = f_x
if not fn:
fn = theano.function([x], f_x, mode=mode)
self.fn = fn
def __hash__(self):
return hash(self.x) ^ hash(self.f_x) ^ hash(type(self))
    def output_types(self, _dtypes):
return [theano.tensor.scalar_from_tensor(self.f_x).type]
    def impl(self, x):
return self.fn(x)
    def grad(self, (x,), (gf,)):
return [gf*UnaryScalarOpFromExpr(
x=self.x, f_x=self.f_x, d=1, mode=self.mode,
name=self._get_d_name(self.name, self.x.name))(x)]
class UnaryScalarOp(theano.scalar.UnaryScalarOp):
r"""Theano wrapper for functions of a single variable that can compute their
own derivative via a flag `d`. To use this, simply subclass and define
:meth:`impl`. Several things happen automatically:
1) If not provided, the name will be set to the lowercase version of the
class name with primes or '^{(d)}' used to denote derivatives.
    2) The :attr:`output_types_preference` is set to
       :attr:`theano.scalar.upgrade_to_float` (override by defining it as a
       class variable.)
    3) :meth:`__call__` is overloaded to call :meth:`impl` if none of the
       inputs is a theano variable. The argument names of :meth:`impl` are
       used as the names of the theano variables; class variables of the
       same name are used if present, otherwise :attr:`_variable_type` is
       used to create the variables.
4) Provides a convenience method :meth:`get_function` that returns the
compiled function, inspecting :meth:`impl` to determine the arguments and
using :attr:`_variable_type` to define the types.
.. note:: Be sure to wrap the generated Op with
:class:`theano.tensor.Elemwise` if you want to apply this to arrays.
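
    For example (a minimal sketch; :func:`test_1` below contains a tested
    variant)::

        >>> class Cube(UnaryScalarOp):
        ...     def impl(self, x):
        ...         # Numerically compute the `self.d`th derivative of x**3.
        ...         return [x**3, 3.0*x**2, 6.0*x, 6.0, 0.0][min(self.d, 4)]
        >>> cube = Cube()
        >>> cube(2.0)        # numeric input: calls impl() directly
        8.0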
"""
_max_primes = 3
_variable_type = theano.tensor.dscalar
_compile_mode = _FAST_PY
def __hash__(self):
return hash(type(self)) ^ hash(self.d)
def __eq__(self, other):
return hash(self) == hash(other)
    def __init__(self, d=0, name=None, _proxy=None):
self.d = d
if name is None:
name = self.__class__.__name__.lower()
if d < self._max_primes:
self.name = "%s%s" % (name, "'"*d)
else:
self.name = "%s^(%i)" % (name, d)
if not hasattr(self, 'output_types_preference'):
self.output_types_preference = theano.scalar.upgrade_to_float
self._proxy = _proxy # This provides the implementation.
    def impl(self, *v, **kw):
r"""Required. Compute and return the `self.d`th derivative
numerically."""
if self._proxy:
return self._proxy.impl(*v, **kw)
else:
raise NotImplementedError
    def grad(self, inputs, output_gradients):
x, = inputs
gx, = output_gradients
return [gx * self.__class__(d=self.d + 1, _proxy=self._proxy)(x)]
def __call__(self, *v, **kw):
if _is_symbolic(v, kw):
return theano.scalar.UnaryScalarOp.__call__(self, *v, **kw)
else:
return self.impl(*v, **kw)
    def get_function(self, vectorize=True):
r"""Return the op as a compiled function. Inspects :meth:`impl` to
determine the names of the arguments. This will of course fail if
`_proxy` is used."""
op = self
if vectorize:
op = theano.tensor.Elemwise(op)
names = inspect.getargs(self.impl.im_func.func_code).args[1:]
vars = [getattr(self.__class__, _n, self._variable_type(_n))
for _n in names]
return theano.function(vars, op(*vars), mode=self._compile_mode)
@property
    def elemwise(self):
r"""Return as a vectorized op."""
return theano.tensor.Elemwise(self)
class InverseUnaryScalarOp(UnaryScalarOp):
    r"""Class skeleton representing the inverse of an operator. You must
    provide the implementation (presumably using a root-finder of some
    sort). At present you must provide an operator (not applied to its
    arguments) for both `f` and its derivative `df`. This could be relaxed
    if we figure out how to "unapply" an operation.

    Notation: `y = f(x)`; this function therefore takes the argument `y`.
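
    A hypothetical sketch (see :func:`test_inv` below for a tested version,
    which also defines the `Sq` op assumed here)::

        >>> class Sqrt(InverseUnaryScalarOp):   # doctest: +SKIP
        ...     f = Sq(d=0)     # the forward operator
        ...     df = Sq(d=1)    # its derivative
        ...     def impl(self, y):
        ...         return np.sqrt(y)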
"""
f = NotImplemented
df = NotImplemented
def __hash__(self):
return hash(type(self)) ^ hash(self.f) ^ hash(self.df)
    def __init__(self, name=None, _proxy=None):
        if name is None:
            name = self.__class__.__name__.lower()
        self.name = name
        if not hasattr(self, 'output_types_preference'):
            self.output_types_preference = \
                theano.scalar.upgrade_to_float_no_complex
        self._proxy = _proxy    # This provides the implementation.
    def impl(self, *v, **kw):
r"""Required. Compute and return the inverse function numerically."""
if self._proxy:
return self._proxy.impl(*v, **kw)
else:
raise NotImplementedError
    def grad(self, (y,), (gz,)):
r"""This is the key: we use `self` to compute `x` then return `1/df`."""
x = self(y)
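        # Inverse function theorem: dx/dy = 1/f'(x), so return gz/df(x).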
return [self.df(x).__rdiv__(gz)]
def _is_symbolic(v, kw):
    r"""Return `True` if any of the arguments is symbolic."""
    for _v in list(v) + list(kw.values()):
        if isinstance(_v, theano.gof.Variable):
            return True
    return False
class Elemwise(theano.tensor.Elemwise):
r"""Wrapper for :class:`theano.tensor.Elemwise` that overloads
:meth:`__call__` to directly call the implementation for numerical
arguments."""
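    # Hypothetical sketch: with `sq = Elemwise(Sq())`, calling
    # `sq(np.array([1.0, 2.0]))` invokes `Sq.impl` directly, while `sq(x)`
    # for a symbolic `x` builds a theano graph node as usual.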
def __call__(self, *v, **kw):
if _is_symbolic(v, kw):
return theano.tensor.Elemwise.__call__(self, *v, **kw)
else:
return self.scalar_op.impl(*v, **kw)
def test_1():
class Sq(UnaryScalarOp):
def impl(self, x):
if 0 == self.d:
return x*x
elif 1 == self.d:
return 2.0*x
elif 2 == self.d:
return 2.0
else:
return 0.0
sq = Elemwise(Sq())
def f(x):
r"""Example of a function using sq."""
return 2.0*sq(3.0 - sq(x))
assert np.allclose(72.0, f(3.0))
x = theano.tensor.dscalar('x')
df = theano.function([x], theano.tensor.grad(f(x), x))
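    # Analytic check: f(x) = 2*(3 - x**2)**2, so f'(x) = -8*x*(3 - x**2).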
_df = lambda x: -8.0*x*(3.0 - sq(x))
assert np.allclose(df(2.0), _df(2.0))
def test_inv():
class Sq(UnaryScalarOp):
def impl(self, x):
if 0 == self.d:
return x*x
elif 1 == self.d:
return 2.0*x
elif 2 == self.d:
return 2.0
else:
return 0.0
class Sqrt(InverseUnaryScalarOp):
f = Sq(d=0)
df = Sq(d=1)
def impl(self, y):
return np.sqrt(y)
sqrt = Elemwise(Sqrt())
def f(x, sqrt=sqrt):
r"""Example of a function using sq."""
return 2.0*sqrt(x + x**2)
def _f(_x, d=0):
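        # Reference values: f and its first `d` derivatives via sympy.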
import sympy
x = sympy.var('x')
y = f(x, sqrt=sympy.sqrt)
res = []
while 0 <= d:
res.append(float(y.n(subs=dict(x=_x))))
y = y.diff(x)
d -= 1
return res
assert np.allclose(2.0, sqrt(4.0))
x = theano.tensor.dscalar('x')
fs = [f(x)]
d = 3
for _n in xrange(d):
fs.append(theano.tensor.grad(fs[-1], x))
theano_f = theano.function([x], fs, mode='FAST_COMPILE')
assert np.allclose(theano_f(2.0), _f(2.0, d=d))