Source code for mmf.math.multigrid.stencil

r"""Stencil Module (:mod:`mmf.math.multigrid.stencil`)

This module provides access to matrices and operators defined by a
stencil.

.. autosummary::

   Stencil
   Interpolator

.. autoclass:: Code
   :members:
   
"""
from __future__ import division

__all__ = ['Stencil',  'Interpolator',  'Code',
           'PoissonBase','Poisson1D', 'Poisson2D', 'Poisson3D']

import warnings
import copy

import numpy as np
import scipy as sp
import scipy.sparse.linalg
import scipy.weave
from scipy.lib.blas import get_blas_funcs

import pylab as pl
import matplotlib.pyplot as plt
import random
from numpy import pi

from mmf.objects import StateVars,  process_vars
from mmf.objects import Computed,  Required,  ClassVar
import mmf.utils

_inner = (slice(1, -1, None), )            # Slice to remove boundary.
_abs_tol = 1e-8
_rel_tol = 1e-8
_tiny = np.finfo(float).tiny


[docs]class Stencil(StateVars): r"""Represents a matrix given a stencil. This class uses a stencil `S` together with a boundary condition to represent a matrix. With extended index notation, the matrix is: .. math:: A_{a, \vect{i}, b, \vect{i} + \vect{d}} = \delta_{a, b}s_{a}S_{\vect{d}} + \delta_{\vect{d}, \vect{0}} D_{a, b, \vect{i}} The motivation behind this was representing the Jacobian of the following system of equations: .. math:: -s_{a}\nabla^2 \phi_{a}(x) + F_{a}(\vect{\phi}) = 0 where the right-hand side is a local function of the various variables $\phi_i(x)$ and the stencil $S$ represents the operator $-\nabla^2$ on the left-hand side. The tensor $D$ is .. math:: D_{a, b, x} = \pdiff{F_{a}(\vect{\phi})}{\phi_{b}}(x). and $s_{a}$ is a set of scale factors. """ _state_vars = [ ('boundary_conditions', 'periodic', "One of `'periodic'` or `'Neumann'`"), ('S', Required, "3x3x... stencil."), ] process_vars() def __init__(self, *v, **kw): self.S = np.asarray(self.S) def apply(self, x, D=None, scale_factor=None): r"""Apply the stencil to `x`. Parameters ---------- x : array Input. Must have shape `(N, )*d` if `D` is `None` or `(n, ) + (N, )*d` otherwise. D : None, array, optional Diagonals. Must have shape `(n, n) + (N, )*d` scale_factor : array Scale factors for stencil. Must have shape `(n, )`. """ x_shape = x.shape if D is None: nd = 1 d = len(x_shape) x = x.reshape((nd, ) + x_shape) use_D = False D = np.empty((1, )*(d+2), dtype=float) else: d = len(x_shape) - 1 use_D = True if scale_factor is None: scale_factor = np.ones(D.shape[1], dtype=float) else: scale_factor = np.asarray(scale_factor, dtype=float) assert scale_factor.shape == D.shape[1:2] y = np.empty(x_shape, dtype=x.dtype) reps = dict(d=d, d1=d+1, d2=d+2) reps['counters'] = ('int id, jd, %s;' % (', '.join(["ix%(n)i, is%(n)i" % {'n':n} for n in xrange(d)]), )) reps['ix_s'] = ", ".join(["ix%i" % (n, ) for n in xrange(d)]) reps['is_s'] = ", ".join(["is%i" % (n, ) for n in xrange(d)]) reps['j_s'] = ", ".join(["j[%i]" % (n, ) for n in xrange(d)]) reps['for_ix'] = "\n".join( [("for (ix%(n)i=0; ix%(n)i<Nx[%(n)i+1];++ix%(n)i) {") % {'n':n} for n in xrange(d)]) reps['for_is'] = "\n".join( [(r'''for (is%(n)i=0; is%(n)i<Nstencil[%(n)i];++is%(n)i) { i[%(n)i] = ix%(n)i + is%(n)i - Nstencil[%(n)i]/2;''') % {'n':n} for n in xrange(d)]) reps['for_ix_end'] = "}"*d reps['for_is_end'] = "}"*d reps['bc'] = self.boundary_conditions def define(n, name): """Return a string defining the macro for access into the array `name` with `n` dimensions. Use for `n>4` as the macros for `n<5` are already provided by weave. """ return ("#define %s%i(%s) (*((double*)(%s_array->data + %s)))" % (name.upper(), n, ', '.join(['i__%i' % (i, ) for i in xrange(n)]), name, '+'.join(['(i__%i)*S%s[%i]' % (i, name, i) for i in xrange(n)]))) reps['defines'] = define(5, 'D') code = r''' %(defines)s %(counters)s std::vector<int> i(%(d)i); std::vector<int> j(%(d)i); std::vector<int> nx(&Nx[1], &Nx[%(d)i+1]); for (id=0;id<Nx[0];++id) { %(for_ix)s Y%(d1)i(id, %(ix_s)s) = 0; %(for_is)s %(bc)s(i, nx, j); Y%(d1)i(id, %(ix_s)s) += scale_factor[id] *STENCIL%(d)i(%(is_s)s)*X%(d1)i(id, %(j_s)s); %(for_is_end)s if (use_D) { for (jd=0;jd<Nx[0];++jd) { Y%(d1)i(id, %(ix_s)s) += D%(d2)i(id, jd, %(ix_s)s)*X%(d1)i(jd, %(ix_s)s); } } %(for_ix_end)s }''' % reps local_dict = dict(x=x, y=y, stencil=self.S, D=D, use_D=int(use_D), scale_factor=scale_factor) mmf.utils.inline(code, local_dict.keys(), local_dict=local_dict, support_code=Code.index_support_code, verbose=2, language='c++') return y.reshape(x.shape) def to_mat(self, shape, D=None, scale_factor=None): r"""Version using :mod:`scipy.weave`. Parameters ---------- shape : array Shape of abscissa `x.shape`. Must be `(N, )*d`. D : None, array, optional Diagonals. Must have shape `(n, n) + (N, )*d` scale_factor : array Scale factors for stencil. Must have shape `(n, )`. """ # Estimate number of entries Ns = np.prod(self.S.shape) Nx = np.prod(shape) Nd = 1 if D is not None: assert 2 + len(shape) == len(D.shape) assert shape == D.shape[2:] assert D.shape[0] == D.shape[1] Nd = D.shape[0] D = D.reshape((Nd, Nd, Nx)) n_ij = (Nx*Ns*Nd # Diagonal blocks + Nx*Nd*(Nd-1)) # Off-diagonal blocks are diagonal ij = np.empty((2, n_ij), dtype=int) data = np.empty(n_ij, dtype=float) stencil = self.S periodic = int('periodic' == self.boundary_conditions) n_ij = np.array(0) d = D if d is None: n_dim = len(shape) reps = {} reps['d'] = n_dim reps['for_j_start'] = "\n".join( [("for (j[%(n)i]=0; j[%(n)i]<n[%(n)i]; ++j[%(n)i]) {") % {"n":n} for n in xrange(n_dim)]) reps['for_s_start'] = "\n".join( [("for (s[%(n)i]=0; s[%(n)i]<Nstencil[%(n)i]; ++s[%(n)i]) {") % {"n":n} for n in xrange(n_dim)]) reps['define_i'] = "\n".join( [("i[%(n)i] = j[%(n)i] + s[%(n)i] - (Nstencil[%(n)i] - 1)/2;") % {"n":n} for n in xrange(n_dim)]) reps['for_end'] = "}"*n_dim reps['s_args'] = ", ".join(["s[%i]" % (n, ) for n in xrange(n_dim)]) reps['bc'] = self.boundary_conditions code = r''' // Code for computing the stencil in arbitrary dimensions std::vector<int> i(%(d)i); std::vector<int> j(%(d)i); std::vector<int> s(%(d)i); std::vector<int> n(shape, shape + %(d)i); int ind_i, ind_j; typedef std::map<std::pair<int, int>, double> dat_T; dat_T dat; dat_T::key_type key; %(for_j_start)s ind_j = index(j, n); %(for_s_start)s %(define_i)s %(bc)s(i, n, i); ind_i = index(i, n); key = std::make_pair(ind_i, ind_j); dat[key] += scale_factor[0] *STENCIL%(d)i(%(s_args)s); %(for_end)s %(for_end)s dat_T::const_iterator iter; int c = 0; for (iter=dat.begin();iter != dat.end();++iter) { IJ2(0, c) = iter->first.first; IJ2(1, c) = iter->first.second; data[c] = iter->second; ++c; } n_ij[0] = c; ''' % reps if scale_factor is None: scale_factor = np.ones(1, dtype=float) else: scale_factor = np.asarray(scale_factor, dtype=float) assert scale_factor.shape == (1, ) args = dict(shape=np.array(shape, dtype='i'), stencil=stencil, ij=ij, n_ij=n_ij, data=data, scale_factor=scale_factor) else: n_dim = len(shape) reps = {} reps['d'] = n_dim reps['for_j_start'] = "\n".join( [("for (j[%(n)i]=0; j[%(n)i]<n[%(n)i]; ++j[%(n)i]) {") % {"n":n} for n in xrange(n_dim)]) reps['for_s_start'] = "\n".join( [("for (s[%(n)i]=0; s[%(n)i]<Nstencil[%(n)i]; ++s[%(n)i]) {") % {"n":n} for n in xrange(n_dim)]) reps['define_i'] = "\n".join( [("i[%(n)i] = j[%(n)i] + s[%(n)i] - (Nstencil[%(n)i] - 1)/2;") % {"n":n} for n in xrange(n_dim)]) reps['for_end'] = "}"*n_dim reps['s_args'] = ", ".join(["s[%i]" % (n, ) for n in xrange(n_dim)]) reps['bc'] = self.boundary_conditions code = r''' // Code for computing the stencil in arbitrary dimensions std::vector<int> i(%(d)i); std::vector<int> j(%(d)i); std::vector<int> s(%(d)i); std::vector<int> n(shape, shape + %(d)i); int id, jd, ind_i, ind_j; typedef std::map<std::pair<int, int>, double> dat_T; dat_T dat; dat_T::key_type key; %(for_j_start)s for (jd=0;jd<Nd[1];++jd) { ind_j = index_d(jd, j, n); for (id=0;id<Nd[0];++id) { ind_i = index_d(id, j, n); key = std::make_pair(ind_i, ind_j); dat[key] += D3(id, jd, index(j, n)); } } // Only diagonals have stencil. %(for_s_start)s %(define_i)s %(bc)s(i, n, i); for (jd=0;jd<Nd[1];jd++) { ind_i = index_d(jd, i, n); ind_j = index_d(jd, j, n); key = std::make_pair(ind_i, ind_j); dat[key] += scale_factor[jd] *STENCIL%(d)i(%(s_args)s); } %(for_end)s %(for_end)s dat_T::const_iterator iter; int c = 0; for (iter=dat.begin();iter != dat.end();++iter) { IJ2(0, c) = iter->first.first; IJ2(1, c) = iter->first.second; data[c] = iter->second; ++c; } n_ij[0] = c; ''' % reps if scale_factor is None: scale_factor = np.ones(d.shape[1], dtype=float) else: scale_factor = np.asarray(scale_factor, dtype=float) assert scale_factor.shape == d.shape[1:2] args = dict(d=d, shape=np.array(shape, dtype='i'), stencil=stencil, ij=ij, n_ij=n_ij, data=data, scale_factor=scale_factor) mmf.utils.inline(code, args.keys(), local_dict=args, support_code=Code.index_support_code, verbose=2, headers=["<map>"], language='c++') return sp.sparse.csr_matrix((data[:n_ij], ij[:, :n_ij]), shape=(Nx*Nd, )*2)
[docs]class PoissonBase(object): r"""Base class for some objects representing derivatives.""" d = NotImplemented @classmethod
[docs] def AB(cls, delta_inv, boundary_conditions): r"""Return `(A, B)` the stencil for the operators .. math:: \op{A} &= \left(\mat{\delta}^{-1}\right)^T \left(\op{T} + \op{T}^{T}\right)\mat{\delta}^{-1},\\ \op{B} &= \left(\mat{\delta}^{-1}\right)^T \left(\op{T} + \op{T}^{T}\right)\mat{\delta}^{-1} \left(\mat{\delta}^{-1}\right)^{T} = \op{A}\left(\mat{\delta}^{-1}\right)^T. These are used as follows: .. math:: -\pdiff{\op{\nabla}^2}{\delta_{ij}} &= \op{B}_{ij},\\ \frac{\partial^2\op{\nabla}^2} {\partial\delta_{ij}\partial\delta_{ab}} &= \mat{\delta}^{-1}_{ja}\op{B}_{ib} + \mat{\delta}^{-1}_{bi}\op{B}_{aj} + \left[\mat{\delta}^{-1}\left(\mat{\delta}^{-1} \right)^T\right]_{jb} \op{A}_{ai}. """ _d2 = np.dot(delta_inv, delta_inv.T) A = {} B = {} for i in xrange(cls.d): for j in xrange(cls.d): Sa = np.zeros((3,)*cls.d, dtype=float) Sb = np.zeros((3,)*cls.d, dtype=float) for a in xrange(cls.d): for b in xrange(cls.d): _T2 = (cls.T[a, b].S + cls.T[b, a].S) Sa += (delta_inv.T[i, a]*_T2*delta_inv[b, j]) Sb += (delta_inv.T[i, a]*_T2*_d2[b, j]) A[i, j] = Stencil(S=Sa, boundary_conditions=boundary_conditions) B[i, j] = Stencil(S=Sb, boundary_conditions=boundary_conditions) return A, B
[docs]class Poisson1D(PoissonBase): r"""Some stencils for 1d derivatives. >>> Poisson1D.Laplacian array([ 1., -2., 1.]) """ d = 1 T = {} S = np.zeros((3,)*d, dtype=float) S[:] = np.array([1., -2., 1.], dtype=float) T[0, 0] = Stencil(S=S) Laplacian = sum([T[i,i].S for i in xrange(d)]) del S, i
[docs]class Poisson2D(PoissonBase): r"""Some stencils for 2d derivatives. >>> Poisson2D.Laplacian array([[ 0., 1., 0.], [ 1., -4., 1.], [ 0., 1., 0.]]) # Now check that this is the same in any orthonormal basis >>> np.random.seed(0) >>> d, _R = np.linalg.qr(np.random.random((2,2))) >>> dinv = d.T >>> L = sum(dinv.T[i,j]*Poisson2D.T[j,k].S*dinv[k,i] ... for i in xrange(2) for j in xrange(2) ... for k in xrange(2)) >>> np.allclose(L, Poisson2D.Laplacian) True """ d = 2 T = {} S = np.zeros((3,)*d, dtype=float) S[:, 1] = [1, -2, 1] T[0, 0] = Stencil(S=S) S = 0*S S[1, :] = [1, -2, 1] T[1, 1] = Stencil(S=S) S = 0*S S[:, :] = (1.0/4)*np.array([[1, 0, -1], [0, 0, 0], [-1, 0, 1]]) T[0, 1] = T[1, 0] = Stencil(S=S) Laplacian = sum([T[i,i].S for i in xrange(d)]) del S, i
[docs]class Poisson3D(PoissonBase): r"""Some stencils for 3d derivatives. >>> Poisson3D.Laplacian array([[[ 0., 0., 0.], [ 0., 1., 0.], [ 0., 0., 0.]], <BLANKLINE> [[ 0., 1., 0.], [ 1., -6., 1.], [ 0., 1., 0.]], <BLANKLINE> [[ 0., 0., 0.], [ 0., 1., 0.], [ 0., 0., 0.]]]) # Now check that this is the same in any orthonormal basis >>> np.random.seed(0) >>> d, _R = np.linalg.qr(np.random.random((3,3))) >>> dinv = d.T >>> L = sum(dinv.T[i,j]*Poisson3D.T[j,k].S*dinv[k,i] ... for i in xrange(3) for j in xrange(3) ... for k in xrange(3)) >>> np.allclose(L, Poisson3D.Laplacian) True """ d = 3 T = {} S = np.zeros((3,)*d, dtype=float) S[:, 1, 1] = [1, -2, 1] T[0, 0] = Stencil(S=S) S = 0*S S[1, :, 1] = [1, -2, 1] T[1, 1] = Stencil(S=S) S = 0*S S[1, 1, :] = [1, -2, 1] T[2, 2] = Stencil(S=S) S = 0*S S[:, :, 1] = (1.0/4)*np.array([[1, 0, -1], [0, 0, 0], [-1, 0, 1]]) T[0, 1] = T[1, 0] = Stencil(S=S) S = 0*S S[:, 1, :] = (1.0/4)*np.array([[1, 0, -1], [0, 0, 0], [-1, 0, 1]]) T[0, 2] = T[2, 0] = Stencil(S=S) S = 0*S S[1, :, :] = (1.0/4)*np.array([[1, 0, -1], [0, 0, 0], [-1, 0, 1]]) T[1, 2] = T[2, 1] = Stencil(S=S) Laplacian = sum([T[i,i].S for i in xrange(d)]) del S, i
[docs]class Interpolator(StateVars): r"""Implements an interpolation/restriction pair along a given dimension. This class provides two functions :meth:`I` and :meth:`R` the provide interpolation and restriction functionality for a multigrid algorithm. Examples -------- First we demonstrate periodic boundary conditions with the various offsets and different sized output arrays: >>> i = Interpolator(offset=0, boundary_conditions='periodic') >>> f = np.array([1.0, 2.0, 3.0]) >>> i.IR(f, (5, )) #doctest: +NORMALIZE_WHITESPACE array([ 1. , 1.5, 2. , 2.5, 3. ]) >>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE array([ 1. , 1.5, 2. , 2.5, 3. , 2. ]) >>> i.offset = -1 >>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE array([ 2. , 1. , 1.5, 2. , 2.5, 3. ]) >>> i.offset = 1 >>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE array([ 1.5, 2. , 2.5, 3. , 2. , 1. ]) Now we demonstrate the same with Neumann boundary conditions: >>> i = Interpolator(offset=0, boundary_conditions='Neumann') >>> f = np.array([1.0, 2.0, 3.0]) >>> i.IR(f, (5, )) #doctest: +NORMALIZE_WHITESPACE array([ 1. , 1.5, 2. , 2.5, 3. ]) >>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE array([ 1. , 1.5, 2. , 2.5, 3. , 3. ]) >>> i.offset = -1 >>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE array([ 1. , 1. , 1.5, 2. , 2.5, 3. ]) >>> i.offset = 1 >>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE array([ 1.5, 2. , 2.5, 3. , 3. , 3. ]) The operators are constructed explicitly so that `R.T*2 == I`: >>> a = np.zeros((3, 4, 3)) >>> b = np.zeros((3, 8, 3)) >>> I = np.eye(len(b.ravel()), len(a.ravel())) >>> R = np.eye(len(a.ravel()), len(b.ravel())) >>> for n in xrange(len(a.ravel())): ... a.flat[n] = 1 ... I[:, n] = i.IR(a, b.shape).ravel() ... a.flat[n] = 0 >>> for n in xrange(len(b.ravel())): ... b.flat[n] = 1 ... R[:, n] = i.IR(b, a.shape).ravel() ... b.flat[n] = 0 >>> abs(R.T*2 - I).max() 0.0 """ _state_vars = [ ('offset', 0, r""" offset : (0, -1, 1), optional Integer offset for lower endpoint: ``offset == -1`` : Here the first point depends on the augmented array structure: .. math:: \mat{I} = \begin{pmatrix} \makebox[0pt][r]{\ensuremath{\tfrac{1}{2}\quad}} \tfrac{1}{2}\\ 1 \\ \tfrac{1}{2} & \tfrac{1}{2}\\ & & \ddots \end{pmatrix} ``offset == 0`` : .. math:: \mat{I} = \begin{pmatrix} 1\\ \tfrac{1}{2} & \tfrac{1}{2}\\ & 1 \\ & \tfrac{1}{2} & \tfrac{1}{2}\\ & & & \ddots \end{pmatrix} ``offset == 1`` : .. math:: \mat{I} = \begin{pmatrix} \tfrac{1}{2} & \tfrac{1}{2}\\ & 1 \\ & \tfrac{1}{2} & \tfrac{1}{2}\\ & & & \ddots \end{pmatrix} The conditions at the right side depend on the size of the input and output arrays."""), ('boundary_conditions', 'periodic', r"""This must be the name of one of the boundary condition transformation functions defined in :attr:`support_code`."""), ] process_vars()
[docs] def IR(self, f, new_shape): r"""Interpolation/restriction operator. Interpolates (restricts) `f_in` along the axes where `new_shape` has more (fewer) elements than the corresponding dimension of `f_in`. """ assert len(f.shape) == len(new_shape) for n, b in enumerate(new_shape): a = f.shape[n] if a < b: # Interpolate to b shape = list(f.shape) shape[n] = b res = np.empty(shape, dtype=f.dtype) f = self.I_inline(f_in=f, f_out=res, axis=n) elif a > b: # Restrict to b shape = list(f.shape) shape[n] = b res = np.zeros(shape, dtype=f.dtype) f = self.R_inline(f_in=f, f_out=res, axis=n) else: pass return f
[docs] def IR_mat(self, shape, new_shape): r"""Return CSR representation of the interpolation (restriction) operator along the axes where `new_shape` has more (fewer) elements than the corresponding dimension of `shape`. """ IR = None shape = list(shape) for n in xrange(len(shape)): if shape[n] < new_shape[n]: M = self.I_mat(shape, n_new=new_shape[n], axis=n) shape[n] = new_shape[n] elif shape[n] > new_shape[n]: M = self.R_mat(shape, n_new=new_shape[n], axis=n) shape[n] = new_shape[n] else: M = None if M is not None: if IR is None: IR = M else: IR = M*IR return IR
[docs] def R_mat(self, shape, n_new, axis=0): r"""Return a CSR sparse matrix representing the restriction matrix. Parameters ---------- shape : array Shape of input array. n : int New length of restriction axis. axis : int, optional Axis to perform restriction along. (Default is 0). """ new_shape = list(shape) new_shape[axis] = n_new I = self.I_mat(shape=new_shape, n_new=shape[axis], axis=axis) return I.T
[docs] def I_mat(self, shape, n_new, axis=0): r"""Return a CSR sparse matrix representing the interpolation matrix. Parameters ---------- shape : array Shape of input array. n : int New length of interpolated axis. axis : int, optional Axis to perform interpolation along. (Default is 0). Notes ----- This is a bit of generated C++ code that performs the action of I. The code is generated to allow for arbitrary dimensions etc. in an efficient manner. Here is an example of the completed code for a three-dimensional computation with periodic boundary conditions. .. code-block:: c std::vector<int> i(3); std::vector<int> j(3); std::vector<int> j_(3); std::vector<int> new_shape(&shape[0], &shape[3]); new_shape[axis] = n; int ind = 0; for (i[0]=j[0]=0;i[0]<new_shape[0];j[0]=++i[0]) { for (i[1]=j[1]=0;i[1]<new_shape[1];j[1]=++i[1]) { for (i[2]=j[2]=0;i[2]<new_shape[2];j[2]=++i[2]) { indptr[index(i, new_shape)] = ind; if (0 == (2 + i[axis] + offset)%2) { // Direct term j[axis] = (i[axis] + offset)/2; periodic(j, Nf_in, j_); data[ind] = 1.0; indices[ind++] = index(j_, shape); } else { // Interpolation terms j[axis] = (i[axis] + offset - 1)/2; periodic(j, n_i, j_); data[ind] = 0.5; indices[ind++] = index(j_, shape); j[axis] = (i[axis] + offset + 1)/2; periodic(j, n_i, j_); data[ind] = 0.5; indices[ind++] = index(j_, shape); } } } } indptr[index(i, n_i)] = ind; Examples -------- Interpolation should always preserve constant functions: >>> i = Interpolator(offset=0, boundary_conditions='Neumann') >>> f_in = np.ones((4, 3, 3, 3), dtype=float) >>> f_out = np.ones((4, 3, 6, 3), dtype=float) >>> res = i.I_inline(f_in=f_in, f_out=f_out, axis=2) >>> abs(res - 1).max() 0.0 """ d = len(shape) reps = {} reps['d'] = d reps['for_i_start'] = "\n".join( [("for (i[%(n)i]=j[%(n)i]=0; i[%(n)i]<n_i[%(n)i];" + "j[%(n)i]=++i[%(n)i]) {") % {"n":n} for n in xrange(d)]) reps['for_i_end'] = "}"*d reps['bc'] = self.boundary_conditions code = r''' std::vector<int> i(%(d)i); std::vector<int> j(%(d)i); std::vector<int> j_(%(d)i); std::vector<int> n_j(shape, shape + %(d)i); std::vector<int> n_i(shape, shape + %(d)i); n_i[axis] = n; int ind = 0; %(for_i_start)s indptr[index(i, n_i)] = ind; if (0 == (2 + i[axis] + offset)%%2) { // Direct terms: j[axis] = (i[axis] + offset)/2; %(bc)s(j, n_j, j_); data[ind] = 1.0; indices[ind++] = index(j_, n_j); } else { // Interpolation terms j[axis] = (i[axis] + offset - 1)/2; %(bc)s(j, n_j, j_); data[ind] = 0.5; indices[ind++] = index(j_, n_j); j[axis] = (i[axis] + offset + 1)/2; %(bc)s(j, n_j, j_); data[ind] = 0.5; indices[ind++] = index(j_, n_j); } %(for_i_end)s indptr[prod(n_i)] = ind; ''' % reps shape = np.array(shape, dtype='i') Nj = np.prod(shape) Ni = Nj//shape[axis]*n_new # Compute amount of data (overestimate) nnz = Ni*2+1 offset = self.offset data = np.empty(nnz, dtype=float) indices = np.empty(nnz, dtype=int) indptr = np.empty(Ni+1, dtype=int) mmf.utils.inline(code, ['shape', 'n', 'axis', 'offset', 'data', 'indices', 'indptr'], local_dict=dict(shape=shape, n=n_new, axis=axis, offset=offset, data=data, indices=indices, indptr=indptr), support_code=Code.index_support_code, verbose=2, language='c++') nnz = indptr[-1] return sp.sparse.csr_matrix( (data[:nnz], indices[:nnz], indptr), shape=(Ni, Nj))
[docs] def I_inline(self, f_in, f_out, axis=0): r"""Interpolation operator. Parameters ---------- f_in, f_out : array Pre-allocated arrays of the correct shape representing the input and output (the shape will be used to determine the upper boundary condition). These must have `d` dimensions. The output array need not be initialized. axis : int, optional Axis to perform interpolation along. (Default is 0). Notes ----- This is a bit of generated C++ code that performs the action of I. The code is generated to allow for arbitrary dimensions etc. in an efficient manner. Here is an example of the completed code for a three-dimensional computation with periodic boundary conditions. .. code-block:: c std::vector<int> i(3); std::vector<int> j(3); std::vector<int> j_(3); for (i[0]=j[0]=0;i[0]<n_i[0];j[0]=++i[0]) { for (i[1]=j[1]=0;i[1]<n_i[1];j[1]=++i[1]) { for (i[2]=j[2]=0;i[2]<n_i[2];j[2]=++i[2]) { if (0 == (2 + i[axis] + offset)%2) { // Direct term j[axis] = (i[axis] + offset)/2; periodic(j, Nf_in, j_); F_OUT3(i[0], i[1], i[2]) = F_IN3(j_[0], j_[1], j_[2]); } else { // Interpolation terms j[axis] = (i[axis] + offset - 1)/2; periodic(j, n_i, j_); F_OUT3(i[0], i[1], i[2]) = 0.5*F_IN3(j_[0], j_[1], j_[2]); j[axis] = (i[axis] + offset + 1)/2; periodic(j, n_i, j_); F_OUT3(i[0], i[1], i[2]) += 0.5*F_IN3(j_[0], j_[1], j_[2]); } } } } Examples -------- Interpolation should always preserve constant functions: >>> i = Interpolator(offset=0, boundary_conditions='Neumann') >>> f_in = np.ones((4, 3, 3, 3), dtype=float) >>> f_out = np.ones((4, 3, 6, 3), dtype=float) >>> res = i.I_inline(f_in=f_in, f_out=f_out, axis=2) >>> abs(res - 1).max() 0.0 """ d = len(f_in.shape) reps = {} reps['d'] = d reps['i_s'] = ", ".join(["i[%i]" % (n, ) for n in xrange(d)]) reps['j_s'] = ", ".join(["j_[%i]" % (n, ) for n in xrange(d)]) reps['for_i_start'] = "\n".join( [("for (i[%(n)i]=j[%(n)i]=0; i[%(n)i]<n_i[%(n)i];" + "j[%(n)i]=++i[%(n)i]) {") % {"n":n} for n in xrange(d)]) reps['for_i_end'] = "}"*d reps['bc'] = self.boundary_conditions code = r''' std::vector<int> i(%(d)i); std::vector<int> j(%(d)i); std::vector<int> j_(%(d)i); std::vector<int> n_i(&Nf_out[0], &Nf_out[%(d)i]); std::vector<int> n_j(&Nf_in[0], &Nf_in[%(d)i]); %(for_i_start)s if (0 == (2 + i[axis] + offset)%%2) { // Direct terms: j[axis] = (i[axis] + offset)/2; %(bc)s(j, n_j, j_); F_OUT%(d)i(%(i_s)s) = F_IN%(d)i(%(j_s)s); } else { // Interpolation terms j[axis] = (i[axis] + offset - 1)/2; %(bc)s(j, n_j, j_); F_OUT%(d)i(%(i_s)s) = 0.5*F_IN%(d)i(%(j_s)s); j[axis] = (i[axis] + offset + 1)/2; %(bc)s(j, n_j, j_); F_OUT%(d)i(%(i_s)s) += 0.5*F_IN%(d)i(%(j_s)s); } %(for_i_end)s ''' % reps offset = self.offset mmf.utils.inline(code, ['f_in', 'f_out', 'axis', 'offset'], local_dict=locals(), support_code=Code.index_support_code, verbose=2, language='c++') return f_out
[docs] def R_inline(self, f_in, f_out, axis=0): r"""Restriction operator. Implements the transpose of :meth:`I`. Parameters ---------- f_in, f_out : array Pre-allocated arrays of the correct shape representing the input and output (the shape will be used to determine the upper boundary condition). These must have `d` dimensions. .. note:: The output array must be initialized to zero. axis : int, optional Axis to perform interpolation along. (Default is 0). Notes ----- The only difference in the code from :meth:`I` are: 1) Exchange of `f_in` and `f_out`. 2) Exchange the indices `i_s` with `j_s`. 3) Turn all `=` into `+=` since each output is no-longer guaranteed to be visited once. This is why we require `f_out` initialized to zero. 4) Normalize by $1/2$. Examples -------- """ d = len(f_in.shape) reps = {} reps['d'] = d reps['i_s'] = ", ".join(["i[%i]" % (n, ) for n in xrange(d)]) reps['j_s'] = ", ".join(["j_[%i]" % (n, ) for n in xrange(d)]) reps['for_i_start'] = "\n".join( [("for (i[%(n)i]=j[%(n)i]=0; i[%(n)i]<n_j[%(n)i]; " + "j[%(n)i]=++i[%(n)i]) {") % {"n":n} for n in xrange(d)]) reps['for_i_end'] = "}"*d reps['bc'] = self.boundary_conditions code = r''' std::vector<int> i(%(d)i); std::vector<int> j(%(d)i); std::vector<int> j_(%(d)i); std::vector<int> n_i(&Nf_out[0], &Nf_out[%(d)i]); std::vector<int> n_j(&Nf_in[0], &Nf_in[%(d)i]); %(for_i_start)s if (0 == (2 + i[axis] + offset)%%2) { // Direct terms: j[axis] = (i[axis] + offset)/2; %(bc)s(j, n_i, j_); F_OUT%(d)i(%(j_s)s) += F_IN%(d)i(%(i_s)s); } else { // Interpolation terms j[axis] = (i[axis] + offset - 1)/2; %(bc)s(j, n_i, j_); F_OUT%(d)i(%(j_s)s) += 0.5*F_IN%(d)i(%(i_s)s); j[axis] = (i[axis] + offset + 1)/2; %(bc)s(j, n_i, j_); F_OUT%(d)i(%(j_s)s) += 0.5*F_IN%(d)i(%(i_s)s); } %(for_i_end)s ''' % reps offset = self.offset mmf.utils.inline(code, ['f_in', 'f_out', 'axis', 'offset'], local_dict=locals(), support_code=Code.index_support_code, verbose=2, language='c++') f_out /= 2 return f_out
[docs]class Code(object): """Various pieces of C++ code and code generators.""" @staticmethod
[docs] def solve(N, a_var='a', b_var='b'): r"""Return `(support_code, code, libs, dirs)` for solving `a*x = b`. Parameters ---------- N : int Length of `b`. a_var : str, optional String with the name of the array `a`. b_var : str, optional String with the name of the array `b`. Returns ------- support_code : str Add this to the support code for `weave.inline`. This includes the libraries etc. pre_code : str Insert this near the start of your code. This allocates the array `a` and defines the accesser function solve_code : str Insert this into your code where you need to solve the system. The following must be pre-allocated and assigned:: double a[N][N]; // Done by pre_code double b[N]; One should assign to `a` as `a[j][[i] = a[i, j]` so that Fortan order is provided as required. The result will be computed in place in the array `b` which m. libs : [str] Append these to the `libraries` argument for `weave.inline`. These are the required libs. dirs : [str] Append these to the `linrary_dirs` argument for `weave.inline`. These locate the libraries. Examples -------- >>> a = np.array([[1, 2, 3, 4], ... [1, 3, 5, 7], ... [1, -2, 3, -4], ... [1, 2, -3, -4]], dtype=float) >>> b = np.array([0, 1, 2, 3], dtype=float) >>> support_code, pre_code, solve_code, libs, dirs = Code.solve(4) >>> code = r''' ... %(pre_code)s ... int i, j; ... for (i=0;i<4;i++) { ... for (j=0;j<4;j++) { ... A2(i, j) = A_IN2(i, j); ... } ... } ... %(solve_code)s ... ''' % dict(pre_code=pre_code, solve_code=solve_code) >>> sp.weave.inline(code=code, arg_names=['a_in', 'b'], ... local_dict=dict(a_in=a, b=b), ... support_code=support_code, ... libraries=libs, library_dirs=dirs) >>> b #doctest: +NORMALIZE_WHITESPACE array([-3.5, 2.5, 1.5, -1.5]) >> x = np.linalg.solve(a, b) >> y = lu_solve(a, b) >> np.allclose(a, y) True """ support_code = """ extern "C" { void dgesv_(int *N, int *NRHS, double* A, int *LDA, int *IPIV, double *B, int *LDB, int *INFO); } """ pre_code = r''' double %(a)s[%(N)i*%(N)i]; // Ensures fortran ordering for solver. #define %(A)s2(i, j) (%(a)s[%(N)i*(j) + (i)]) ''' % dict(N=N, a=a_var, A=a_var.upper(), b=b_var) solve_code = r''' { int ok, flag = 1; int size = %(N)i; int p[%(N)i]; flag = 1; dgesv_(&size, &flag, %(a)s, &size, p, %(b)s, &size, &ok); } ''' % dict(N=N, a=a_var, b=b_var) libs = ['lapack'] dirs = [] return (support_code, pre_code, solve_code, libs, dirs)
index_support_code = r''' #include <vector> inline void periodic(const std::vector<int> &in, const std::vector<int> &n, std::vector<int> &out) { for (int c=0;c<in.size();++c) { out[c] = (in[c] + n[c]) % n[c]; } }; inline void Neumann(const std::vector<int> &in, const std::vector<int> &n, std::vector<int> &out) { for (int c=0;c<in.size();++c) { if (in[c] < 0) { out[c] = -(in[c] + 1); } else if (in[c] >= n[c]) { out[c] = 2*n[c] - in[c] - 1; } else { out[c] = in[c]; } } }; // Full parity symmetry. Only half the points in the // first dimension should be specified. // // (nx + 2Nx, ny + Ny, nz + Nx) = (nx, ny, nz) // (-nx, ny, nz) = (nx - 1, Ny - ny - 1, Nz - nz - 1) inline void parity(const std::vector<int> &in, const std::vector<int> &n, std::vector<int> &out) { int c; // First bring all other axes into box with periodic // boundary conditions. for (int c=1;c<in.size();++c) { out[c] = (in[c] + n[c]) % n[c]; } // Now deal with x-axis out[0] = in[0]; if (out[0] >= n[0]) { out[0] = out[0] - 2*n[0]; } if (out[0] < 0) { out[0] = -(out[0] + 1); for (c=1;c<in.size();++c) { out[c] = n[c] - out[c] - 1; } } }; // Symmetry consistent with sheared lattice. Includes // parity (Neumann boundary conditions) along the x // direction and parity about the origin. // Only half the points should be specified // in the first-two directions. Any shear should be // applied in the third dimension. // // (nx + 2Nx, ny + Ny, nz + Nx) = (nx, ny, nz) // (-nx, ny, nz) = (nx - 1, ny, nz) // (nx, -ny, -nz) = (nx, ny - 1, nz -1) // (nx, -ny, nz) = (nx, ny - 1, Nz - nz - 1) inline void shear(const std::vector<int> &in, const std::vector<int> &n, std::vector<int> &out) { // First bring all other axes into box with periodic // boundary conditions. int c; for (c=2;c<in.size();++c) { out[c] = (in[c] + n[c]) % n[c]; } // Now deal with x-axis if (in[0] < 0) { out[0] = -(in[0] + 1); } else if (in[0] >= n[0]) { out[0] = 2*n[0] - in[0] - 1; } else { out[0] = in[0]; } // Now deal with y-axis out[1] = in[1]; if (out[1] >= n[1]) { out[1] = out[1] - 2*n[1]; } if (out[1] < 0) { out[1] = -(out[1] + 1); for (c=2;c<in.size();++c) { out[c] = n[c] - out[c] - 1; } } }; // Compute the linear index. inline int index(const std::vector<int> &i, const std::vector<int> &n) { int c; int res = i[0]; for (c=1;c<i.size();++c) { res *= n[c]; res += i[c]; } return res; }; // Compute the linear index with first dimension res = d. inline int index_d(int res, const std::vector<int> &i, const std::vector<int> &n) { int c; for (c=0;c<i.size();++c) { res *= n[c]; res += i[c]; } return res; }; template<typename T> inline T prod(const std::vector<T> &x) { typename std::vector<T>::const_iterator iter; T res = 1; for (iter=x.begin();iter<x.end();++iter) { res *= *iter; } return res; };'''