"""Some utilities to supplement twisted."""
from __future__ import with_statement
# Names exported by a star-import of this module.
__all__ = ['callTwisted', 'TwistedThread', 'RIDict', 'run_reactor_as_thread',
'run_reactor', 'stop_reactor']
from twisted.internet import reactor
import twisted.internet
import twisted.internet.threads
import threading
import Queue
import logger as _logging
from server import _port # Default port used by server.
from utils import MySet, hasargs
# Thread object running the reactor when it was started through
# run_reactor_as_thread(); None when no such thread is recorded.
_reactor_thread = None

if not hasattr(twisted.internet.threads, 'blockingCallFromThread'):
    # The installed twisted does not ship this helper, so a local copy
    # of the function from a newer twisted is attached to
    # twisted.internet.threads in its place.
    def blockingCallFromThread(reactor, f, *a, **kw):
        """Run ``f(*a, **kw)`` through ``reactor.callFromThread`` and
        block the calling thread until the outcome is available.

        Returns the result of the call.  If the outcome is a Failure,
        its exception is raised in the calling thread instead.
        """
        from twisted.python.failure import Failure

        mailbox = Queue.Queue()

        def _run_in_reactor():
            # Whatever the call produces (value or failure) is handed
            # back to the waiting thread through the queue.
            pending = twisted.internet.defer.maybeDeferred(f, *a, **kw)
            pending.addBoth(mailbox.put)

        reactor.callFromThread(_run_in_reactor)
        outcome = mailbox.get()
        if isinstance(outcome, Failure):
            outcome.raiseException()
        return outcome

    twisted.internet.threads.blockingCallFromThread = blockingCallFromThread
    # Keep the helper out of this module's own namespace.
    del blockingCallFromThread
# NOTE(review): hasargs comes from utils.  The usages in this file (a bare
# ``@callTwisted`` and a commented-out ``@callTwisted(block=True)``) suggest
# it lets the decorator be applied with or without arguments -- confirm.
@hasargs
def callTwisted(f, delay=0, block=False):
    """Decorate a function so that it is executed by the twisted reactor.

    The function is expected to return a value, not a deferred.

    The decorated function behaves as follows:

    1) If the twisted reactor is not running, the call is registered
       with ``reactor.callWhenRunning`` and a deferred is returned that
       fires with the outcome once the reactor has run it.
    2) If the reactor is running and either no reactor thread is
       recorded in ``_reactor_thread`` or the caller is that thread, the
       function is called directly (or through ``reactor.callLater``
       when `delay` is non-zero) and a deferred is returned.  Blocking
       is not an option here; use inlineCallbacks to chain calls.
    3) If the reactor is running in another thread, the call is handed
       over with ``reactor.callFromThread``.

    Parameters:

    delay: Delay the call by this many seconds (only if block is False
           and the reactor is running).
    block: If True and the reactor is running in another thread, then
           this will block and return the result of the function,
           otherwise a deferred is returned.

    If you are going to call something that the twisted reactor needs
    to deal with, you should decorate with this.  This will ensure
    that if the current thread is not the reactor thread, then the
    function will be called with reactor.callFromThread.  Examples
    include calls to:

    - transport.write()
    - transport.loseConnection()
    """
    import functools
    # Imported explicitly instead of relying on another twisted module
    # having loaded twisted.internet.defer as a side effect.
    from twisted.internet import defer

    @functools.wraps(f)
    def threadsafe_f(*v, **k):
        def run_into(result):
            # Only ever executed by the reactor: call f and forward its
            # outcome (value or failure) to the deferred handed out.
            defer.maybeDeferred(f, *v, **k).chainDeferred(result)

        if not reactor.running:
            # Reactor is not running.  The call has to happen later, so
            # hand back a deferred that fires once it has been made.
            result = defer.Deferred()
            reactor.callWhenRunning(run_into, result)
            return result

        in_reactor_thread = (_reactor_thread is None or
                             threading.currentThread() is _reactor_thread)
        if in_reactor_thread:
            if not delay:
                # Call synchronously; maybeDeferred wraps the outcome.
                return defer.maybeDeferred(f, *v, **k)
            result = defer.Deferred()
            reactor.callLater(delay, run_into, result)
            return result

        # Reactor running in a different thread.
        if block:
            return twisted.internet.threads.blockingCallFromThread(
                reactor, f, *v, **k)

        # NOTE(review): Deferreds are not thread-safe.  Callbacks added
        # from the calling thread can race with the reactor firing the
        # deferred; use block=True when the result is needed here.
        result = defer.Deferred()
        if delay:
            reactor.callFromThread(reactor.callLater, delay,
                                   run_into, result)
        else:
            reactor.callFromThread(run_into, result)
        return result

    return threadsafe_f
class TwistedThread(threading.Thread):
    """Python Thread that runs the twisted reactor."""

    # Class-wide flag set by run() once the reactor has been started in
    # any instance; __init__ checks it to refuse a second start.
    _run = False

    def __init__(self):
        """Start the thread as a daemon (meaning that it will not
        block the closing of the process when the main thread exits).

        The thread is started immediately by the constructor.

        Raises TwistedError if the reactor has already been run.
        """
        if self._run:
            # NOTE(review): TwistedError is neither defined nor imported
            # in the visible part of this module -- confirm where it
            # comes from.
            raise TwistedError("Reactor has already been run "
                               "and restarts are not yet supported.")
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.start()

    def run(self):
        """Run the reactor without signal handlers.  (Signals go to
        the main thread.)"""
        self.__class__._run = True
        reactor.run(installSignalHandlers=0)

    def stop(self):
        """Ask the reactor to stop.

        This does not join the thread, so it returns without waiting
        for the reactor to finish shutting down.
        """
        if threading.currentThread() is self or not reactor.running:
            # Already in the reactor thread, or nothing is running: call
            # directly so errors from reactor.stop() reach the caller
            # exactly as before.
            reactor.stop()
        else:
            # reactor.stop() must be executed by the reactor thread;
            # callFromThread is the thread-safe way to hand it over.
            reactor.callFromThread(reactor.stop)
        #self.join()
class RIDict(dict):
    """Implements a remote IDict interface that connects to a
    specified server.

    The instance is itself a dict; every update received through
    onUpdate() is merged into it.
    """

    def __init__(self, host='localhost', port=_port):
        """Store the connection parameters and set up helper objects.

        No connection is made here; call connect() for that.

        host: Host name of the server.
        port: TCP port of the server (defaults to the server's port).
        """
        # Imported explicitly instead of relying on another twisted
        # module having loaded twisted.internet.protocol.
        from twisted.internet.protocol import ClientCreator

        self.lock = threading.RLock()
        self.host = host
        self.port = port
        self.logger = _logging.getLogger(self.__class__.__name__)
        # NOTE(review): MyCondition and ClientProtocol are neither
        # defined nor imported in the visible part of this module.
        # MyCondition is used below like a threading.Condition -- confirm.
        self._update_condition = MyCondition()
        self._client_creator = ClientCreator(reactor, ClientProtocol)
        # Callbacks collected before a connection exists; onConnect()
        # merges them into the protocol's own callback set.
        self.callbacks = MySet()
        self.connected = False
        # Every update passed to onUpdate() is also queued here.
        self.queue = Queue.Queue()

    #@callTwisted(block=True)
    def connect(self):
        """Try to connect to the specified host.

        Returns the deferred from the client creator, with onConnect
        and onFail attached as callback and errback.
        """
        # Use the host/port given to the constructor rather than the
        # module defaults.
        deferred = self._client_creator.connectTCP(self.host, self.port)
        deferred.addCallbacks(self.onConnect, self.onFail)
        return deferred

    def onConnect(self, protocol):
        """Callback for connect(): adopt the connected protocol.

        Callbacks registered so far are merged into the protocol's
        callback set, which then becomes ``self.callbacks``, and
        onUpdate is added to it.
        """
        self.logger.info("Connected.")
        protocol.callbacks.update(self.callbacks)
        self.callbacks = protocol.callbacks
        self.callbacks.add(self.onUpdate)
        self._protocol = protocol
        self.connected = True

    def onFail(self, failure):
        """Errback for connect(): log the error and raise SocketError
        carrying the failure's error message."""
        self.logger.error("Connection failed: %s"%
                          failure.getErrorMessage())
        # NOTE(review): SocketError is neither defined nor imported in
        # the visible part of this module -- confirm where it comes from.
        raise SocketError(failure.getErrorMessage())

    def onUpdate(self, idict):
        """Merge an update into this dict and wake up waiting threads.

        idict: Mapping with the new values; it is also put on
               ``self.queue``.
        """
        self.logger.debug("Updating...")
        self.update(idict)
        self.queue.put(idict)
        self.pds = self._protocol.pds
        self.rrr = self._protocol.rrr
        with self._update_condition:
            # Log instead of printing, and tolerate a missing 'iter' key
            # so that the waiters below are always notified.
            self.logger.debug("notify of iteration %s" % (self.get('iter'),))
            self._update_condition.notifyAll()

    @callTwisted
    def subscribe(self, skip=0, delay=0):
        """Subscribe for update notification from the server.

        skip: Skip this many updates.
        delay: Minimum time delay between updates.  (Could be longer
               if the server does not have any updates.)
        """
        self._protocol.subscribe(skip, delay)

    def wait_for_update(self, timeout=None):
        """Block until an update is received.

        timeout: Passed on to the update condition's wait().
        """
        with self._update_condition:
            self._update_condition.wait(timeout)
def run_reactor_as_thread():
    """Start the twisted reactor in a background TwistedThread.

    The new thread is recorded in the module-level ``_reactor_thread``.
    If a thread is already recorded there, an error is logged and
    nothing is started.
    """
    global _reactor_thread
    if _reactor_thread is not None:
        # Obtain the logger the same way RIDict does; no module-level
        # ``_logger`` is defined in the visible part of this file.
        _logging.getLogger(__name__).error(
            "Reactor thread has already been run. "
            "Restarts are not supported.")
        return
    _reactor_thread = TwistedThread()
def run_reactor():
    """Run the twisted reactor in the calling thread by calling
    ``reactor.run()`` with its default arguments."""
    reactor.run()
def stop_reactor():
    """Stop the twisted reactor.

    If a reactor thread is recorded in ``_reactor_thread``, its stop()
    method is called and the record is cleared; otherwise
    ``reactor.stop()`` is called directly.
    """
    global _reactor_thread
    if _reactor_thread is None:
        reactor.stop()
        return
    _reactor_thread.stop()
    # Forget the thread so that later calls take the direct path above.
    _reactor_thread = None