Source code for mmf.async.queueserver

"""Implementation of the QueueServer core functionality.

The QueueServer manages communication between the Connections and the
Computation.  The QueueServer periodically polls the {UpdateQueue}.
If there is data waiting (in the form of a copy of the {IDict}), then
it interprets this as an update notification, and it pushes this data
to all of the subscribed Connections.  It can monitor the status of
the {UpdateQueue}, dumping data for example if the {UpdateQueue} is
filling up too fast.
    
It also periodically polls the {RequestQueue} for direct requests of
data from the Connections.  It will consolidate such requests, then
get the data directly from the {IDict} (because the {UpdateQueue}
might contain older data) and sends it to the requesting connections.
    
The QueueServer is responsible for minimizing the blocking of the
Computation.  The only blocking will occur during direct access to the
{IDict} object.  To mitigate this, the QueueServer should consolidate
all direct requests, and limit the frequency of direct requests.  It
should also monitor the size of the {UpdateQueue} to prevent memory
problems (dropping data as required to keep the computation running
smoothly.)
"""
from __future__ import with_statement

__all__ = ['QueueServer']

import Queue
import logger as _logging
import cPickle as pickle

_logLevel = _logging.logLevel

[docs]class QueueServer(object): ################################################################## # These functions must be overloaded to ensure that the queues are # properly processed.
[docs] def onUpdate(self): """Called after a pickled idict has been put onto the update_queue. Overload to suitable notify the controlling process that process_updates should be called.""" raise NotImplementedError
[docs] def onRequest(self): """Called after a request has been put onto the request_queue. Overload to suitable notify the controlling process that process_requests should be called.""" raise NotImplementedError # End of required functions. ###########################################################################
[docs] def __init__(self, idict, connections=None, logger=None): """Initialize the QueueServer. idict: Dictionary to fetch requests from. connections: Set of connections to dispatch to. logger: A logging object. """ if logger is None: logger = _logging.getLogger(self.__class__.__name__) self.logger = logger self.update_queue = Queue.Queue() self.request_queue = Queue.Queue() self.idict = idict if connections is None: connections = set() self.connections = connections
[docs] def put_update(self, idict): """Put a copy of idict on the UdateQueue. Called by the computation thread. This should block until idict is copied, then return as soon as possible, leaving the rest of the processing to another thread if possible. """ self.logger.debug("Pickling idict...") pickled_idict = pickle.dumps(dict(idict)) self.logger.debug("Putting pickled idict on request_queue...") self.update_queue.put(pickled_idict) self.onUpdate()
[docs] def put_request(self, request): """Put request on the RequestQueue.""" self.logger.debug("Putting request on request_queue...") self.request_queue.put(request) self.onRequest()
[docs] def process_updates(self): """Process updates on the queue.""" self.logger.debug("Checking for updates...") try: pickled_idict = self.update_queue.get_nowait() #pickled_idict = pickle.dumps( # self.update_queue.get_nowait()) except Queue.Empty: pass else: self.logger.debug("Dispatching updates...") for connection in self.connections: if connection.active and connection.subscribed: # ***** Error handling self.logger.debug(\ "Dispatching update to connection %i..." %connection.id) connection.send_update(pickled_idict)
[docs] def process_requests(self): """Process requests on the request_queue.""" self.logger.debug("Checking for requests...") try: request = self.request_queue.get_nowait() except Queue.Empty: pass else: self.logger.debug("Processing requests...") connection = request[0] msg = request[1] args = request[2:] if msg == "get": with self.idict.lock: if len(args) == 0: # Send whole idict self.logger.debug("Getting idict...") idict = dict(self.idict) else: names = [name for name in args if name in self.idict] self.logger.debug( "Getting %s from idict..."% (names,)) idict = dict([(name, self.idict[name]) for name in names]) self.logger.debug("Pickling get request...") pickled_idict = pickle.dumps(dict(idict)) self.logger.debug("Sending pickle...") connection.send_set(pickled_idict) else: self.logger.error( "Request %r not implemented..."%msg)