# Source code for mmf.async.client
"""This module implements a client that connects to a
ComputationServer using twisted. The ComputationServer is presented
through the RIDict interface which acts like a remote IDict (see
server.py). Presently, however, one needs to explicitly call update.
The twisted reactor is run in a separate thread, so the user can use
this at the python prompt. We add a hook to the GUI mainloop (this
will be backend dependent, so right now we require TkAgg) to
periodically get the data and plot the results. This hook looks for
a callable in the main_callback_queue. These will be called from the
main thread (all Tk interactions must be called from the main
thread)."""
from __future__ import with_statement
__all__ = ['ISync', 'TkSync', 'register_plotter', 'idict', 'run']
#import threading
#import logging
#import cPickle as pickle
import Queue
import mmf.utils
import server
#from twisted_components import RIDict, Timeout
# Capacity of the module-level callback queue below.
_max_queue_size = 2
# Bounded queue holding at most _max_queue_size entries.
# NOTE(review): presumably the "main_callback_queue" mentioned in the module
# docstring; nothing in the visible code reads or writes it -- confirm
# before relying on or removing it.
callback_queue = Queue.Queue(_max_queue_size)
# NOTE(review): looks like a placeholder for the GUI-mainloop hook described
# in the module docstring; it is never reassigned in the visible code.
_tk_hook = None
class ISync(object):
    """High-level synchronization interface to an underlying event loop.

    Implementations let callers register callbacks that must run in the
    thread in which the event loop is running.  A registered callback is
    called either at a regular interval or when `update` is called.

    This base class only defines the interface: `register` and `update`
    raise NotImplementedError and must be overridden.
    """

    def __init__(self):
        """The object should be constructed from the main thread."""
        pass

    def register(self, callback, delay=None, wait=True, repeat=None):
        """Register `callback` to be called `repeat` times with a
        specified minimum `delay`.

        If `wait` is True, then only a call to `update` will trigger the
        callback.  Implementations should make this threadsafe.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    def update(self):
        """Send an update notification.  This must be threadsafe.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError
class TkSync(ISync):
    """Sync object with Tk event loop.

    Work is handed to the Tk side through `self.queue`: `_call` puts a
    closure on the queue (from any thread) and `_process`, which
    reschedules itself on the Tk widget every `poll_delay` milliseconds,
    drains it.  Each registered callback is bound to its own virtual Tk
    event (``<<Callback:N>>``) and is run by generating that event.

    Attributes:
        tk: the Tk widget used for `after`, `bind` and `event_generate`.
        callbacks: dict mapping virtual event id -> registered callback.
        _delayed_calls: set of event ids that already have a firing
            pending, used to ignore duplicate requests.
        queue: unbounded Queue of zero-argument callables to run from
            `_process`.
    """
    poll_delay = 100            # Polling delay in ms.

    def __init__(self, tk=None):
        """Create the sync object and start the polling loop.

        `tk` is the Tk widget to attach to.  If it is None, the Tk canvas
        of the figure returned by pylab.gcf() is used.
        """
        if tk is None:
            import pylab
            tk = pylab.gcf().canvas._tkcanvas
        self.tk = tk
        self.callbacks = {}
        self._delayed_calls = set()
        self.queue = Queue.Queue()
        # Start polling loop.
        self._poll_handle = self.tk.after_idle(self._process)

    def _process(self):
        """Run every queued call, then schedule the next poll.

        Must be called from main thread.

        The next poll is scheduled in a ``finally`` clause so that an
        exception raised by a queued call (which still propagates to the
        caller) cannot stop the polling loop.  Only an empty queue ends
        the drain loop; a Queue.Empty raised *inside* a queued call is
        not mistaken for it.
        """
        try:
            while True:
                try:
                    f = self.queue.get_nowait()
                except Queue.Empty:
                    break
                f()
                self.tk.update_idletasks()
        finally:
            self._poll_handle = self.tk.after(self.poll_delay,
                                              self._process)

    def __del__(self):
        """Cancel the pending poll, if the object was fully constructed."""
        # __init__ may have failed before these attributes were set.
        tk = getattr(self, 'tk', None)
        handle = getattr(self, '_poll_handle', None)
        if tk is not None and handle is not None:
            tk.after_cancel(handle)

    def _get_callback_id(self, _id=[0]):
        """Return a unique callback id."""
        # The mutable default is deliberate: it is the single counter
        # shared by every call (and every instance).
        _id[0] += 1
        return _id[0]

    def _call(self, f, *v, **kw):
        """Call the function f(*v,**kw) as soon as possible.
        Threadsafe.

        The call is only queued here; it is executed by `_process`.
        """
        def call():
            f(*v, **kw)
        self.queue.put(call)

    def register(self, callback, delay=None, wait=True, repeat=None):
        """Register callback to be called repeat times with a
        specified minimum delay.  If wait is True, then only a call to
        update will trigger the callback.

        The bookkeeping (`repeat`, `wait` and a `timer` built from
        `mmf.utils.Timer(delay)`) is stored as attributes on `callback`
        itself, so `callback` must accept attribute assignment.
        """
        callback.repeat = repeat
        callback.wait = wait
        callback.timer = mmf.utils.Timer(delay)
        event_id = "<<Callback:%i>>" % (self._get_callback_id(),)
        self.callbacks[event_id] = callback

        def call(event=None):
            """Throw away event."""
            callback.timer.reset()
            if callback.repeat:
                callback.repeat -= 1
            callback()

        # Here we bind the callback to the virtual event id.  The bind is
        # queued so that it is performed by _process.
        self._call(self.tk.bind, event_id, call)
        if not wait:
            self._call(self._fire_event, event_id)

    def _fire_event(self, id):
        """Fire event id once after the specified delay.  Multiple
        calls will be ignored.  Not threadsafe.

        A callback whose `repeat` count is exhausted is removed instead
        of fired.  A call for an id that is no longer registered is
        ignored: `update` can queue a firing that only runs after the
        callback has been retired.
        """
        callback = self.callbacks.get(id)
        if callback is None:
            # Stale request for a retired callback.
            self._delayed_calls.discard(id)
            return
        if callback.repeat is not None and callback.repeat < 1:
            del self.callbacks[id]
            self._delayed_calls.discard(id)
            return

        def update_id():
            """Callback to fire event again later."""
            self._fire_event(id)

        # The timer values are multiplied by 1000 below because
        # tk.after takes milliseconds.
        delay = callback.timer.remaining()
        if delay > 0 and id not in self._delayed_calls:
            # Not ready yet
            self._delayed_calls.add(id)
            self.tk.after(int(delay*1000), update_id)
        else:
            # Fire the event now.
            print("Firing")     # Trace output kept from the original.
            self.tk.event_generate(id)
            if id in self._delayed_calls:
                self._delayed_calls.remove(id)

            # And schedule future calls
            if callback.wait:   # Wait for update call.
                pass
            else:
                if callback.timer.timeout > 0:
                    self._delayed_calls.add(id)
                    self.tk.after(int(callback.timer.timeout*1000),
                                  update_id)
                else:
                    # No delay, other than processing events.
                    self.tk.after_idle(update_id)

    def update(self):
        """Send an update notification.  This may be called from other
        threads.

        Queues a firing for every registered callback that does not
        already have one pending.
        """
        # Iterate over a snapshot of the ids; the dict may change while
        # the queued calls run.
        for event_id in list(self.callbacks):
            if event_id not in self._delayed_calls:
                self._delayed_calls.add(event_id)
                self._call(self._fire_event, event_id)
def register_plotter(plotter, ridict=None):
    """Registers plotter to be called by the event loop.

    Plotting works like this:

    The function plotter(ridict) should plot the data in ridict.  A new
    pylab figure is created and a TkSync is attached to its canvas.  A
    callback added to ``ridict.callbacks`` calls the TkSync's `update`,
    which triggers a replot of that figure.

    Parameters:
        plotter: callable taking the remote dictionary as its only
            argument.
        ridict: remote dictionary to plot from; defaults to the
            module-level `idict`.

    Returns the created figure.
    """
    if ridict is None:
        ridict = idict
    import pylab
    pylab.ion()
    fig = pylab.figure()
    synchronizer = TkSync(fig.canvas._tkcanvas)

    def callback(idict=None):
        """Callback to add to ridict.  Only get update and plot"""
        synchronizer.update()

    def plot():
        """Designed to always request an update."""
        ridict.get()
        pylab.figure(fig.number)  # Select correct figure
        try:
            plotter(ridict)
            # Read the iteration count from the dictionary actually being
            # plotted, not from the module-level default.
            pylab.title("Iteration = (%i)" % (ridict['iter']))
        except Exception as e:
            # Best effort: report the failure but keep the event loop
            # alive for the next update.
            print("Exception! %s" % e)

    synchronizer.register(plot, delay=1, wait=True)
    # Subscribe on the dictionary being plotted (the module-level idict
    # only when no ridict was supplied).
    ridict.callbacks.add(callback)
    return fig
# Module-level handle to the remote dictionary.  It is rebound by run() to a
# server.RIDict instance and is the default `ridict` of register_plotter().
# (The `global` statement has no effect at module scope.)
global idict
idict = None
def run(plotter=None):
    """Start the client, subscribe to the remote dictionary and register
    `plotter` with the event loop.

    Selects the TkAgg matplotlib backend, calls server.run_client(),
    rebinds the module-level `idict` to a subscribed server.RIDict and
    passes `plotter` to register_plotter().

    Parameters:
        plotter: callable taking the remote dictionary.  If None, the
            demonstration plotter ``example.Problem().plot`` is used.
    """
    global idict
    import matplotlib
    matplotlib.use('TkAgg')
    server.run_client()
    idict = server.RIDict()
    idict.subscribe()
    if plotter is None:
        # Fall back to the bundled example only when the caller did not
        # supply a plotter, instead of always discarding the argument.
        import example
        plotter = example.Problem().plot
    register_plotter(plotter)