Source code for mmf.async.queueserver
"""Implementation of the QueueServer core functionality.
The QueueServer manages communication between the Connections and the
Computation. The QueueServer periodically polls the {UpdateQueue}.
If there is data waiting (in the form of a copy of the {IDict}), then
it interprets this as an update notification, and it pushes this data
to all of the subscribed Connections. It can monitor the status of
the {UpdateQueue}, dumping data for example if the {UpdateQueue} is
filling up too fast.
It also periodically polls the {RequestQueue} for direct requests of
data from the Connections. It will consolidate such requests, then
get the data directly from the {IDict} (because the {UpdateQueue}
might contain older data) and sends it to the requesting connections.
The QueueServer is responsible for minimizing the blocking of the
Computation. The only blocking will occur during direct access to the
{IDict} object. To mitigate this, the QueueServer should consolidate
all direct requests, and limit the frequency of direct requests. It
should also monitor the size of the {UpdateQueue} to prevent memory
problems (dropping data as required to keep the computation running
smoothly.)
"""
from __future__ import with_statement
__all__ = ['QueueServer']
import Queue
import logger as _logging
import cPickle as pickle
_logLevel = _logging.logLevel
[docs]class QueueServer(object):
##################################################################
# These functions must be overloaded to ensure that the queues are
# properly processed.
[docs] def onUpdate(self):
"""Called after a pickled idict has been put onto the
update_queue.
Overload to suitable notify the controlling process that
process_updates should be called."""
raise NotImplementedError
[docs] def onRequest(self):
"""Called after a request has been put onto the request_queue.
Overload to suitable notify the controlling process that
process_requests should be called."""
raise NotImplementedError
# End of required functions.
###########################################################################
[docs] def __init__(self, idict, connections=None, logger=None):
"""Initialize the QueueServer.
idict: Dictionary to fetch requests from.
connections: Set of connections to dispatch to.
logger: A logging object.
"""
if logger is None:
logger = _logging.getLogger(self.__class__.__name__)
self.logger = logger
self.update_queue = Queue.Queue()
self.request_queue = Queue.Queue()
self.idict = idict
if connections is None:
connections = set()
self.connections = connections
[docs] def put_update(self, idict):
"""Put a copy of idict on the UdateQueue.
Called by the computation thread. This should block until
idict is copied, then return as soon as possible, leaving the
rest of the processing to another thread if possible.
"""
self.logger.debug("Pickling idict...")
pickled_idict = pickle.dumps(dict(idict))
self.logger.debug("Putting pickled idict on request_queue...")
self.update_queue.put(pickled_idict)
self.onUpdate()
[docs] def put_request(self, request):
"""Put request on the RequestQueue."""
self.logger.debug("Putting request on request_queue...")
self.request_queue.put(request)
self.onRequest()
[docs] def process_updates(self):
"""Process updates on the queue."""
self.logger.debug("Checking for updates...")
try:
pickled_idict = self.update_queue.get_nowait()
#pickled_idict = pickle.dumps(
# self.update_queue.get_nowait())
except Queue.Empty:
pass
else:
self.logger.debug("Dispatching updates...")
for connection in self.connections:
if connection.active and connection.subscribed:
# ***** Error handling
self.logger.debug(\
"Dispatching update to connection %i..."
%connection.id)
connection.send_update(pickled_idict)
[docs] def process_requests(self):
"""Process requests on the request_queue."""
self.logger.debug("Checking for requests...")
try:
request = self.request_queue.get_nowait()
except Queue.Empty:
pass
else:
self.logger.debug("Processing requests...")
connection = request[0]
msg = request[1]
args = request[2:]
if msg == "get":
with self.idict.lock:
if len(args) == 0:
# Send whole idict
self.logger.debug("Getting idict...")
idict = dict(self.idict)
else:
names = [name for name in args
if name in self.idict]
self.logger.debug(
"Getting %s from idict..."%
(names,))
idict = dict([(name, self.idict[name])
for name in names])
self.logger.debug("Pickling get request...")
pickled_idict = pickle.dumps(dict(idict))
self.logger.debug("Sending pickle...")
connection.send_set(pickled_idict)
else:
self.logger.error(
"Request %r not implemented..."%msg)