"""Some useful non-twisted dependent utilities."""
from __future__ import division
__all__ = ['ProtocolError', 'IFormatter', 'MySet', 'MyCondition',
'IntStringFormatter', 'MsgQueue', 'hasargs']
import functools
partial = functools.partial
import threading
import Queue
import collections
import zope.interface
[docs]class ProtocolError(Exception):
"""Malformed message."""
[docs]def hasargs(decorator):
"""Meta-decorator to allow a decorator to accept kwargs or no
arg (but use the default values). Also applies wraps.
>>> @hasargs
... def decorator_with_args(f, m=2, b=1):
... return lambda x: m*f(x) + b
>>> @decorator_with_args
... def f(x): return x**2
>>> f(2.0)
9.0
>>> @decorator_with_args(m=3, b=2)
... def f(x): return x**2
>>> f(2.0)
14.0
"""
def new_decorator(*v, **kw):
if (kw or 1 < len(v) or not callable(v[0])):
# decorator called with arguments rather than a function.
return lambda f:functools.wraps(f)(decorator(f, *v, **kw))
else:
f = v[0]
return functools.wraps(f)(decorator(f))
return functools.wraps(decorator)(new_decorator)
[docs]class MySet(list):
"""A set-like object that allows for unhashable entries."""
[docs] def __init__(self, *v, **kw):
list.__init__(self, *v, **kw)
self._make_unique()
def _make_unique(self):
for k in self:
while 1 < self.count(k):
self.pop(k)
[docs] def add(self, item):
if item not in self:
self.append(item)
[docs] def remove(self, item, throw=True):
if throw and item not in self:
raise KeyError
while item in self:
list.remove(self, item)
[docs] def update(self, iterable):
for k in iterable:
self.add(k)
[docs]class MyCondition(threading._Condition):
[docs] def wait(self, timeout=None):
"""Like regular wait, but raises and exception upon timeout."""
if not self._is_owned():
raise RuntimeError("cannot wait on un-aquired lock")
waiter = threading._allocate_lock()
waiter.acquire()
self._Condition__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
if __debug__:
self._note("%s.wait(): got it", self)
else:
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = threading._time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - threading._time()
if remaining <= 0:
raise Timeout
delay = min(delay * 2, remaining, .05)
threading._sleep(delay)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)
try:
self._Condition__waiters.remove(waiter)
except ValueError:
pass
else:
if __debug__:
self._note("%s.wait(%s): got it", self, timeout)
finally:
self._acquire_restore(saved_state)
[docs]class MsgQueue(Queue.Queue):
"""Implements a threadsafe message queue that allows you to add
data and then yields parsed and complete messages.
"""
Formatter = IntStringFormatter
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
def _init(self, maxsize):
self.maxsize = maxsize
self.data_queue = collections.deque()
self.msg_queue = collections.deque()
def _qsize(self):
return len(self.msg_queue)
# Check whether the queue is empty
def _empty(self):
self._process()
return not self.msg_queue
# Check whether the queue is full
def _full(self):
return self.maxsize > 0 and len(self.queue) == self.maxsize
# Put a new item in the queue
def _put(self, item):
self.data_queue.append(item)
# Get an item from the queue
def _get(self):
self._process()
return self.msg_queue.popleft()
# Custom messages for dealing with the data
def _process(self):
"""Transfer data from the data_queue to the msg_queue.
Ultimately, we would like to add some error checking and put
bad data back on the queue in its original form for debugging...
"""
data = "".join(self.data_queue)
msg, tail = self.Formatter.split(data)
while msg is not None:
self.msg_queue.append(msg)
msg, tail = self.Formatter.split(tail)
self.data_queue.clear()
if 0 < len(tail):
self.data_queue.appendleft(tail)
def __repr__(self):
return repr((self.msg_queue, self.data_queue))