"""Perspective broker implementation of an asynchronous inspector.
Use this to build a two-process client-server application where the
server runs a computation that clients can connect to, inspect, and
control.
The server should including the following boilerplate code::
from __future__ import with_statement
...
import server
...
def run_computation(..., idict, ...):
...
while (not idict.terminated and ...): # Allows clients to terminate
idict.wait() # Allows clients to pause
with idict.lock: # Use lock to be threadsafe
idict[...] = ... # Put intermediate results into idict.
...
idict.notify_update() # Call to notify clients of changes.
... <good time to checkpoint>
...
idict.notify_update(final=True) # Notify of final update.
def plot_results(idict):
"Plot intermediate results from idict."
...
def print_mesg(idict):
"Print messages from idict."
...
def run(...):
idict = server.IDict()
idict.callbacks.add(plot_results) # Add these if you want the server
idict.callbacks.add(print_mesg) # to plot and print on updates.
server_controller = server.run_server(idict)
run_computation(..., idict=idict, ...)
server_controller.wait_then_close(timeout=...)
if __name__ = '__main__':2
run()
This sets up a Perspective Broker using twisted that makes the idict
available as the root object that clients can connect to and inspect.
"""
from __future__ import with_statement
__all__ = [ 'IDict' ]
import twisted.spread.pb
import cPickle as pickle
import threading
import time
import twisted_utils
callTwisted = twisted_utils.callTwisted
reactor = twisted_utils.reactor
import utils
import logger as _logging
_port = 2728
[docs]class IDict(dict,
twisted.spread.jelly.Jellyable,
twisted.spread.jelly.Unjellyable):
"""Interactive dictionary for use by the computation server.
This dictionary maintains some state relevant for the server, such
as a lock, and a set of callbacks. This object is also jellyable
and unjellyable so that it can be serialized and sent across the
network. If entries in the dictionary cannot be jellied by
default, then one will have to modify getStateFor and setStateFor
(presently these use pickles, but this is *NOT SECURE*)
Use the lock as follows
with idict.lock:
idict['a'].x = 4
Don't use the lock when calling functions like notify_update()
which use the lock.
"""
[docs] def __init__(self, sync=None):
self.callbacks = set()
self.lock = threading.Lock()
self.terminate_server = False
[docs] def getStateFor(self, jellier):
"""Convert self to some jellyable object and return that."""
with self.lock:
return pickle.dumps(dict(self))
[docs] def setStateFor(self, unjellier, state):
"""Restore from state which was created by getStateFor."""
self.clear()
self.update(pickle.loads(state))
[docs] def notify_update(self, final=False):
"""The calculation server should call this whenever it is
finished updating. This calls the callbacks with the current
idict."""
with self.lock:
for c in self.callbacks:
try:
c(self, final)
except TypeError:
c(self)
[docs] def add_callback(self, callback):
"""Register the specified callback to receive update
notices.
Note: this is redundant. Just use self.callbacks.add().
"""
self.callbacks.add(callback)
[docs] def wait(self,timeout=None):
"""Wait if synchronization object is specified."""
return
self.sync.wait(timeout=timeout)
[docs] def terminate(self):
"""Ask the computation server to terminate."""
self.terminate_server = True
@property
[docs] def terminated(self):
"""Return True if calculation should be terminated."""
return self.terminate_server
return self.sync.terminate
# Register IDict so that it can be jellyed.
twisted.spread.jelly.setUnjellyableForClass(IDict, IDict)
class Server(twisted.spread.pb.Root):
"""Server that manages connections and their access to the
idict using the twisted perspective broker interface. Methods
that start "remote_" are accessible remotely.
"""
def __init__(self, idict):
self.idict = idict
self.logger = _logging.getLogger(self.__class__.__name__)
self.subscriptions = set()
def rootObject(self, broker):
"""Return the root object. This is just self, but we add the
broker as a state variable called protocol."""
self.protocol = broker
return self
############################################################
# Remote functions. Implement these to provide remote support for
# the clients.
def remote_get(self, name=None):
"""Get the remote dictionary or a value from it."""
if name is None:
self.logger.debug("Getting idict")
return self.idict
else:
self.logger.debug("Getting %r from idict"%(name,))
return self.idict[name]
def remote_subscribe(self, subscriber, delay):
"""Add the subscriber to the list of subscribers."""
self.logger.info("Subscribing %r with %gs delay."%
(subscriber,delay))
self.protocol.factory.subscribe(subscriber, delay)
def remote_unsubscribe(self, subscriber):
"""Remove the connection to the list of subscribers."""
self.logger.debug("Unsubscribing %r."%subscriber)
if subscriber in self.protocol.factory.subscribers:
self.protocol.factory.subscribers.remove(subscriber)
def remote_terminate(self):
"""Terminate the calculation (if it is willing!)"""
self.idict.terminate()
class ServerFactory(twisted.spread.pb.PBServerFactory):
def __init__(self, *v, **kw):
twisted.spread.pb.PBServerFactory.__init__(self, *v, **kw)
self.connections = set()
self.callbacks = set()
self.subscribers = set()
self.logger = _logging.getLogger(self.__class__.__name__)
def clientConnectionMade(self, protocol):
self.logger.info("Connection from %s"%protocol.transport.getPeer())
self.connections.add(protocol)
############################################################
# Local functions. Implement these to provide support for the
# server. Note that these are called by the main thread, so they
# must be wrapped
def subscribe(self, subscriber, delay):
subscriber.delay = delay
subscriber.last_update_time = 0
self.subscribers.add(subscriber)
@callTwisted
def do_update(self, idict, last):
"""Send an update to the client."""
for s in self.subscribers:
if (s.delay <= (time.time() - s.last_update_time)
or last):
# Push an update if enough time has elapsed, or it is
# the last update.
s.callRemote("update", idict)
s.last_update_time = time.time()
for c in self.callbacks:
c(idict)
@callTwisted
def wait_then_close(self, timeout=None):
twisted_utils.stop_reactor()
def run_server(idict):
twisted_utils.run_reactor_as_thread()
factory = ServerFactory(Server(idict))
idict.callbacks.add(factory.do_update)
callTwisted(reactor.listenTCP)(_port, factory)
return factory
class RIDict(dict, twisted.spread.pb.Referenceable):
"""These objects can be passed to the Server with a subscription
request and are used by the server to push updates.
If you register this using subscribe, then you will get updates
pushed to you (and can attach handlers to self.callbacks).
"""
def __init__(self, host='localhost', port=_port):
self.host = host
self.port = port
self.logger = _logging.getLogger(self.__class__.__name__)
self.callbacks = set()
self.connect()
def connect(self):
"""Connect to the specified server."""
factory = twisted.spread.pb.PBClientFactory()
callTwisted(block=True)(reactor.connectTCP)(
self.host, self.port, factory)
self.root = callTwisted(block=True)(factory.getRootObject)()
@callTwisted(block=True)
def subscribe(self, delay=0):
"""Ask the server to subscribe us for updates.
delay: Minimum delay between updates.
"""
self.root.callRemote("subscribe", self, delay)
@callTwisted(block=True)
def unsubscribe(self):
"""Ask the server to unsubscribe from updates."""
self.root.callRemote("unsubscribe", self)
def remote_update(self, idict):
"""Called by the server to push updates."""
self.update(idict)
for c in self.callbacks:
c(idict)
@callTwisted(block=True)
def terminate_server(self):
"""Ask the computation server to terminate."""
self.root.callRemote("terminate")