# Source code for mmf.async.example

"""This file shows how to start a computation server.

Just run this at the python prompt...
"""
from __future__ import with_statement

__all__ = ['Problem', 'go_trivial']

import time
import numpy as np
#import mmf.async.server as server
import server

from mmf.objects import StateVars, process_vars
from mmf.objects import Required, Computed, Excluded, NoCopy

class Problem(StateVars):
    """Simple little problem.

    Takes a function `f`, and slowly converges to a straight line
    between the endpoints `a` and `b`.

    Every iteration replaces each point of `f` by the mean of its two
    neighbours, using `a` and `b` as the fixed values just outside the
    left and right ends.  Intermediate results are stored in `idict`
    under the keys ``'f'``, ``'iter'`` and ``'abs_err'``.
    """
    _state_vars = [
        ('a', 1, "Left function value"),
        ('b', 2, "Right function value"),
        ('abs_tol', 1e-5, "Convergence criterion"),
        ('f', None, "Current guess"),
        ('verbosity', 1, "1 = messages, 2 = plot."),
        ('idict', NoCopy(server.IDict()), "Shared memory object.")]
    process_vars()

    def __init__(self, *v, **kw):
        """Supply a random initial guess of 100 points if `f` is None.

        The arguments are accepted but not used here.
        """
        # NOTE(review): `self.f` is read before anything is assigned in
        # this method, so the state variables are presumably populated
        # by `StateVars` before `__init__` runs -- confirm.
        if self.f is None:
            self.f = np.random.rand(100)

    def iter(self, f):
        """Perform a single iteration.

        `f` is padded with `a` on the left and `b` on the right; the
        result has the same length as `f` and each entry is the average
        of the two neighbours of the corresponding entry in the padded
        array.
        """
        F = np.hstack(([self.a], f, [self.b]))
        return (F[:-2] + F[2:])/2.0

    def solve(self, max_iter=10000):
        """Solve the problem and return the final approximation.

        Note: the only thing unusual here is that we save the
        intermediate results in `self.idict`.

        The loop stops as soon as `self.idict.terminated` is set, the
        largest absolute change between successive iterates is no
        longer greater than `abs_tol`, or `max_iter` iterations have
        been performed.

        `max_iter` defaults to 10000, the limit that used to be
        hard-coded.
        """
        start_time = time.time()
        # Named `n_iter` so that neither the builtin `iter` nor the
        # method `self.iter` is shadowed.
        n_iter = 0
        f1 = self.f

        if self.verbosity > 0:
            self.idict.callbacks.add(self.print_mesg)
        if self.verbosity > 1:
            self.idict.callbacks.add(self.plot)

        # Publish the initial state; an infinite error guarantees that
        # the convergence test below fails on the first pass.
        with self.idict.lock:
            self.idict['f'] = f1
            self.idict['iter'] = n_iter
            self.idict['abs_err'] = np.inf
        # NOTE(review): notifications are issued after the lock is
        # released; confirm this matches the locking contract of IDict.
        self.idict.notify_update()

        while (not self.idict.terminated
               and self.idict['abs_err'] > self.abs_tol
               and n_iter < max_iter):
            # Give clients a chance to pause
            self.idict.wait()

            f0 = f1
            f1 = self.iter(f0)
            n_iter = n_iter + 1

            # Update all three entries under a single lock acquisition.
            with self.idict.lock:
                self.idict['f'] = f1
                self.idict['iter'] = n_iter
                self.idict['abs_err'] = abs(f1 - f0).max()
            self.idict.notify_update()

        self.idict.notify_update(final=True)  # Notify of last update

        # Parenthesised single-argument form: prints identically under
        # the Python 2 print statement and the print function.
        print("Elapsed time = %gs" % (time.time() - start_time))
        return f1

    def plot(self, idict):
        """Here is how we would plot the intermediate solutions.

        Importing the plotting library etc. should be done within the
        function here because this should not slow down the
        computational process with the import unless plotting is
        required.

        Interactive mode is switched off while the figure is rebuilt
        and switched back on afterwards if it was on, even when
        plotting raises.
        """
        import pylab

        was_interactive = pylab.isinteractive()
        pylab.ioff()
        try:
            pylab.clf()
            # Note the use of idict for accessing data.  If this were
            # being run in a separate thread, then the idict instance
            # would have to request the data from the computation
            # server, however, the plot function can also be used
            # within the server thread if desired.
            pylab.plot(idict['f'])
            pylab.draw()
        finally:
            if was_interactive:
                pylab.ion()
        if was_interactive:
            pylab.draw()

    def mesg(self, idict):
        """This returns a diagnostic string for output.

        The string reports ``idict['iter']``, ``idict['abs_err']`` and
        this problem's `abs_tol`.
        """
        mesg = "Iteration: %i, abs_err = %g, abs_tol = %g" % (
            idict['iter'], idict['abs_err'], self.abs_tol)
        return mesg

    def print_mesg(self, idict):
        """Print the diagnostic string produced by `mesg`."""
        print(self.mesg(idict))
def go_trivial():
    """Here is a trivial way of using this object.

    Builds a `Problem` with plotting enabled (``verbosity=2``) on a
    fresh `server.IDict` and runs `solve` on it; no server is started
    and the solution is discarded.
    """
    Problem(verbosity=2, idict=server.IDict()).solve()
if __name__ == "__main__":
    # The same IDict instance is handed to both the problem and the
    # computation server.
    idict = server.IDict()
    problem = Problem(idict=idict)

    #raw_input("Press any key to start the computation server...\n")
    # Start the computation server...
    server_control = server.run_server(idict=idict)
    try:
        raw_input("Press any key to start the computation...\n")

        # Now solve the problem.
        problem.solve()
        print("Problem solved.")

        raw_input("Press any key to kill the computation server...\n")
    finally:
        # Runs even if the solve above raises or is interrupted, so the
        # server is always asked to shut down.
        print("Waiting 60s for clients to close, then terminating")
        server_control.wait_then_close(timeout=60)