Source code for mmf.async.client

"""This module implements a client that connects to a
ComputationServer using twisted.  The ComputationServer is presented
through the RIDict interface which acts like a remote IDict (see
server.py).  Presently, however, one needs to explicitly call update.

The twisted reactor is run in a separate thread, so the user can use
this at the python prompt.  We add a hook to the GUI mainloop (this
will be backend dependent, so right now we require TkAgg) to
periodically get the data and plot the results.  This hook looks for
a callable in the main_callback_queue.  These will be called from the
main thread (all Tk interactions must be called from the main
thread)."""

from __future__ import with_statement
__all__ = ['ISync', 'TkSync', 'register_plotter', 'idict', 'run']

#import threading
#import logging
#import cPickle as pickle
import Queue

import mmf.utils

import server

#from twisted_components import RIDict, Timeout

# Upper bound on the number of entries callback_queue may hold at once.
_max_queue_size = 2

# Bounded, thread-safe queue created at import time.
# NOTE(review): nothing in the visible code puts to or gets from this
# queue (TkSync uses its own per-instance queue) -- confirm it is still
# needed.
callback_queue = Queue.Queue(maxsize=_max_queue_size)

# Placeholder for a GUI main-loop hook; not assigned anywhere in the
# visible code.
_tk_hook = None

class ISync(object):
    """High-level synchronization interface to an underlying event loop.

    Callbacks that have to run in the thread owning the event loop are
    registered with this object.  They are then invoked either at a
    regular interval or whenever `update` is called.

    This class only defines the interface: `register` and `update`
    raise NotImplementedError and are meant to be overridden.
    """

    def __init__(self):
        """Create the object.  This should be done from the main thread."""

    def register(self, callback, delay=None, wait=True, repeat=None):
        """Register `callback` to be called `repeat` times.

        `callback` : callable to invoke from the event-loop thread.
        `delay`    : minimum delay between calls.
        `wait`     : if True, only a call to `update` triggers the
                     callback.
        `repeat`   : number of times to call the callback.

        Implementations should be threadsafe.
        """
        raise NotImplementedError()

    def update(self):
        """Send an update notification.  This must be threadsafe."""
        raise NotImplementedError()
class TkSync(ISync):
    """Sync object with Tk event loop.

    Calls submitted from any thread are placed on a thread-safe queue
    which is drained from the Tk thread every `poll_delay` milliseconds.
    Every registered callback is bound to its own Tk virtual event
    (named ``<<Callback:N>>``) and is run by generating that event.
    """
    poll_delay = 100                    # Polling delay in ms.

    def __init__(self, tk=None):
        """Start the polling loop on the Tk widget `tk`.

        `tk` : object providing the Tk widget methods used here
               (after, after_idle, after_cancel, bind, event_generate,
               update_idletasks).  If None, the Tk canvas of the figure
               returned by pylab.gcf() is used.
        """
        if tk is None:
            import pylab
            tk = pylab.gcf().canvas._tkcanvas
        self.tk = tk
        self.callbacks = {}             # virtual event name -> callback
        self._delayed_calls = set()     # event names with a pending fire
        self.queue = Queue.Queue()      # calls waiting for the Tk thread
        # Start polling loop.
        self._poll_handle = self.tk.after_idle(self._process)

    def _process(self):
        """Run all queued calls, then schedule the next poll.

        Must be called from main thread.
        """
        try:
            while True:
                queued_call = self.queue.get_nowait()
                queued_call()
                self.tk.update_idletasks()
        except Queue.Empty:
            pass
        finally:
            # Always reschedule: if a queued call raises, the exception
            # still propagates, but polling must not stop for good.
            # Calls left in the queue are handled at the next poll.
            self._poll_handle = self.tk.after(self.poll_delay,
                                              self._process)

    def __del__(self):
        """Cancel the pending poll, if one was ever scheduled."""
        # __init__ may have failed before _poll_handle was assigned.
        handle = getattr(self, '_poll_handle', None)
        if handle is not None:
            self.tk.after_cancel(handle)

    def _get_callback_id(self, _id=[0]):
        """Return a unique callback id.

        The mutable default `_id` is deliberate: it is the counter,
        shared between all calls and all instances.
        """
        _id[0] += 1
        return _id[0]

    def _call(self, f, *v, **kw):
        """Call the function f(*v,**kw) as soon as possible.  Threadsafe.

        The call is only queued here; it is executed by `_process` at
        the next poll.
        """
        def call():
            f(*v, **kw)
        self.queue.put(call)

    def register(self, callback, delay=None, wait=True, repeat=None):
        """Register callback to be called repeat times with a specified
        minimum delay.  If wait is True, then only a call to update will
        trigger the callback.

        The bookkeeping (`repeat`, `wait` and a `timer`) is stored as
        attributes on `callback` itself, so `callback` must accept
        attribute assignment.  The timer is an mmf.utils.Timer(delay);
        presumably `delay` is in seconds, since the timer values are
        multiplied by 1000 before being handed to Tk -- TODO confirm.
        """
        callback.repeat = repeat
        callback.wait = wait
        callback.timer = mmf.utils.Timer(delay)
        event_id = "<<Callback:%i>>" % (self._get_callback_id(),)
        self.callbacks[event_id] = callback

        def call(event=None):
            """Throw away event."""
            callback.timer.reset()
            if callback.repeat:
                callback.repeat -= 1
            callback()

        # Here we bind the callback to the virtual event id.  Binding
        # (and firing) go through the queue so that Tk is only touched
        # from the polling loop.
        self._call(self.tk.bind, event_id, call)
        if not wait:
            self._call(self._fire_event, event_id)

    def _fire_event(self, event_id):
        """Fire event event_id once after the specified delay.  Multiple
        calls will be ignored.  Not threadsafe.

        A call for an event whose callback has already been removed is
        ignored.
        """
        callback = self.callbacks.get(event_id)
        if callback is None:
            # Stale request: the callback used up its repeats and was
            # removed after this call had been queued or scheduled.
            return

        if callback.repeat is not None and callback.repeat < 1:
            # All repeats used up: forget the callback.
            del self.callbacks[event_id]
            self._delayed_calls.discard(event_id)
            return

        def update_id():
            """Callback to fire event again later."""
            self._fire_event(event_id)

        delay = callback.timer.remaining()
        if delay > 0 and event_id not in self._delayed_calls:
            # Not ready yet: come back when the timer has run out.
            self._delayed_calls.add(event_id)
            self.tk.after(int(delay*1000), update_id)
            return

        # NOTE(review): update() marks the id as delayed before queueing
        # this call, so update-triggered fires reach this point even
        # while the timer still has time remaining -- confirm that this
        # is intended.

        # Fire the event now.  (Diagnostic output kept from the original;
        # with a single argument this prints the same text under the
        # print statement.)
        print("Firing")
        self.tk.event_generate(event_id)
        self._delayed_calls.discard(event_id)

        # And schedule future calls.
        if callback.wait:
            # Wait for update call.
            return
        if callback.timer.timeout > 0:
            self._delayed_calls.add(event_id)
            self.tk.after(int(callback.timer.timeout*1000), update_id)
        else:
            # No delay, other than processing events.
            self.tk.after_idle(update_id)

    def update(self):
        """Send an update notification.  This may be called from other
        threads.

        Every registered callback without a pending fire gets one
        queued.
        """
        # Iterate over a snapshot of the keys.
        for event_id in list(self.callbacks):
            if event_id not in self._delayed_calls:
                self._delayed_calls.add(event_id)
                self._call(self._fire_event, event_id)
def register_plotter(plotter, ridict=None):
    """Registers plotter to be called by the event loop.

    A new pylab figure is created together with a TkSync object
    attached to that figure's Tk canvas.  A callback added to
    `ridict.callbacks` calls the synchronizer's update(), which in turn
    triggers a redraw through `plotter`.

    `plotter` : callable; plotter(ridict) should plot the data in the
                dict.
    `ridict`  : dict-like object to plot from.  Defaults to the
                module-level `idict`.  It must provide get(), item
                access for 'iter' and a `callbacks` collection with
                add().

    Returns the newly created figure.
    """
    if ridict is None:
        ridict = idict

    import pylab
    pylab.ion()
    fig = pylab.figure()
    synchronizer = TkSync(fig.canvas._tkcanvas)

    def callback(idict=None):
        """Callback to add to the dict: only requests an update; the
        plotting itself happens in `plot`."""
        synchronizer.update()

    def plot():
        """Designed to always request an update."""
        ridict.get()
        pylab.figure(fig.number)        # Select correct figure
        try:
            plotter(ridict)
            pylab.title("Iteration = (%i)" % (ridict['iter'],))
        except Exception as e:
            # Best effort: a failing plotter is reported, not raised.
            print("Exception! %s" % e)

    synchronizer.register(plot, delay=1, wait=True)
    # Use the dict actually being plotted (not the module-level idict)
    # so that an explicitly passed ridict works.
    ridict.callbacks.add(callback)
    return fig
# Module-level handle to the remote dict.  It is None until run()
# assigns a server.RIDict() to it; register_plotter() uses it as the
# default dict.
idict = None
def run(plotter=None):
    """Start the client, subscribe to the remote dict and plot it.

    Selects the TkAgg matplotlib backend, starts the client through
    server.run_client(), stores a subscribed server.RIDict() in the
    module-level `idict` and registers `plotter` for it.

    `plotter` : callable handed to register_plotter().  If None, the
                plot method of example.Problem() is used.
    """
    global idict

    import matplotlib
    matplotlib.use('TkAgg')

    server.run_client()
    idict = server.RIDict()
    idict.subscribe()

    if plotter is None:
        # Fall back to the example problem's plotter only when the
        # caller did not supply one.
        import example
        plotter = example.Problem().plot
    register_plotter(plotter)