Source code for mmf.async.pb

"""Perspective broker implementation of an asynchronous inspector.

Use this to build a two-process client-server application where the
server runs a computation that clients can connect to, inspect, and
control.

The server should include the following boilerplate code::

   from __future__ import with_statement
   ...
   import server
   ...

   def run_computation(..., idict, ...):
      ...
      while (not idict.terminated and ...): # Allows clients to terminate
          idict.wait()                      # Allows clients to pause
          with idict.lock:                  # Use lock to be threadsafe
              idict[...] = ...     # Put intermediate results into idict.
          ...
          idict.notify_update()    # Call to notify clients of changes.
          ... <good time to checkpoint>
      ...
      idict.notify_update(final=True) # Notify of final update.

   def plot_results(idict):
       "Plot intermediate results from idict."
       ...

   def print_mesg(idict):
       "Print messages from idict."
       ...

   def run(...):
       idict = server.IDict()
       idict.callbacks.add(plot_results) # Add these if you want the server
       idict.callbacks.add(print_mesg)   # to plot and print on updates.
       server_controller = server.run_server(idict)
       run_computation(..., idict=idict, ...)
       server_controller.wait_then_close(timeout=...)

   if __name__ == '__main__':
       run()

This sets up a Perspective Broker using twisted that makes the idict
available as the root object that clients can connect to and inspect.
"""
from __future__ import with_statement

__all__ = [ 'IDict' ]

import twisted.spread.pb

import cPickle as pickle
import threading
import time

import twisted_utils
# Module-level aliases for the two twisted_utils helpers used below.
# callTwisted wraps callables that touch the reactor; it is used both
# bare (@callTwisted) and with arguments (callTwisted(block=True)).
# NOTE(review): presumably it dispatches the wrapped call to the reactor
# thread -- confirm against twisted_utils.
callTwisted = twisted_utils.callTwisted
reactor = twisted_utils.reactor

import utils
import logger as _logging

# Default TCP port: run_server() listens on it and RIDict connects to it
# unless a different port is passed.
_port = 2728

class IDict(dict, twisted.spread.jelly.Jellyable,
            twisted.spread.jelly.Unjellyable):
    """Interactive dictionary for use by the computation server.

    This dictionary maintains some state relevant for the server, such
    as a lock, and a set of callbacks.  This object is also jellyable
    and unjellyable so that it can be serialized and sent across the
    network.  If entries in the dictionary cannot be jellied by
    default, then one will have to modify getStateFor and setStateFor
    (presently these use pickles, but this is *NOT SECURE*)

    Use the lock as follows::

        with idict.lock:
            idict['a'].x = 4

    Don't use the lock when calling functions like notify_update()
    which use the lock.
    """
    def __init__(self, sync=None):
        """Create an empty interactive dictionary.

        sync: Optional synchronization object.  If given, wait()
            delegates to its wait(timeout=...) method; if None, wait()
            returns immediately.
        """
        self.callbacks = set()
        self.lock = threading.Lock()
        self.terminate_server = False
        # Keep the synchronization object: wait() needs it.
        self.sync = sync

    def getStateFor(self, jellier):
        """Convert self to some jellyable object and return that.

        The state is a pickle of a plain-dict copy of the entries, taken
        while holding the lock.
        """
        with self.lock:
            return pickle.dumps(dict(self))

    def setStateFor(self, unjellier, state):
        """Restore from state which was created by getStateFor."""
        # SECURITY: unpickling data received over the network can execute
        # arbitrary code.  Only connect to trusted peers.
        self.clear()
        self.update(pickle.loads(state))

    def notify_update(self, final=False):
        """The calculation server should call this whenever it is
        finished updating.  This calls the callbacks with the current
        idict.

        final: Passed as the second argument to callbacks that accept
            it; should be True for the last update.
        """
        with self.lock:
            for c in self.callbacks:
                try:
                    c(self, final)
                except TypeError:
                    # Callback only accepts the idict.
                    # NOTE(review): a TypeError raised inside a
                    # two-argument callback also lands here and causes
                    # the callback to be invoked a second time.
                    c(self)

    def add_callback(self, callback):
        """Register the specified callback to receive update notices.

        Note: this is redundant.  Just use self.callbacks.add().
        """
        self.callbacks.add(callback)

    def wait(self, timeout=None):
        """Wait if synchronization object is specified.

        Returns whatever the synchronization object's wait() returns, or
        None immediately when no synchronization object was given.
        """
        if self.sync is None:
            return None
        return self.sync.wait(timeout=timeout)

    def terminate(self):
        """Ask the computation server to terminate."""
        self.terminate_server = True

    @property
    def terminated(self):
        """Return True if calculation should be terminated."""
        return self.terminate_server


# Register IDict so that it can be jellied.
twisted.spread.jelly.setUnjellyableForClass(IDict, IDict)


class Server(twisted.spread.pb.Root):
    """Perspective-broker root object giving clients access to the idict.

    Every method whose name starts with "remote_" can be called by a
    connected client.
    """
    def __init__(self, idict):
        self.subscriptions = set()
        self.logger = _logging.getLogger(self.__class__.__name__)
        self.idict = idict

    def rootObject(self, broker):
        """Return self as the root object.

        The broker is remembered on the instance as ``protocol``.
        """
        self.protocol = broker
        return self

    ############################################################
    # Remote functions.  Implement these to provide remote support for
    # the clients.
    def remote_get(self, name=None):
        """Return the whole idict, or idict[name] if a name is given."""
        if name is not None:
            self.logger.debug("Getting %r from idict"%(name,))
            return self.idict[name]
        self.logger.debug("Getting idict")
        return self.idict

    def remote_subscribe(self, subscriber, delay):
        """Register subscriber with the factory for pushed updates.

        delay: Minimum delay between updates.
        """
        message = "Subscribing %r with %gs delay."%(subscriber, delay)
        self.logger.info(message)
        self.protocol.factory.subscribe(subscriber, delay)

    def remote_unsubscribe(self, subscriber):
        """Drop subscriber from the factory's subscribers, if present."""
        self.logger.debug("Unsubscribing %r."%subscriber)
        self.protocol.factory.subscribers.discard(subscriber)

    def remote_terminate(self):
        """Terminate the calculation (if it is willing!)"""
        self.idict.terminate()


class ServerFactory(twisted.spread.pb.PBServerFactory):
    """Server factory that tracks connections, subscribers and callbacks."""
    def __init__(self, *v, **kw):
        twisted.spread.pb.PBServerFactory.__init__(self, *v, **kw)
        self.logger = _logging.getLogger(self.__class__.__name__)
        self.connections = set()
        self.callbacks = set()
        self.subscribers = set()

    def clientConnectionMade(self, protocol):
        """Log the peer address and record the new connection."""
        peer = protocol.transport.getPeer()
        self.logger.info("Connection from %s"%peer)
        self.connections.add(protocol)

    ############################################################
    # Local functions.  Implement these to provide support for the
    # server.  Note that these are called by the main thread, so they
    # must be wrapped
    def subscribe(self, subscriber, delay):
        """Add subscriber, storing its delay and a zeroed update time."""
        subscriber.last_update_time = 0
        subscriber.delay = delay
        self.subscribers.add(subscriber)

    @callTwisted
    def do_update(self, idict, last):
        """Send an update to the clients, then run the local callbacks.

        idict: Dictionary pushed to subscribers and passed to callbacks.
        last: If true, push to every subscriber regardless of its delay.
        """
        for subscriber in self.subscribers:
            elapsed = time.time() - subscriber.last_update_time
            # Push an update if enough time has elapsed, or it is the
            # last update.
            if subscriber.delay <= elapsed or last:
                subscriber.callRemote("update", idict)
                subscriber.last_update_time = time.time()

        for callback in self.callbacks:
            callback(idict)

    @callTwisted
    def wait_then_close(self, timeout=None):
        """Stop the reactor.

        NOTE(review): timeout is accepted but not used here.
        """
        twisted_utils.stop_reactor()


def run_server(idict):
    """Start the reactor thread and serve idict on the default port.

    Registers the factory's do_update as an idict callback so that
    notify_update() pushes to subscribers.  Returns the factory.
    """
    twisted_utils.run_reactor_as_thread()
    root = Server(idict)
    factory = ServerFactory(root)
    idict.callbacks.add(factory.do_update)
    listen = callTwisted(reactor.listenTCP)
    listen(_port, factory)
    return factory


class RIDict(dict, twisted.spread.pb.Referenceable):
    """These objects can be passed to the Server with a subscription
    request and are used by the server to push updates.

    If you register this using subscribe, then you will get updates
    pushed to you (and can attach handlers to self.callbacks).
    """
    def __init__(self, host='localhost', port=_port):
        self.callbacks = set()
        self.logger = _logging.getLogger(self.__class__.__name__)
        self.host = host
        self.port = port
        self.connect()

    def connect(self):
        """Connect to the specified server and fetch its root object."""
        factory = twisted.spread.pb.PBClientFactory()
        connect_tcp = callTwisted(block=True)(reactor.connectTCP)
        connect_tcp(self.host, self.port, factory)
        get_root = callTwisted(block=True)(factory.getRootObject)
        self.root = get_root()

    @callTwisted(block=True)
    def subscribe(self, delay=0):
        """Ask the server to subscribe us for updates.

        delay: Minimum delay between updates.
        """
        self.root.callRemote("subscribe", self, delay)

    @callTwisted(block=True)
    def unsubscribe(self):
        """Ask the server to unsubscribe from updates."""
        self.root.callRemote("unsubscribe", self)

    def remote_update(self, idict):
        """Called by the server to push updates.

        Merges idict into self, then calls each callback with idict.
        """
        self.update(idict)
        for callback in self.callbacks:
            callback(idict)

    @callTwisted(block=True)
    def terminate_server(self):
        """Ask the computation server to terminate."""
        self.root.callRemote("terminate")