Source code for mmf.async.utils

"""Some useful non-twisted dependent utilities."""
from __future__ import division

__all__ = ['ProtocolError', 'IFormatter', 'MySet', 'MyCondition',
           'IntStringFormatter', 'MsgQueue', 'hasargs']

import functools
partial = functools.partial
import threading
import Queue
import collections

import zope.interface

[docs]class ProtocolError(Exception): """Malformed message."""
[docs]def hasargs(decorator): """Meta-decorator to allow a decorator to accept kwargs or no arg (but use the default values). Also applies wraps. >>> @hasargs ... def decorator_with_args(f, m=2, b=1): ... return lambda x: m*f(x) + b >>> @decorator_with_args ... def f(x): return x**2 >>> f(2.0) 9.0 >>> @decorator_with_args(m=3, b=2) ... def f(x): return x**2 >>> f(2.0) 14.0 """ def new_decorator(*v, **kw): if (kw or 1 < len(v) or not callable(v[0])): # decorator called with arguments rather than a function. return lambda f:functools.wraps(f)(decorator(f, *v, **kw)) else: f = v[0] return functools.wraps(f)(decorator(f)) return functools.wraps(decorator)(new_decorator)
[docs]class MySet(list): """A set-like object that allows for unhashable entries."""
[docs] def __init__(self, *v, **kw): list.__init__(self, *v, **kw) self._make_unique()
def _make_unique(self): for k in self: while 1 < self.count(k): self.pop(k)
[docs] def add(self, item): if item not in self: self.append(item)
[docs] def remove(self, item, throw=True): if throw and item not in self: raise KeyError while item in self: list.remove(self, item)
[docs] def update(self, iterable): for k in iterable: self.add(k)
[docs]class MyCondition(threading._Condition):
[docs] def wait(self, timeout=None): """Like regular wait, but raises and exception upon timeout.""" if not self._is_owned(): raise RuntimeError("cannot wait on un-aquired lock") waiter = threading._allocate_lock() waiter.acquire() self._Condition__waiters.append(waiter) saved_state = self._release_save() try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() if __debug__: self._note("%s.wait(): got it", self) else: # Balancing act: We can't afford a pure busy loop, so we # have to sleep; but if we sleep the whole timeout time, # we'll be unresponsive. The scheme here sleeps very # little at first, longer as time goes on, but never longer # than 20 times per second (or the timeout time remaining). endtime = threading._time() + timeout delay = 0.0005 # 500 us -> initial delay of 1 ms while True: gotit = waiter.acquire(0) if gotit: break remaining = endtime - threading._time() if remaining <= 0: raise Timeout delay = min(delay * 2, remaining, .05) threading._sleep(delay) if not gotit: if __debug__: self._note("%s.wait(%s): timed out", self, timeout) try: self._Condition__waiters.remove(waiter) except ValueError: pass else: if __debug__: self._note("%s.wait(%s): got it", self, timeout) finally: self._acquire_restore(saved_state)
[docs]class IFormatter(zope.interface.Interface):
[docs] def format(msg): """Return the formatted message msg."""
[docs] def split(data): """Return (msg, tail) where msg is the first complete message encoded in data, and tail is the remaining data. If there is no message the msg == None (which is different from an empty message). raises ProtocolError if the message is malformed. """
[docs]class IntStringFormatter(object): """Formatter for messages of the format "%i.%s"(len(msg),msg) >>> F = IntStringFormatter >>> msg = F.format("Hello") >>> msg '5.Hello' >>> F.split(msg) ('Hello', '') >>> F.split("2.Hi5.There") ('Hi', '5.There') Invalid messages will raise a ProtocolError >>> F.split("1j.Hello") Traceback (most recent call last): ... ProtocolError: Received non-digit 'j' in message prefix '1j' """ zope.interface.implements(IFormatter) @staticmethod
[docs] def split(data): """Return (msg, tail) where msg is the first complete message encoded in data, and tail is the remaining data. If there is no message the msg == None (which is different from an empty message). raises ProtocolError if the message is malformed. """ head_len = data.find('.') head = data[:head_len] tail = data[head_len+1:] for c in head: if c not in "0123456789": # Some character is not a number! raise ProtocolError( "Received non-digit '%s' in message prefix '%s'"% (c, head)) if head_len < 0: # Incomplete prefix msg = None tail = head + tail else: try: msg_len = int(head) except ValueError: raise ProtocolError( "Invalid message prefix '%s' " "(must cast to an int)"%(head)) if len(tail) < msg_len: # Incomplete message... msg = None tail = head + tail else: msg = tail[:msg_len] tail = tail[msg_len:] return (msg, tail)
@staticmethod
[docs] def format(msg): """Return the formatted message.""" return "%i.%s"%(len(msg),msg)
[docs]class MsgQueue(Queue.Queue): """Implements a threadsafe message queue that allows you to add data and then yields parsed and complete messages. """ Formatter = IntStringFormatter # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held def _init(self, maxsize): self.maxsize = maxsize self.data_queue = collections.deque() self.msg_queue = collections.deque() def _qsize(self): return len(self.msg_queue) # Check whether the queue is empty def _empty(self): self._process() return not self.msg_queue # Check whether the queue is full def _full(self): return self.maxsize > 0 and len(self.queue) == self.maxsize # Put a new item in the queue def _put(self, item): self.data_queue.append(item) # Get an item from the queue def _get(self): self._process() return self.msg_queue.popleft() # Custom messages for dealing with the data def _process(self): """Transfer data from the data_queue to the msg_queue. Ultimately, we would like to add some error checking and put bad data back on the queue in its original form for debugging... """ data = "".join(self.data_queue) msg, tail = self.Formatter.split(data) while msg is not None: self.msg_queue.append(msg) msg, tail = self.Formatter.split(tail) self.data_queue.clear() if 0 < len(tail): self.data_queue.appendleft(tail) def __repr__(self): return repr((self.msg_queue, self.data_queue))