r"""Stencil Module (:mod:`mmf.math.multigrid.stencil`)
This module provides access to matrices and operators defined by a
stencil.
.. autosummary::
Stencil
Interpolator
.. autoclass:: Code
:members:
"""
from __future__ import division
__all__ = ['Stencil', 'Interpolator', 'Code',
'PoissonBase','Poisson1D', 'Poisson2D', 'Poisson3D']
import warnings
import copy
import numpy as np
import scipy as sp
import scipy.sparse.linalg
import scipy.weave
from scipy.lib.blas import get_blas_funcs
import pylab as pl
import matplotlib.pyplot as plt
import random
from numpy import pi
from mmf.objects import StateVars, process_vars
from mmf.objects import Computed, Required, ClassVar
import mmf.utils
_inner = (slice(1, -1, None), ) # Slice to remove boundary.
_abs_tol = 1e-8
_rel_tol = 1e-8
_tiny = np.finfo(float).tiny
[docs]class Stencil(StateVars):
r"""Represents a matrix given a stencil.
This class uses a stencil `S` together with a boundary condition
to represent a matrix. With extended index notation, the matrix
is:
.. math::
A_{a, \vect{i}, b, \vect{i} + \vect{d}} =
\delta_{a, b}s_{a}S_{\vect{d}}
+ \delta_{\vect{d}, \vect{0}} D_{a, b, \vect{i}}
The motivation behind this was representing the Jacobian of the
following system of equations:
.. math::
-s_{a}\nabla^2 \phi_{a}(x) + F_{a}(\vect{\phi}) = 0
where the right-hand side is a local function of the various
variables $\phi_i(x)$ and the stencil $S$ represents the operator
$-\nabla^2$ on the left-hand side. The tensor $D$ is
.. math::
D_{a, b, x} = \pdiff{F_{a}(\vect{\phi})}{\phi_{b}}(x).
and $s_{a}$ is a set of scale factors.
"""
_state_vars = [
('boundary_conditions', 'periodic',
"One of `'periodic'` or `'Neumann'`"),
('S', Required, "3x3x... stencil."),
]
process_vars()
def __init__(self, *v, **kw):
self.S = np.asarray(self.S)
def apply(self, x, D=None, scale_factor=None):
r"""Apply the stencil to `x`.
Parameters
----------
x : array
Input. Must have shape `(N, )*d` if `D` is `None` or `(n, )
+ (N, )*d` otherwise.
D : None, array, optional
Diagonals. Must have shape `(n, n) + (N, )*d`
scale_factor : array
Scale factors for stencil. Must have shape `(n, )`.
"""
x_shape = x.shape
if D is None:
nd = 1
d = len(x_shape)
x = x.reshape((nd, ) + x_shape)
use_D = False
D = np.empty((1, )*(d+2), dtype=float)
else:
d = len(x_shape) - 1
use_D = True
if scale_factor is None:
scale_factor = np.ones(D.shape[1], dtype=float)
else:
scale_factor = np.asarray(scale_factor, dtype=float)
assert scale_factor.shape == D.shape[1:2]
y = np.empty(x_shape, dtype=x.dtype)
reps = dict(d=d, d1=d+1, d2=d+2)
reps['counters'] = ('int id, jd, %s;'
% (', '.join(["ix%(n)i, is%(n)i" % {'n':n}
for n in xrange(d)]), ))
reps['ix_s'] = ", ".join(["ix%i" % (n, ) for n in xrange(d)])
reps['is_s'] = ", ".join(["is%i" % (n, ) for n in xrange(d)])
reps['j_s'] = ", ".join(["j[%i]" % (n, ) for n in xrange(d)])
reps['for_ix'] = "\n".join(
[("for (ix%(n)i=0; ix%(n)i<Nx[%(n)i+1];++ix%(n)i) {") % {'n':n}
for n in xrange(d)])
reps['for_is'] = "\n".join(
[(r'''for (is%(n)i=0; is%(n)i<Nstencil[%(n)i];++is%(n)i) {
i[%(n)i] = ix%(n)i + is%(n)i - Nstencil[%(n)i]/2;''')
% {'n':n}
for n in xrange(d)])
reps['for_ix_end'] = "}"*d
reps['for_is_end'] = "}"*d
reps['bc'] = self.boundary_conditions
def define(n, name):
"""Return a string defining the macro for access into the
array `name` with `n` dimensions. Use for `n>4` as the
macros for `n<5` are already provided by weave.
"""
return ("#define %s%i(%s) (*((double*)(%s_array->data + %s)))" %
(name.upper(),
n,
', '.join(['i__%i' % (i, )
for i in xrange(n)]),
name,
'+'.join(['(i__%i)*S%s[%i]' % (i, name, i)
for i in xrange(n)])))
reps['defines'] = define(5, 'D')
code = r'''
%(defines)s
%(counters)s
std::vector<int> i(%(d)i);
std::vector<int> j(%(d)i);
std::vector<int> nx(&Nx[1], &Nx[%(d)i+1]);
for (id=0;id<Nx[0];++id) {
%(for_ix)s
Y%(d1)i(id, %(ix_s)s) = 0;
%(for_is)s
%(bc)s(i, nx, j);
Y%(d1)i(id, %(ix_s)s)
+= scale_factor[id]
*STENCIL%(d)i(%(is_s)s)*X%(d1)i(id, %(j_s)s);
%(for_is_end)s
if (use_D) {
for (jd=0;jd<Nx[0];++jd) {
Y%(d1)i(id, %(ix_s)s)
+= D%(d2)i(id, jd, %(ix_s)s)*X%(d1)i(jd, %(ix_s)s);
}
}
%(for_ix_end)s
}''' % reps
local_dict = dict(x=x,
y=y,
stencil=self.S,
D=D,
use_D=int(use_D),
scale_factor=scale_factor)
mmf.utils.inline(code,
local_dict.keys(),
local_dict=local_dict,
support_code=Code.index_support_code,
verbose=2,
language='c++')
return y.reshape(x.shape)
def to_mat(self, shape, D=None, scale_factor=None):
r"""Version using :mod:`scipy.weave`.
Parameters
----------
shape : array
Shape of abscissa `x.shape`. Must be `(N, )*d`.
D : None, array, optional
Diagonals. Must have shape `(n, n) + (N, )*d`
scale_factor : array
Scale factors for stencil. Must have shape `(n, )`.
"""
# Estimate number of entries
Ns = np.prod(self.S.shape)
Nx = np.prod(shape)
Nd = 1
if D is not None:
assert 2 + len(shape) == len(D.shape)
assert shape == D.shape[2:]
assert D.shape[0] == D.shape[1]
Nd = D.shape[0]
D = D.reshape((Nd, Nd, Nx))
n_ij = (Nx*Ns*Nd # Diagonal blocks
+ Nx*Nd*(Nd-1)) # Off-diagonal blocks are diagonal
ij = np.empty((2, n_ij), dtype=int)
data = np.empty(n_ij, dtype=float)
stencil = self.S
periodic = int('periodic' == self.boundary_conditions)
n_ij = np.array(0)
d = D
if d is None:
n_dim = len(shape)
reps = {}
reps['d'] = n_dim
reps['for_j_start'] = "\n".join(
[("for (j[%(n)i]=0; j[%(n)i]<n[%(n)i]; ++j[%(n)i]) {")
% {"n":n} for n in xrange(n_dim)])
reps['for_s_start'] = "\n".join(
[("for (s[%(n)i]=0; s[%(n)i]<Nstencil[%(n)i]; ++s[%(n)i]) {")
% {"n":n} for n in xrange(n_dim)])
reps['define_i'] = "\n".join(
[("i[%(n)i] = j[%(n)i] + s[%(n)i] - (Nstencil[%(n)i] - 1)/2;")
% {"n":n} for n in xrange(n_dim)])
reps['for_end'] = "}"*n_dim
reps['s_args'] = ", ".join(["s[%i]" % (n, ) for n in xrange(n_dim)])
reps['bc'] = self.boundary_conditions
code = r'''
// Code for computing the stencil in arbitrary dimensions
std::vector<int> i(%(d)i);
std::vector<int> j(%(d)i);
std::vector<int> s(%(d)i);
std::vector<int> n(shape, shape + %(d)i);
int ind_i, ind_j;
typedef std::map<std::pair<int, int>, double> dat_T;
dat_T dat;
dat_T::key_type key;
%(for_j_start)s
ind_j = index(j, n);
%(for_s_start)s
%(define_i)s
%(bc)s(i, n, i);
ind_i = index(i, n);
key = std::make_pair(ind_i, ind_j);
dat[key] += scale_factor[0]
*STENCIL%(d)i(%(s_args)s);
%(for_end)s
%(for_end)s
dat_T::const_iterator iter;
int c = 0;
for (iter=dat.begin();iter != dat.end();++iter) {
IJ2(0, c) = iter->first.first;
IJ2(1, c) = iter->first.second;
data[c] = iter->second;
++c;
}
n_ij[0] = c;
''' % reps
if scale_factor is None:
scale_factor = np.ones(1, dtype=float)
else:
scale_factor = np.asarray(scale_factor, dtype=float)
assert scale_factor.shape == (1, )
args = dict(shape=np.array(shape, dtype='i'),
stencil=stencil,
ij=ij,
n_ij=n_ij,
data=data,
scale_factor=scale_factor)
else:
n_dim = len(shape)
reps = {}
reps['d'] = n_dim
reps['for_j_start'] = "\n".join(
[("for (j[%(n)i]=0; j[%(n)i]<n[%(n)i]; ++j[%(n)i]) {")
% {"n":n} for n in xrange(n_dim)])
reps['for_s_start'] = "\n".join(
[("for (s[%(n)i]=0; s[%(n)i]<Nstencil[%(n)i]; ++s[%(n)i]) {")
% {"n":n} for n in xrange(n_dim)])
reps['define_i'] = "\n".join(
[("i[%(n)i] = j[%(n)i] + s[%(n)i] - (Nstencil[%(n)i] - 1)/2;")
% {"n":n} for n in xrange(n_dim)])
reps['for_end'] = "}"*n_dim
reps['s_args'] = ", ".join(["s[%i]" % (n, ) for n in xrange(n_dim)])
reps['bc'] = self.boundary_conditions
code = r'''
// Code for computing the stencil in arbitrary dimensions
std::vector<int> i(%(d)i);
std::vector<int> j(%(d)i);
std::vector<int> s(%(d)i);
std::vector<int> n(shape, shape + %(d)i);
int id, jd, ind_i, ind_j;
typedef std::map<std::pair<int, int>, double> dat_T;
dat_T dat;
dat_T::key_type key;
%(for_j_start)s
for (jd=0;jd<Nd[1];++jd) {
ind_j = index_d(jd, j, n);
for (id=0;id<Nd[0];++id) {
ind_i = index_d(id, j, n);
key = std::make_pair(ind_i, ind_j);
dat[key] += D3(id, jd, index(j, n));
}
}
// Only diagonals have stencil.
%(for_s_start)s
%(define_i)s
%(bc)s(i, n, i);
for (jd=0;jd<Nd[1];jd++) {
ind_i = index_d(jd, i, n);
ind_j = index_d(jd, j, n);
key = std::make_pair(ind_i, ind_j);
dat[key] += scale_factor[jd]
*STENCIL%(d)i(%(s_args)s);
}
%(for_end)s
%(for_end)s
dat_T::const_iterator iter;
int c = 0;
for (iter=dat.begin();iter != dat.end();++iter) {
IJ2(0, c) = iter->first.first;
IJ2(1, c) = iter->first.second;
data[c] = iter->second;
++c;
}
n_ij[0] = c;
''' % reps
if scale_factor is None:
scale_factor = np.ones(d.shape[1], dtype=float)
else:
scale_factor = np.asarray(scale_factor, dtype=float)
assert scale_factor.shape == d.shape[1:2]
args = dict(d=d,
shape=np.array(shape, dtype='i'),
stencil=stencil,
ij=ij,
n_ij=n_ij,
data=data,
scale_factor=scale_factor)
mmf.utils.inline(code,
args.keys(),
local_dict=args,
support_code=Code.index_support_code,
verbose=2,
headers=["<map>"],
language='c++')
return sp.sparse.csr_matrix((data[:n_ij], ij[:, :n_ij]),
shape=(Nx*Nd, )*2)
[docs]class PoissonBase(object):
r"""Base class for some objects representing derivatives."""
d = NotImplemented
@classmethod
[docs] def AB(cls, delta_inv, boundary_conditions):
r"""Return `(A, B)` the stencil for the operators
.. math::
\op{A} &= \left(\mat{\delta}^{-1}\right)^T
\left(\op{T} + \op{T}^{T}\right)\mat{\delta}^{-1},\\
\op{B} &= \left(\mat{\delta}^{-1}\right)^T
\left(\op{T} + \op{T}^{T}\right)\mat{\delta}^{-1}
\left(\mat{\delta}^{-1}\right)^{T}
= \op{A}\left(\mat{\delta}^{-1}\right)^T.
These are used as follows:
.. math::
-\pdiff{\op{\nabla}^2}{\delta_{ij}} &= \op{B}_{ij},\\
\frac{\partial^2\op{\nabla}^2}
{\partial\delta_{ij}\partial\delta_{ab}}
&= \mat{\delta}^{-1}_{ja}\op{B}_{ib}
+ \mat{\delta}^{-1}_{bi}\op{B}_{aj}
+ \left[\mat{\delta}^{-1}\left(\mat{\delta}^{-1}
\right)^T\right]_{jb}
\op{A}_{ai}.
"""
_d2 = np.dot(delta_inv, delta_inv.T)
A = {}
B = {}
for i in xrange(cls.d):
for j in xrange(cls.d):
Sa = np.zeros((3,)*cls.d, dtype=float)
Sb = np.zeros((3,)*cls.d, dtype=float)
for a in xrange(cls.d):
for b in xrange(cls.d):
_T2 = (cls.T[a, b].S + cls.T[b, a].S)
Sa += (delta_inv.T[i, a]*_T2*delta_inv[b, j])
Sb += (delta_inv.T[i, a]*_T2*_d2[b, j])
A[i, j] = Stencil(S=Sa, boundary_conditions=boundary_conditions)
B[i, j] = Stencil(S=Sb, boundary_conditions=boundary_conditions)
return A, B
[docs]class Poisson1D(PoissonBase):
r"""Some stencils for 1d derivatives.
>>> Poisson1D.Laplacian
array([ 1., -2., 1.])
"""
d = 1
T = {}
S = np.zeros((3,)*d, dtype=float)
S[:] = np.array([1., -2., 1.], dtype=float)
T[0, 0] = Stencil(S=S)
Laplacian = sum([T[i,i].S for i in xrange(d)])
del S, i
[docs]class Poisson2D(PoissonBase):
r"""Some stencils for 2d derivatives.
>>> Poisson2D.Laplacian
array([[ 0., 1., 0.],
[ 1., -4., 1.],
[ 0., 1., 0.]])
# Now check that this is the same in any orthonormal basis
>>> np.random.seed(0)
>>> d, _R = np.linalg.qr(np.random.random((2,2)))
>>> dinv = d.T
>>> L = sum(dinv.T[i,j]*Poisson2D.T[j,k].S*dinv[k,i]
... for i in xrange(2) for j in xrange(2)
... for k in xrange(2))
>>> np.allclose(L, Poisson2D.Laplacian)
True
"""
d = 2
T = {}
S = np.zeros((3,)*d, dtype=float)
S[:, 1] = [1, -2, 1]
T[0, 0] = Stencil(S=S)
S = 0*S
S[1, :] = [1, -2, 1]
T[1, 1] = Stencil(S=S)
S = 0*S
S[:, :] = (1.0/4)*np.array([[1, 0, -1],
[0, 0, 0],
[-1, 0, 1]])
T[0, 1] = T[1, 0] = Stencil(S=S)
Laplacian = sum([T[i,i].S for i in xrange(d)])
del S, i
[docs]class Poisson3D(PoissonBase):
r"""Some stencils for 3d derivatives.
>>> Poisson3D.Laplacian
array([[[ 0., 0., 0.],
[ 0., 1., 0.],
[ 0., 0., 0.]],
<BLANKLINE>
[[ 0., 1., 0.],
[ 1., -6., 1.],
[ 0., 1., 0.]],
<BLANKLINE>
[[ 0., 0., 0.],
[ 0., 1., 0.],
[ 0., 0., 0.]]])
# Now check that this is the same in any orthonormal basis
>>> np.random.seed(0)
>>> d, _R = np.linalg.qr(np.random.random((3,3)))
>>> dinv = d.T
>>> L = sum(dinv.T[i,j]*Poisson3D.T[j,k].S*dinv[k,i]
... for i in xrange(3) for j in xrange(3)
... for k in xrange(3))
>>> np.allclose(L, Poisson3D.Laplacian)
True
"""
d = 3
T = {}
S = np.zeros((3,)*d, dtype=float)
S[:, 1, 1] = [1, -2, 1]
T[0, 0] = Stencil(S=S)
S = 0*S
S[1, :, 1] = [1, -2, 1]
T[1, 1] = Stencil(S=S)
S = 0*S
S[1, 1, :] = [1, -2, 1]
T[2, 2] = Stencil(S=S)
S = 0*S
S[:, :, 1] = (1.0/4)*np.array([[1, 0, -1],
[0, 0, 0],
[-1, 0, 1]])
T[0, 1] = T[1, 0] = Stencil(S=S)
S = 0*S
S[:, 1, :] = (1.0/4)*np.array([[1, 0, -1],
[0, 0, 0],
[-1, 0, 1]])
T[0, 2] = T[2, 0] = Stencil(S=S)
S = 0*S
S[1, :, :] = (1.0/4)*np.array([[1, 0, -1],
[0, 0, 0],
[-1, 0, 1]])
T[1, 2] = T[2, 1] = Stencil(S=S)
Laplacian = sum([T[i,i].S for i in xrange(d)])
del S, i
[docs]class Interpolator(StateVars):
r"""Implements an interpolation/restriction pair along a given
dimension.
This class provides two functions :meth:`I` and :meth:`R` the
provide interpolation and restriction functionality for a
multigrid algorithm.
Examples
--------
First we demonstrate periodic boundary conditions with the various
offsets and different sized output arrays:
>>> i = Interpolator(offset=0, boundary_conditions='periodic')
>>> f = np.array([1.0, 2.0, 3.0])
>>> i.IR(f, (5, )) #doctest: +NORMALIZE_WHITESPACE
array([ 1. , 1.5, 2. , 2.5, 3. ])
>>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE
array([ 1. , 1.5, 2. , 2.5, 3. , 2. ])
>>> i.offset = -1
>>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE
array([ 2. , 1. , 1.5, 2. , 2.5, 3. ])
>>> i.offset = 1
>>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE
array([ 1.5, 2. , 2.5, 3. , 2. , 1. ])
Now we demonstrate the same with Neumann boundary conditions:
>>> i = Interpolator(offset=0, boundary_conditions='Neumann')
>>> f = np.array([1.0, 2.0, 3.0])
>>> i.IR(f, (5, )) #doctest: +NORMALIZE_WHITESPACE
array([ 1. , 1.5, 2. , 2.5, 3. ])
>>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE
array([ 1. , 1.5, 2. , 2.5, 3. , 3. ])
>>> i.offset = -1
>>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE
array([ 1. , 1. , 1.5, 2. , 2.5, 3. ])
>>> i.offset = 1
>>> i.IR(f, (6, )) #doctest: +NORMALIZE_WHITESPACE
array([ 1.5, 2. , 2.5, 3. , 3. , 3. ])
The operators are constructed explicitly so that `R.T*2 == I`:
>>> a = np.zeros((3, 4, 3))
>>> b = np.zeros((3, 8, 3))
>>> I = np.eye(len(b.ravel()), len(a.ravel()))
>>> R = np.eye(len(a.ravel()), len(b.ravel()))
>>> for n in xrange(len(a.ravel())):
... a.flat[n] = 1
... I[:, n] = i.IR(a, b.shape).ravel()
... a.flat[n] = 0
>>> for n in xrange(len(b.ravel())):
... b.flat[n] = 1
... R[:, n] = i.IR(b, a.shape).ravel()
... b.flat[n] = 0
>>> abs(R.T*2 - I).max()
0.0
"""
_state_vars = [
('offset', 0,
r"""
offset : (0, -1, 1), optional
Integer offset for lower endpoint:
``offset == -1`` : Here the first point depends on the
augmented array structure:
.. math::
\mat{I} = \begin{pmatrix}
\makebox[0pt][r]{\ensuremath{\tfrac{1}{2}\quad}}
\tfrac{1}{2}\\
1 \\
\tfrac{1}{2} & \tfrac{1}{2}\\
& & \ddots
\end{pmatrix}
``offset == 0`` :
.. math::
\mat{I} = \begin{pmatrix}
1\\
\tfrac{1}{2} & \tfrac{1}{2}\\
& 1 \\
& \tfrac{1}{2} & \tfrac{1}{2}\\
& & & \ddots
\end{pmatrix}
``offset == 1`` :
.. math::
\mat{I} = \begin{pmatrix}
\tfrac{1}{2} & \tfrac{1}{2}\\
& 1 \\
& \tfrac{1}{2} & \tfrac{1}{2}\\
& & & \ddots
\end{pmatrix}
The conditions at the right side depend on the size of the
input and output arrays."""),
('boundary_conditions', 'periodic',
r"""This must be the name of one of the boundary condition
transformation functions defined in :attr:`support_code`."""),
]
process_vars()
[docs] def IR(self, f, new_shape):
r"""Interpolation/restriction operator.
Interpolates (restricts) `f_in` along the axes where
`new_shape` has more (fewer) elements than the corresponding
dimension of `f_in`.
"""
assert len(f.shape) == len(new_shape)
for n, b in enumerate(new_shape):
a = f.shape[n]
if a < b:
# Interpolate to b
shape = list(f.shape)
shape[n] = b
res = np.empty(shape, dtype=f.dtype)
f = self.I_inline(f_in=f, f_out=res, axis=n)
elif a > b:
# Restrict to b
shape = list(f.shape)
shape[n] = b
res = np.zeros(shape, dtype=f.dtype)
f = self.R_inline(f_in=f, f_out=res, axis=n)
else:
pass
return f
[docs] def IR_mat(self, shape, new_shape):
r"""Return CSR representation of the interpolation
(restriction) operator along the axes where `new_shape` has
more (fewer) elements than the corresponding dimension of
`shape`.
"""
IR = None
shape = list(shape)
for n in xrange(len(shape)):
if shape[n] < new_shape[n]:
M = self.I_mat(shape, n_new=new_shape[n], axis=n)
shape[n] = new_shape[n]
elif shape[n] > new_shape[n]:
M = self.R_mat(shape, n_new=new_shape[n], axis=n)
shape[n] = new_shape[n]
else:
M = None
if M is not None:
if IR is None:
IR = M
else:
IR = M*IR
return IR
[docs] def R_mat(self, shape, n_new, axis=0):
r"""Return a CSR sparse matrix representing the restriction matrix.
Parameters
----------
shape : array
Shape of input array.
n : int
New length of restriction axis.
axis : int, optional
Axis to perform restriction along. (Default is 0).
"""
new_shape = list(shape)
new_shape[axis] = n_new
I = self.I_mat(shape=new_shape, n_new=shape[axis], axis=axis)
return I.T
[docs] def I_mat(self, shape, n_new, axis=0):
r"""Return a CSR sparse matrix representing the interpolation
matrix.
Parameters
----------
shape : array
Shape of input array.
n : int
New length of interpolated axis.
axis : int, optional
Axis to perform interpolation along. (Default is 0).
Notes
-----
This is a bit of generated C++ code that performs the action
of I. The code is generated to allow for arbitrary dimensions
etc. in an efficient manner. Here is an example of the
completed code for a three-dimensional computation with
periodic boundary conditions.
.. code-block:: c
std::vector<int> i(3);
std::vector<int> j(3);
std::vector<int> j_(3);
std::vector<int> new_shape(&shape[0], &shape[3]);
new_shape[axis] = n;
int ind = 0;
for (i[0]=j[0]=0;i[0]<new_shape[0];j[0]=++i[0]) {
for (i[1]=j[1]=0;i[1]<new_shape[1];j[1]=++i[1]) {
for (i[2]=j[2]=0;i[2]<new_shape[2];j[2]=++i[2]) {
indptr[index(i, new_shape)] = ind;
if (0 == (2 + i[axis] + offset)%2) {
// Direct term
j[axis] = (i[axis] + offset)/2;
periodic(j, Nf_in, j_);
data[ind] = 1.0;
indices[ind++] = index(j_, shape);
} else {
// Interpolation terms
j[axis] = (i[axis] + offset - 1)/2;
periodic(j, n_i, j_);
data[ind] = 0.5;
indices[ind++] = index(j_, shape);
j[axis] = (i[axis] + offset + 1)/2;
periodic(j, n_i, j_);
data[ind] = 0.5;
indices[ind++] = index(j_, shape);
}
}
}
}
indptr[index(i, n_i)] = ind;
Examples
--------
Interpolation should always preserve constant functions:
>>> i = Interpolator(offset=0, boundary_conditions='Neumann')
>>> f_in = np.ones((4, 3, 3, 3), dtype=float)
>>> f_out = np.ones((4, 3, 6, 3), dtype=float)
>>> res = i.I_inline(f_in=f_in, f_out=f_out, axis=2)
>>> abs(res - 1).max()
0.0
"""
d = len(shape)
reps = {}
reps['d'] = d
reps['for_i_start'] = "\n".join(
[("for (i[%(n)i]=j[%(n)i]=0; i[%(n)i]<n_i[%(n)i];" +
"j[%(n)i]=++i[%(n)i]) {") % {"n":n}
for n in xrange(d)])
reps['for_i_end'] = "}"*d
reps['bc'] = self.boundary_conditions
code = r'''
std::vector<int> i(%(d)i);
std::vector<int> j(%(d)i);
std::vector<int> j_(%(d)i);
std::vector<int> n_j(shape, shape + %(d)i);
std::vector<int> n_i(shape, shape + %(d)i);
n_i[axis] = n;
int ind = 0;
%(for_i_start)s
indptr[index(i, n_i)] = ind;
if (0 == (2 + i[axis] + offset)%%2) {
// Direct terms:
j[axis] = (i[axis] + offset)/2;
%(bc)s(j, n_j, j_);
data[ind] = 1.0;
indices[ind++] = index(j_, n_j);
} else {
// Interpolation terms
j[axis] = (i[axis] + offset - 1)/2;
%(bc)s(j, n_j, j_);
data[ind] = 0.5;
indices[ind++] = index(j_, n_j);
j[axis] = (i[axis] + offset + 1)/2;
%(bc)s(j, n_j, j_);
data[ind] = 0.5;
indices[ind++] = index(j_, n_j);
}
%(for_i_end)s
indptr[prod(n_i)] = ind;
''' % reps
shape = np.array(shape, dtype='i')
Nj = np.prod(shape)
Ni = Nj//shape[axis]*n_new
# Compute amount of data (overestimate)
nnz = Ni*2+1
offset = self.offset
data = np.empty(nnz, dtype=float)
indices = np.empty(nnz, dtype=int)
indptr = np.empty(Ni+1, dtype=int)
mmf.utils.inline(code,
['shape', 'n', 'axis', 'offset',
'data', 'indices', 'indptr'],
local_dict=dict(shape=shape, n=n_new, axis=axis,
offset=offset, data=data,
indices=indices, indptr=indptr),
support_code=Code.index_support_code,
verbose=2,
language='c++')
nnz = indptr[-1]
return sp.sparse.csr_matrix(
(data[:nnz], indices[:nnz], indptr),
shape=(Ni, Nj))
[docs] def I_inline(self, f_in, f_out, axis=0):
r"""Interpolation operator.
Parameters
----------
f_in, f_out : array
Pre-allocated arrays of the correct shape representing the
input and output (the shape will be used to determine the
upper boundary condition). These must have `d` dimensions.
The output array need not be initialized.
axis : int, optional
Axis to perform interpolation along. (Default is 0).
Notes
-----
This is a bit of generated C++ code that performs the action
of I. The code is generated to allow for arbitrary dimensions
etc. in an efficient manner. Here is an example of the
completed code for a three-dimensional computation with
periodic boundary conditions.
.. code-block:: c
std::vector<int> i(3);
std::vector<int> j(3);
std::vector<int> j_(3);
for (i[0]=j[0]=0;i[0]<n_i[0];j[0]=++i[0]) {
for (i[1]=j[1]=0;i[1]<n_i[1];j[1]=++i[1]) {
for (i[2]=j[2]=0;i[2]<n_i[2];j[2]=++i[2]) {
if (0 == (2 + i[axis] + offset)%2) {
// Direct term
j[axis] = (i[axis] + offset)/2;
periodic(j, Nf_in, j_);
F_OUT3(i[0], i[1], i[2]) = F_IN3(j_[0], j_[1], j_[2]);
} else {
// Interpolation terms
j[axis] = (i[axis] + offset - 1)/2;
periodic(j, n_i, j_);
F_OUT3(i[0], i[1], i[2]) = 0.5*F_IN3(j_[0], j_[1], j_[2]);
j[axis] = (i[axis] + offset + 1)/2;
periodic(j, n_i, j_);
F_OUT3(i[0], i[1], i[2]) += 0.5*F_IN3(j_[0], j_[1], j_[2]);
}
}
}
}
Examples
--------
Interpolation should always preserve constant functions:
>>> i = Interpolator(offset=0, boundary_conditions='Neumann')
>>> f_in = np.ones((4, 3, 3, 3), dtype=float)
>>> f_out = np.ones((4, 3, 6, 3), dtype=float)
>>> res = i.I_inline(f_in=f_in, f_out=f_out, axis=2)
>>> abs(res - 1).max()
0.0
"""
d = len(f_in.shape)
reps = {}
reps['d'] = d
reps['i_s'] = ", ".join(["i[%i]" % (n, ) for n in xrange(d)])
reps['j_s'] = ", ".join(["j_[%i]" % (n, ) for n in xrange(d)])
reps['for_i_start'] = "\n".join(
[("for (i[%(n)i]=j[%(n)i]=0; i[%(n)i]<n_i[%(n)i];" +
"j[%(n)i]=++i[%(n)i]) {") % {"n":n}
for n in xrange(d)])
reps['for_i_end'] = "}"*d
reps['bc'] = self.boundary_conditions
code = r'''
std::vector<int> i(%(d)i);
std::vector<int> j(%(d)i);
std::vector<int> j_(%(d)i);
std::vector<int> n_i(&Nf_out[0], &Nf_out[%(d)i]);
std::vector<int> n_j(&Nf_in[0], &Nf_in[%(d)i]);
%(for_i_start)s
if (0 == (2 + i[axis] + offset)%%2) {
// Direct terms:
j[axis] = (i[axis] + offset)/2;
%(bc)s(j, n_j, j_);
F_OUT%(d)i(%(i_s)s) = F_IN%(d)i(%(j_s)s);
} else {
// Interpolation terms
j[axis] = (i[axis] + offset - 1)/2;
%(bc)s(j, n_j, j_);
F_OUT%(d)i(%(i_s)s) = 0.5*F_IN%(d)i(%(j_s)s);
j[axis] = (i[axis] + offset + 1)/2;
%(bc)s(j, n_j, j_);
F_OUT%(d)i(%(i_s)s) += 0.5*F_IN%(d)i(%(j_s)s);
}
%(for_i_end)s
''' % reps
offset = self.offset
mmf.utils.inline(code,
['f_in', 'f_out', 'axis', 'offset'],
local_dict=locals(),
support_code=Code.index_support_code,
verbose=2,
language='c++')
return f_out
[docs] def R_inline(self, f_in, f_out, axis=0):
r"""Restriction operator. Implements the transpose of
:meth:`I`.
Parameters
----------
f_in, f_out : array
Pre-allocated arrays of the correct shape representing the
input and output (the shape will be used to determine the
upper boundary condition). These must have `d` dimensions.
.. note:: The output array must be initialized to zero.
axis : int, optional
Axis to perform interpolation along. (Default is 0).
Notes
-----
The only difference in the code from :meth:`I` are:
1) Exchange of `f_in` and `f_out`.
2) Exchange the indices `i_s` with `j_s`.
3) Turn all `=` into `+=` since each output is no-longer
guaranteed to be visited once. This is why we require
`f_out` initialized to zero.
4) Normalize by $1/2$.
Examples
--------
"""
d = len(f_in.shape)
reps = {}
reps['d'] = d
reps['i_s'] = ", ".join(["i[%i]" % (n, ) for n in xrange(d)])
reps['j_s'] = ", ".join(["j_[%i]" % (n, ) for n in xrange(d)])
reps['for_i_start'] = "\n".join(
[("for (i[%(n)i]=j[%(n)i]=0; i[%(n)i]<n_j[%(n)i]; " +
"j[%(n)i]=++i[%(n)i]) {") % {"n":n}
for n in xrange(d)])
reps['for_i_end'] = "}"*d
reps['bc'] = self.boundary_conditions
code = r'''
std::vector<int> i(%(d)i);
std::vector<int> j(%(d)i);
std::vector<int> j_(%(d)i);
std::vector<int> n_i(&Nf_out[0], &Nf_out[%(d)i]);
std::vector<int> n_j(&Nf_in[0], &Nf_in[%(d)i]);
%(for_i_start)s
if (0 == (2 + i[axis] + offset)%%2) {
// Direct terms:
j[axis] = (i[axis] + offset)/2;
%(bc)s(j, n_i, j_);
F_OUT%(d)i(%(j_s)s) += F_IN%(d)i(%(i_s)s);
} else {
// Interpolation terms
j[axis] = (i[axis] + offset - 1)/2;
%(bc)s(j, n_i, j_);
F_OUT%(d)i(%(j_s)s) += 0.5*F_IN%(d)i(%(i_s)s);
j[axis] = (i[axis] + offset + 1)/2;
%(bc)s(j, n_i, j_);
F_OUT%(d)i(%(j_s)s) += 0.5*F_IN%(d)i(%(i_s)s);
}
%(for_i_end)s
''' % reps
offset = self.offset
mmf.utils.inline(code,
['f_in', 'f_out', 'axis', 'offset'],
local_dict=locals(),
support_code=Code.index_support_code,
verbose=2,
language='c++')
f_out /= 2
return f_out
[docs]class Code(object):
"""Various pieces of C++ code and code generators."""
@staticmethod
[docs] def solve(N, a_var='a', b_var='b'):
r"""Return `(support_code, code, libs, dirs)` for solving `a*x =
b`.
Parameters
----------
N : int
Length of `b`.
a_var : str, optional
String with the name of the array `a`.
b_var : str, optional
String with the name of the array `b`.
Returns
-------
support_code : str
Add this to the support code for `weave.inline`. This
includes the libraries etc.
pre_code : str
Insert this near the start of your code. This allocates
the array `a` and defines the accesser function
solve_code : str
Insert this into your code where you need to solve the
system. The following must be pre-allocated and assigned::
double a[N][N]; // Done by pre_code
double b[N];
One should assign to `a` as `a[j][[i] = a[i, j]` so that Fortan
order is provided as required. The result will be computed in
place in the array `b` which m.
libs : [str]
Append these to the `libraries` argument for
`weave.inline`. These are the required libs.
dirs : [str]
Append these to the `linrary_dirs` argument for
`weave.inline`. These locate the libraries.
Examples
--------
>>> a = np.array([[1, 2, 3, 4],
... [1, 3, 5, 7],
... [1, -2, 3, -4],
... [1, 2, -3, -4]], dtype=float)
>>> b = np.array([0, 1, 2, 3], dtype=float)
>>> support_code, pre_code, solve_code, libs, dirs = Code.solve(4)
>>> code = r'''
... %(pre_code)s
... int i, j;
... for (i=0;i<4;i++) {
... for (j=0;j<4;j++) {
... A2(i, j) = A_IN2(i, j);
... }
... }
... %(solve_code)s
... ''' % dict(pre_code=pre_code, solve_code=solve_code)
>>> sp.weave.inline(code=code, arg_names=['a_in', 'b'],
... local_dict=dict(a_in=a, b=b),
... support_code=support_code,
... libraries=libs, library_dirs=dirs)
>>> b #doctest: +NORMALIZE_WHITESPACE
array([-3.5, 2.5, 1.5, -1.5])
>> x = np.linalg.solve(a, b)
>> y = lu_solve(a, b)
>> np.allclose(a, y)
True
"""
support_code = """
extern "C" {
void dgesv_(int *N, int *NRHS, double* A, int *LDA,
int *IPIV, double *B, int *LDB, int *INFO);
}
"""
pre_code = r'''
double %(a)s[%(N)i*%(N)i];
// Ensures fortran ordering for solver.
#define %(A)s2(i, j) (%(a)s[%(N)i*(j) + (i)])
''' % dict(N=N, a=a_var, A=a_var.upper(), b=b_var)
solve_code = r'''
{
int ok, flag = 1;
int size = %(N)i;
int p[%(N)i];
flag = 1;
dgesv_(&size, &flag, %(a)s, &size, p, %(b)s, &size, &ok);
}
''' % dict(N=N, a=a_var, b=b_var)
libs = ['lapack']
dirs = []
return (support_code, pre_code, solve_code, libs, dirs)
index_support_code = r'''
#include <vector>
inline
void
periodic(const std::vector<int> &in,
const std::vector<int> &n,
std::vector<int> &out) {
for (int c=0;c<in.size();++c) {
out[c] = (in[c] + n[c]) % n[c];
}
};
inline
void
Neumann(const std::vector<int> &in,
const std::vector<int> &n,
std::vector<int> &out) {
for (int c=0;c<in.size();++c) {
if (in[c] < 0) {
out[c] = -(in[c] + 1);
} else if (in[c] >= n[c]) {
out[c] = 2*n[c] - in[c] - 1;
} else {
out[c] = in[c];
}
}
};
// Full parity symmetry. Only half the points in the
// first dimension should be specified.
//
// (nx + 2Nx, ny + Ny, nz + Nx) = (nx, ny, nz)
// (-nx, ny, nz) = (nx - 1, Ny - ny - 1, Nz - nz - 1)
inline
void
parity(const std::vector<int> &in,
const std::vector<int> &n,
std::vector<int> &out) {
int c;
// First bring all other axes into box with periodic
// boundary conditions.
for (int c=1;c<in.size();++c) {
out[c] = (in[c] + n[c]) % n[c];
}
// Now deal with x-axis
out[0] = in[0];
if (out[0] >= n[0]) {
out[0] = out[0] - 2*n[0];
}
if (out[0] < 0) {
out[0] = -(out[0] + 1);
for (c=1;c<in.size();++c) {
out[c] = n[c] - out[c] - 1;
}
}
};
// Symmetry consistent with sheared lattice. Includes
// parity (Neumann boundary conditions) along the x
// direction and parity about the origin.
// Only half the points should be specified
// in the first-two directions. Any shear should be
// applied in the third dimension.
//
// (nx + 2Nx, ny + Ny, nz + Nx) = (nx, ny, nz)
// (-nx, ny, nz) = (nx - 1, ny, nz)
// (nx, -ny, -nz) = (nx, ny - 1, nz -1)
// (nx, -ny, nz) = (nx, ny - 1, Nz - nz - 1)
inline
void
shear(const std::vector<int> &in,
const std::vector<int> &n,
std::vector<int> &out) {
// First bring all other axes into box with periodic
// boundary conditions.
int c;
for (c=2;c<in.size();++c) {
out[c] = (in[c] + n[c]) % n[c];
}
// Now deal with x-axis
if (in[0] < 0) {
out[0] = -(in[0] + 1);
} else if (in[0] >= n[0]) {
out[0] = 2*n[0] - in[0] - 1;
} else {
out[0] = in[0];
}
// Now deal with y-axis
out[1] = in[1];
if (out[1] >= n[1]) {
out[1] = out[1] - 2*n[1];
}
if (out[1] < 0) {
out[1] = -(out[1] + 1);
for (c=2;c<in.size();++c) {
out[c] = n[c] - out[c] - 1;
}
}
};
// Compute the linear index.
inline
int
index(const std::vector<int> &i,
const std::vector<int> &n) {
int c;
int res = i[0];
for (c=1;c<i.size();++c) {
res *= n[c];
res += i[c];
}
return res;
};
// Compute the linear index with first dimension res = d.
inline
int
index_d(int res, const std::vector<int> &i,
const std::vector<int> &n) {
int c;
for (c=0;c<i.size();++c) {
res *= n[c];
res += i[c];
}
return res;
};
template<typename T>
inline
T
prod(const std::vector<T> &x) {
typename std::vector<T>::const_iterator iter;
T res = 1;
for (iter=x.begin();iter<x.end();++iter) {
res *= *iter;
}
return res;
};'''