Source code for mmf.async.server

"""Perspective broker implementation of an asynchronous inspector.

Use this to build a two-process client-server application where the
server runs a computation that clients can connect to, inspect, and
control.

The server should include the following boilerplate code::

   from __future__ import with_statement
   ...
   import server
   ...

   def run_computation(..., idict, ...):
      ...
      while (not idict.terminated and ...): # Allows clients to terminate
          idict.wait()                      # Allows clients to pause
          with idict.lock:                  # Use lock to be threadsafe
              idict[...] = ...     # Put intermediate results into idict.
          ...
          idict.notify_update()    # Call to notify clients of changes.
          ... <good time to checkpoint>
      ...
      idict.notify_update(final=True) # Notify of final update.

   def plot_results(idict):
       "Plot intermediate results from idict."
       ...

   def print_mesg(idict):
       "Print messages from idict."
       ...

   def run(...):
       idict = server.IDict()
       idict.callbacks.add(plot_results) # Add these if you want the server
       idict.callbacks.add(print_mesg)   # to plot and print on updates.
       server_controller = server.run_server(idict)
       run_computation(..., idict=idict, ...)
       server_controller.wait_then_close(timeout=...)

   if __name__ == '__main__':
       run()

This sets up a Perspective Broker using twisted that makes the idict
available as the root object that clients can connect to and inspect.
"""
from __future__ import with_statement

# Public names exported on a star-import of this module.
__all__ = [ 'IDict', '_port' ]

# NOTE(review): twisted.spread.jelly is referenced below but never imported
# explicitly; presumably it is loaded as a side effect of importing pb --
# confirm.
import twisted.spread.pb

# pickle is used by IDict.getStateFor/setStateFor to serialize the contents.
import cPickle as pickle
import threading
import time

# TCP port: default for RIDict connections and the port run_server listens on.
_port = 2728                    # Needs to be before twisted_utils

import twisted_utils
# Module-level aliases for the twisted_utils helpers used throughout this file.
callTwisted = twisted_utils.callTwisted
reactor = twisted_utils.reactor

# NOTE(review): utils is not referenced by name in the visible code -- confirm
# whether it is still needed (e.g. for a Timer helper).
import utils
import logger as _logging


class IDict(dict, twisted.spread.jelly.Jellyable,
            twisted.spread.jelly.Unjellyable):
    """Interactive dictionary for use by the computation server.

    This dictionary maintains some state relevant for the server, such
    as a lock, and a set of callbacks.

    This object is also jellyable and unjellyable so that it can be
    serialized and sent across the network.  If entries in the
    dictionary cannot be jellied by default, then one will have to
    modify getStateFor and setStateFor (presently these use pickles,
    but this is *NOT SECURE*).

    Use the lock as follows::

        with idict.lock:
            idict['a'].x = 4

    Don't use the lock when calling functions like notify_update()
    which use the lock.

    Attributes set by __init__:

    callbacks : set of callables invoked by notify_update().
    lock : threading.Lock guarding the contents (not reentrant).
    terminate_server : flag set by terminate(), read by terminated.
    sync : optional synchronization object used by wait(); None unless
        the caller assigns one.
    """

    def __init__(self, *v, **kw):
        """Initialize the contents exactly like dict(*v, **kw)."""
        dict.__init__(self, *v, **kw)
        self.callbacks = set()
        self.lock = threading.Lock()
        self.terminate_server = False
        # No synchronizer by default, so wait() is a no-op until one
        # is attached.
        self.sync = None

    def getStateFor(self, jellier):
        """Convert self to some jellyable object and return that.

        The contents are copied into a plain dict and pickled while
        holding the lock.
        """
        with self.lock:
            return pickle.dumps(dict(self))

    def setStateFor(self, unjellier, state):
        """Restore from state which was created by getStateFor.

        The current contents are discarded and replaced by the
        unpickled dictionary.
        """
        # SECURITY: unpickling data received over the network can
        # execute arbitrary code.  Only use with trusted peers.
        contents = pickle.loads(state)
        self.clear()
        self.update(contents)

    def notify_update(self, final=False):
        """The calculation server should call this whenever it is
        finished updating.  This calls the callbacks with the current
        idict.

        Each callback is first called as ``c(self, final)``; if that
        raises TypeError it is called again as ``c(self)`` so that
        single-argument callbacks are supported.  Note that a TypeError
        raised from inside a two-argument callback also triggers the
        second call.

        The callbacks run while the lock is held, so they must not try
        to acquire it themselves.
        """
        with self.lock:
            # Iterate over a snapshot so that a callback which registers
            # or removes callbacks does not invalidate the iteration.
            for c in tuple(self.callbacks):
                try:
                    c(self, final)
                except TypeError:
                    c(self)

    def add_callback(self, callback):
        """Register the specified callback to receive update notices.

        Note: this is redundant.  Just use self.callbacks.add()
        """
        self.callbacks.add(callback)

    def wait(self, timeout=None):
        """Wait if synchronization object is specified.

        Returns whatever ``self.sync.wait(timeout=timeout)`` returns,
        or None immediately when no synchronization object is attached.
        """
        # getattr: instances created without running __init__ may not
        # have the attribute at all.
        sync = getattr(self, 'sync', None)
        if sync is None:
            return None
        return sync.wait(timeout=timeout)

    def terminate(self):
        """Ask the computation server to terminate."""
        self.terminate_server = True

    @property
    def terminated(self):
        """Return True if calculation should be terminated."""
        return self.terminate_server

# Register IDict so that it can be jellyed.
twisted.spread.jelly.setUnjellyableForClass(IDict, IDict)


class RIDict(dict, twisted.spread.pb.Referenceable):
    """Remote IDict.

    These objects can be passed to the Server with a subscription
    request and are used by the server to push updates.

    Ultimately it would be nice if all dictionary calls would check
    and update if needed.  For now this is just a local cache of the
    latest results requested (or pushed).

    If you register this using subscribe, then you will get updates
    pushed to you (and can attach handlers to self.callbacks).
    """

    def __init__(self, host='localhost', port=_port):
        """Store the connection parameters and connect to the server."""
        self.host = host
        self.port = port
        self.lock = threading.Lock()
        self.logger = _logging.getLogger(self.__class__.__name__)
        self.callbacks = set()
        self.connect()

    def __getitem__(self, name):
        with self.lock:
            return dict.__getitem__(self, name)

    def __setitem__(self, name, value):
        with self.lock:
            return dict.__setitem__(self, name, value)

    def __repr__(self):
        with self.lock:
            return dict.__repr__(self)

    def connect(self):
        """Connect to the specified server.

        Stores the server's root object in self.root.
        """
        factory = twisted.spread.pb.PBClientFactory()
        callTwisted(block=True)(reactor.connectTCP)(
            self.host, self.port, factory)
        self.root = callTwisted(block=True)(factory.getRootObject)()

    @callTwisted(block=True)
    def _call(self, cmd, *v, **kw):
        """Issue cmd to remote server, wait for result, and return it."""
        return self.root.callRemote(cmd, *v, **kw)

    def get(self, names=None):
        """Return requested variable or update.

        Fetches the server's dictionary (or only the keys in names),
        merges it into the local cache and returns what was fetched.
        """
        idict = self._call("get", names)
        # Same lock that remote_update uses for the same operation.
        with self.lock:
            self.update(idict)
        return idict

    def subscribe(self, delay=0, push=False):
        """Ask the server to subscribe us for updates.

        delay: Minimum delay between updates.
        push: Set to True if you want the whole idict each time.
        """
        self._call("subscribe", self, delay, push)

    def unsubscribe(self):
        """Ask the server to unsubscribe from updates."""
        self._call("unsubscribe", self)

    def remote_update(self, idict=None):
        """Called by the server to push or notify of updates.

        A non-empty idict is merged into the local cache; the callbacks
        are then called with idict (which may be None).
        """
        if idict:
            with self.lock:
                self.update(idict)
        # The callbacks run outside the lock so that they may read
        # items from self.
        for c in self.callbacks:
            c(idict)

    def terminate_server(self):
        """Ask the computation server to terminate."""
        self._call("terminate")


class Server(twisted.spread.pb.Root):
    """Server that manages connections and their access to the idict
    using the twisted perspective broker interface.

    Methods that start "remote_" are accessible remotely.
    """

    def __init__(self, idict):
        self.idict = idict
        self.logger = _logging.getLogger(self.__class__.__name__)
        self.subscriptions = set()

    def rootObject(self, broker):
        """Return the root object.  This is just self, but we add the
        broker as a state variable called protocol."""
        self.protocol = broker
        return self

    ############################################################
    # Remote functions.  Implement these to provide remote support for
    # the clients.
    def remote_get(self, names=None):
        """Get the remote dictionary or a dictionary with the specified
        keys in it as a dictionary.

        Keys in names that are not in the idict are silently skipped.
        """
        if names is None:
            self.logger.debug("Getting idict")
            return IDict(self.idict)

        self.logger.debug("Getting %r from idict"%(names,))
        return IDict([(k, self.idict[k])
                      for k in names if k in self.idict])

    def remote_subscribe(self, subscriber, delay, push):
        """Add the subscriber to the list of subscribers."""
        self.logger.info("Subscribing %r with %gs delay: push=%s"%
                         (subscriber,delay, push))
        self.protocol.factory.subscribe(subscriber, delay, push)

    def remote_unsubscribe(self, subscriber):
        """Remove the connection to the list of subscribers."""
        self.logger.debug("Unsubscribing %r."%subscriber)
        # discard() ignores subscribers that were never registered.
        self.protocol.factory.subscribers.discard(subscriber)

    def remote_terminate(self):
        """Terminate the calculation (if it is willing!)"""
        self.idict.terminate()


class ServerFactory(twisted.spread.pb.PBServerFactory):
    """Server factory that tracks connections and subscribers and
    pushes idict updates to the subscribers."""

    def __init__(self, *v, **kw):
        twisted.spread.pb.PBServerFactory.__init__(self, *v, **kw)
        self.connections = set()
        self.callbacks = set()
        self.subscribers = set()
        self.logger = _logging.getLogger(self.__class__.__name__)

    def clientConnectionMade(self, protocol):
        """Log the new connection and remember its protocol."""
        self.logger.info("Connection from %s"%protocol.transport.getPeer())
        self.connections.add(protocol)

    ############################################################
    # Local functions.  Implement these to provide support for the
    # server.  Note that these are called by the main thread, so they
    # must be wrapped
    def subscribe(self, subscriber, delay, push):
        """Register subscriber, recording its update preferences as
        attributes on the subscriber object itself."""
        subscriber.delay = delay
        subscriber.last_update_time = 0
        subscriber.push = push
        self.subscribers.add(subscriber)

    @callTwisted
    def do_update(self, idict, last):
        """Send an update to the client.

        idict: dictionary to push to subscribers that asked for it.
        last: if True, notify every subscriber regardless of its delay.
        """
        for s in self.subscribers:
            if (s.delay <= (time.time() - s.last_update_time) or last):
                # Push an update if enough time has elapsed, or it is
                # the last update.
                if s.push:
                    s.callRemote("update", idict)
                else:
                    s.callRemote("update")
                s.last_update_time = time.time()
        for c in self.callbacks:
            c(idict)

    @callTwisted
    def wait_then_close(self, timeout=None):
        """Stop the reactor.

        NOTE(review): timeout is accepted but currently unused.
        """
        twisted_utils.stop_reactor()


def run_server(idict):
    """Start the reactor thread and serve idict on _port.

    The returned factory is registered as an idict callback so that
    idict.notify_update() forwards updates to the subscribers.
    """
    twisted_utils.run_reactor_as_thread()
    factory = ServerFactory(Server(idict))
    idict.callbacks.add(factory.do_update)
    callTwisted(reactor.listenTCP)(_port, factory)
    return factory


def run_client():
    """Start the reactor thread for a client process."""
    return twisted_utils.run_reactor_as_thread()


class Synchronizer(object):
    """This class provides a mechanism for synchronizing communication
    between the Computation thread and the Connections.

    The termination flag is stored privately so that it no longer
    shadows the terminate() method; read it through the terminated
    property.
    """

    def __init__(self):
        try:
            import threading
        except ImportError:
            import dummy_threading as threading
        self.lock = threading.RLock()  # Lock to use when modifying Dict.
        self.suspends = {}             # If > 0, calc should wait.
        self._terminate_requested = False  # If True, calc should terminate.
        self.calculation_condition = threading.Condition()

    @property
    def suspended(self):
        """Return True if calculation should be suspended.

        This uses the suspends dictionary.  The calculation will be
        suspended if any active thread in this dictionary has a value
        greater than zero.
        """
        with self.lock:
            return any(client.active and count > 0
                       for client, count in self.suspends.items())

    @property
    def terminated(self):
        """Return True if calculation should be terminated."""
        return self._terminate_requested

    def _notify_if_not_suspended(self):
        """Wake the waiting computation if nothing suspends it."""
        # Called without self.lock held: wait() holds the condition
        # while it takes self.lock, so the opposite order is avoided.
        if not self.suspended:
            with self.calculation_condition:
                self.calculation_condition.notify()

    ##########################################
    # Client functionality
    def terminate(self):
        """Tell the computation to terminate (if supported by server.)"""
        with self.lock:
            self._terminate_requested = True
        # Wake a computation blocked in wait() so it sees the request.
        with self.calculation_condition:
            self.calculation_condition.notify()

    def suspend(self, client):
        """Tell the computation to pause (if supported by server.)

        (Multiple suspend calls are effectively ignored...)
        """
        with self.lock:
            self.suspends[client] = 1

    def resume(self, client, all=False):
        """Resume calculation (if supported by server).

        all: if True, drop the suspend requests of every client.
        """
        with self.lock:
            if all:
                self.suspends = {}
            elif client in self.suspends:
                del self.suspends[client]
        # Notify waiting threads if no-longer suspended...
        self._notify_if_not_suspended()

    def resume1(self, client):
        """Resume calculation for a single step (if supported by
        server)."""
        with self.lock:
            if client in self.suspends:
                self.suspends[client] = 0
        # Notify waiting threads if no-longer suspended...
        self._notify_if_not_suspended()

    ###########################################
    # Server functionality
    def wait(self, timeout=None):
        """Wait until all resume() commands have been called.

        timeout: maximum number of seconds to wait, or None to wait
            until resumed or terminated.
        """
        deadline = None
        if timeout is not None:
            deadline = time.time() + timeout

        with self.calculation_condition:
            while self.suspended and not self.terminated:
                if deadline is None:
                    self.calculation_condition.wait()
                    continue
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                self.calculation_condition.wait(remaining)

        with self.lock:
            for k in self.suspends:
                # Resuspend those threads that called resume1...
                self.suspends[k] = 1