# Source code for mmf.async.twisted_utils

"""Some utilities to supplement twisted."""
from __future__ import with_statement

# Public names of this module (what ``from ... import *`` exports).
__all__ = ['callTwisted', 'TwistedThread', 'RIDict', 'run_reactor_as_thread',
           'run_reactor', 'stop_reactor']

from twisted.internet import reactor
import twisted.internet
import twisted.internet.threads

import threading
import Queue

import logger as _logging
from server import _port        # Default port used by server.
from utils import MySet, hasargs

# Handle to the TwistedThread created by run_reactor_as_thread(); None when
# no reactor thread is registered (initially, and after stop_reactor()).
_reactor_thread = None

if not hasattr(twisted.internet.threads, 'blockingCallFromThread'):
    # The installed twisted does not provide this helper, so a local
    # implementation is attached to twisted.internet.threads instead.
    def blockingCallFromThread(reactor, f, *a, **kw):
        """Run ``f(*a, **kw)`` in the reactor and wait for its outcome.

        The call is scheduled with ``reactor.callFromThread``; the calling
        thread then blocks on a queue until the result has been delivered.
        If the outcome is a failure it is re-raised in the calling thread,
        otherwise the result is returned.
        """
        from twisted.python.failure import Failure

        outcomes = Queue.Queue()

        def _run_in_reactor():
            pending = twisted.internet.defer.maybeDeferred(f, *a, **kw)
            pending.addBoth(outcomes.put)

        reactor.callFromThread(_run_in_reactor)
        outcome = outcomes.get()
        if isinstance(outcome, Failure):
            outcome.raiseException()
        return outcome

    setattr(twisted.internet.threads, 'blockingCallFromThread',
            blockingCallFromThread)
    # Keep this module's own namespace clean; the helper is reached through
    # twisted.internet.threads from here on.
    del blockingCallFromThread

@hasargs
def callTwisted(f, delay=0, block=False):
    """Decorate a function to be called from twisted.

    The function is expected to return a value, not a deferred.

    On every call the wrapper does one of the following:

    1) If the twisted reactor is not running, the call is registered to be
       run at reactor startup and a deferred is returned that fires with
       the function's result.
    2) If the twisted reactor is running in the current thread, the
       function is called directly (or after `delay` seconds) and a
       deferred is returned.
    3) If the twisted reactor is running in another thread, the call is
       handed to the reactor with ``reactor.callFromThread``.

    Parameters:

    delay: Delay the call by this many seconds (only if `block` is False
       and the reactor is running).
    block: If True and the reactor is running in another thread, block and
       return the result of the function (exceptions are re-raised in the
       calling thread).  Otherwise the call is only scheduled and the
       return value of ``reactor.callFromThread`` is returned, which
       carries no result.

    If you are going to call something that the twisted reactor needs to
    deal with, you should decorate with this.  This will ensure that if
    the current thread is not the reactor thread, then the function will
    be called with ``reactor.callFromThread``.

    Examples include calls to:

    - transport.write()
    - transport.loseConnection()
    """
    import functools
    # Imported explicitly rather than relying on another module having
    # loaded twisted.internet.defer as a side effect.
    from twisted.internet import defer

    @functools.wraps(f)
    def threadsafe_f(*v, **k):
        def run_into(result):
            """Call `f` and forward its outcome to the deferred `result`."""
            defer.maybeDeferred(f, *v, **k).chainDeferred(result)

        if not reactor.running:
            # Reactor is not running, so the function must be called
            # later.  callWhenRunning itself does not return a deferred,
            # so one is created here and fired once the call has run.
            result = defer.Deferred()
            reactor.callWhenRunning(run_into, result)
            return result

        current_thread = threading.currentThread()
        if current_thread is _reactor_thread or _reactor_thread is None:
            # Reactor is taken to be running in this thread
            # (_reactor_thread is None when no reactor thread has been
            # registered by run_reactor_as_thread).  Blocking is not an
            # option here, but one can use inlineCallbacks to chain calls.
            if delay:
                result = defer.Deferred()
                reactor.callLater(delay, run_into, result)
                return result
            return defer.maybeDeferred(f, *v, **k)

        # Reactor running in a different thread.
        if block:
            return twisted.internet.threads.blockingCallFromThread(
                reactor, f, *v, **k)
        if delay:
            # callLater must itself be invoked in the reactor thread.
            return reactor.callFromThread(
                reactor.callLater, delay, f, *v, **k)
        return reactor.callFromThread(f, *v, **k)

    return threadsafe_f
class TwistedThread(threading.Thread):
    """Python Thread that runs the twisted reactor.

    Creating an instance starts the thread immediately.
    """

    # Class-wide flag, set once any instance has entered run().  It is
    # checked in __init__ to refuse a second start of the reactor.
    _run = False

    def __init__(self):
        """Start the thread as a daemon (meaning that it will not block
        the closing of the process when the main thread exits).

        Raises TwistedError if a reactor thread has already been run.
        """
        if self._run:
            # NOTE(review): TwistedError is not defined or imported in the
            # visible part of this module -- confirm it is available.
            raise TwistedError("Reactor has already been run "
                               "and restarts are not yet supported.")
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.start()

    def run(self):
        """Run the reactor without signal handlers.  (Signals go to the
        main thread.)"""
        self.__class__._run = True
        reactor.run(installSignalHandlers=0)

    def stop(self):
        """Ask the reactor to stop.

        The request is passed through ``reactor.callFromThread`` because
        this method is normally called from a thread other than the one
        running the reactor, and ``reactor.stop`` is not safe to call
        from a foreign thread.  This method does not join the thread, so
        it returns without waiting for the reactor to finish.
        """
        reactor.callFromThread(reactor.stop)
class RIDict(dict):
    """Implements a remote IDict interface that connects to a specified
    server.

    The instance is itself a dict; updates received from the server are
    merged into it by onUpdate().
    """

    def __init__(self, host='localhost', port=_port):
        """Prepare a client for the server at `host`:`port`.

        No connection is made here; call connect() for that.
        """
        # Imported explicitly rather than relying on another module having
        # loaded twisted.internet.protocol as a side effect.
        from twisted.internet import protocol

        self.lock = threading.RLock()
        self.host = host
        self.port = port
        self.logger = _logging.getLogger(self.__class__.__name__)
        # NOTE(review): MyCondition and ClientProtocol are not defined or
        # imported in the visible part of this module -- confirm they are
        # available.
        self._update_condition = MyCondition()
        self._client_creator = protocol.ClientCreator(
            reactor, ClientProtocol)
        self.callbacks = MySet()
        self.connected = False
        self.queue = Queue.Queue()

    #@callTwisted(block=True)
    def connect(self):
        """Try to connect to the specified host.

        Returns the deferred from ``connectTCP`` with onConnect() and
        onFail() attached as callback and errback.
        """
        deferred = self._client_creator.connectTCP(self.host, self.port)
        deferred.addCallbacks(self.onConnect, self.onFail)
        return deferred

    def onConnect(self, protocol):
        """Adopt the connected `protocol` and register for its updates.

        Callbacks collected so far are merged into the protocol's callback
        set, which from then on is shared with this instance.
        """
        self.logger.info("Connected.")
        protocol.callbacks.update(self.callbacks)
        self.callbacks = protocol.callbacks
        self.callbacks.add(self.onUpdate)
        self._protocol = protocol
        self.connected = True

    def onFail(self, failure):
        """Log a failed connection attempt and raise SocketError."""
        self.logger.error("Connection failed: %s"%
                          failure.getErrorMessage())
        # NOTE(review): SocketError is not defined or imported in the
        # visible part of this module -- confirm it is available.
        raise SocketError(failure.getErrorMessage())

    def onUpdate(self, idict):
        """Merge the update `idict` into this dict and wake up waiters.

        The update is also put on `self.queue`, and `pds` / `rrr` are
        copied over from the protocol.
        """
        self.logger.debug("Updating...")
        self.update(idict)
        self.queue.put(idict)
        self.pds = self._protocol.pds
        self.rrr = self._protocol.rrr
        with self._update_condition:
            # Use .get() so that an update without an 'iter' entry cannot
            # raise before the waiters have been notified.
            self.logger.debug("notify of iteration %s"
                              % (self.get('iter'),))
            self._update_condition.notifyAll()

    @callTwisted
    def subscribe(self, skip=0, delay=0):
        """Subscribe for update notification from the server.

        skip: Skip this many updates.
        delay: Minimum time delay between updates.  (Could be longer if
           the server does not have any updates.)

        Requires onConnect() to have run, since it uses the protocol
        stored there.
        """
        self._protocol.subscribe(skip, delay)

    def wait_for_update(self, timeout=None):
        """Block until an update is received.

        `timeout` is passed on to the wait on the update condition.
        """
        with self._update_condition:
            self._update_condition.wait(timeout)
def run_reactor_as_thread():
    """Start the twisted reactor in a background TwistedThread.

    The thread is stored in the module-level `_reactor_thread`.  If a
    reactor thread is already registered there, an error is logged and
    nothing else happens.
    """
    global _reactor_thread
    if _reactor_thread:
        # The module has no `_logger` in view; obtain a logger from the
        # logging helper the same way RIDict does.
        _logging.getLogger(__name__).error(
            "Reactor thread has already been run. "
            "Restarts are not supported.")
    else:
        _reactor_thread = TwistedThread()
def run_reactor():
    """Run the twisted reactor in the calling thread (``reactor.run()``)."""
    reactor.run()
def stop_reactor():
    """Stop the twisted reactor.

    If a reactor thread was registered by run_reactor_as_thread(), that
    thread is asked to stop and the module-level handle is cleared.
    Otherwise ``reactor.stop()`` is called directly.
    """
    global _reactor_thread
    if _reactor_thread:
        _reactor_thread.stop()
        _reactor_thread = None
    else:
        reactor.stop()