# Source code for mmf.async.example
"""This file shows how to start a computation server.
Just run this at the python prompt...
"""
from __future__ import with_statement
__all__ = ['Problem', 'go_trivial']
import time
import numpy as np
#import mmf.async.server as server
import server
from mmf.objects import StateVars, process_vars
from mmf.objects import Required, Computed, Excluded, NoCopy
class Problem(StateVars):
    """Simple little problem.  Takes a function `f`, and slowly
    converges to a straight line between the endpoints `a` and `b`.

    Intermediate results are published through `idict` under the keys
    ``'f'``, ``'iter'`` and ``'abs_err'`` so that callbacks registered
    on `idict` can report on or plot the progress of :meth:`solve`.
    """
    _state_vars = [
        ('a', 1, "Left function value"),
        ('b', 2, "Right function value"),
        ('abs_tol', 1e-5, "Convergence criterion"),
        ('f', None, "Current guess"),
        ('verbosity', 1, "1 = messages, 2 = plot."),
        ('idict', NoCopy(server.IDict()), "Shared memory object.")]
    process_vars()

    def __init__(self, *v, **kw):
        # NOTE(review): this reads self.f, so it presumably relies on
        # StateVars having assigned the state variables before __init__
        # runs -- confirm against mmf.objects.
        if self.f is None:
            # No initial guess supplied: start from 100 random values.
            self.f = np.random.rand(100)

    def iter(self, f):
        """Perform a single iteration.

        Pads `f` with the boundary values `a` (left) and `b` (right)
        and returns a new array, the same length as `f`, in which each
        entry is the mean of its two neighbours in the padded array.
        """
        F = np.hstack(([self.a], f, [self.b]))
        return (F[:-2] + F[2:])/2.0

    def solve(self):
        """Solve the problem and return the final iterate.

        Note: the only thing unusual here is that we save the
        intermediate results in `self.idict`.

        Iteration stops when `idict.terminated` is set, when the
        largest absolute change between successive iterates is no
        longer greater than `abs_tol`, or after 10000 iterations.
        """
        start_time = time.time()
        n_iter = 0              # Avoid shadowing builtin/method `iter`
        f1 = self.f

        # Register the reporting callbacks requested by `verbosity`.
        if self.verbosity > 0:
            self.idict.callbacks.add(self.print_mesg)
        if self.verbosity > 1:
            self.idict.callbacks.add(self.plot)

        # Publish the initial state.  The lock keeps the three entries
        # consistent with each other while they are being written.
        with self.idict.lock:
            self.idict['f'] = f1
            self.idict['iter'] = n_iter
            self.idict['abs_err'] = np.inf
        self.idict.notify_update()

        while (not self.idict.terminated
               and self.idict['abs_err'] > self.abs_tol
               and n_iter < 10000):
            # Give clients a chance to pause
            self.idict.wait()

            f0 = f1
            f1 = self.iter(f0)
            n_iter += 1
            with self.idict.lock:
                self.idict['f'] = f1
                self.idict['iter'] = n_iter
                self.idict['abs_err'] = abs(f1 - f0).max()
            self.idict.notify_update()

        self.idict.notify_update(final=True)  # Notify of last update

        # Call form of print: same output on Python 2 for a single
        # argument, and consistent with the other prints in this file.
        print("Elapsed time = %gs" % (time.time() - start_time))
        return f1

    def plot(self, idict):
        """Here is how we would plot the intermediate solutions.

        Importing the plotting library etc. should be done within the
        function here because this should not slow down the
        computational process with the import unless plotting is
        required.
        """
        import pylab
        interactive = pylab.isinteractive()
        pylab.ioff()
        try:
            pylab.clf()
            # Note the use of idict for accessing data.  If this was
            # being run in a separate thread, then the idict instance
            # would have to request the data from the computation
            # server, however, the plot function can also be used
            # within the server thread if desired.
            pylab.plot(idict['f'])
            pylab.draw()
        finally:
            # Restore the caller's interactive mode even if plotting
            # failed part way through.
            if interactive:
                pylab.ion()
        if interactive:
            pylab.draw()

    def mesg(self, idict):
        """This returns a diagnostic string for output."""
        return "Iteration: %i, abs_err = %g, abs_tol = %g" % (
            idict['iter'],
            idict['abs_err'],
            self.abs_tol)

    def print_mesg(self, idict):
        """Print the diagnostic string built by :meth:`mesg`."""
        print(self.mesg(idict))
def go_trivial():
    """Minimal usage example: build a `Problem` around a fresh
    `server.IDict`, with plotting enabled (``verbosity=2``), and solve
    it.  The solution returned by `solve` is discarded.
    """
    shared = server.IDict()
    problem = Problem(idict=shared, verbosity=2)
    problem.solve()
if __name__ == "__main__":
    # One IDict instance is handed to both the problem and the server.
    idict = server.IDict()
    problem = Problem(idict=idict)

    # Bring the computation server up before any computing starts.
    server_control = server.run_server(idict=idict)
    try:
        raw_input("Press any key to start the computation...\n")

        # Run the solver while the server is up.
        problem.solve()
        print("Problem solved.")

        raw_input("Press any key to kill the computation server...\n")
    finally:
        # Reached on normal completion and also if anything above
        # raises, so the server is always asked to close.
        print("Waiting 60s for clients to close, then terminating")
        server_control.wait_then_close(timeout=60)