"""Some utilities to supplement twisted."""
from __future__ import with_statement
__all__ = ['callTwisted', 'TwistedThread', 'RIDict', 'run_reactor_as_thread',
           'run_reactor', 'stop_reactor']
from twisted.internet import reactor
import twisted.internet
import twisted.internet.threads
import threading
import Queue
import logger as _logging
from server import _port        # Default port used by server.
from utils import MySet, hasargs
# Thread object that runs the reactor in the background.  It is set to a
# TwistedThread instance by run_reactor_as_thread() and reset to None by
# stop_reactor(); callTwisted compares it with the current thread.
_reactor_thread = None
if not hasattr(twisted.internet.threads, 'blockingCallFromThread'):
    # The installed twisted does not provide blockingCallFromThread, so
    # install a copy of the function found in newer twisted releases.
    def blockingCallFromThread(reactor, f, *a, **kw):
        """Call f(*a, **kw) via reactor.callFromThread and wait for it.

        The outcome of the call is handed back to the calling thread
        through a queue.  A plain result is returned; a Failure is
        re-raised as an exception in the calling thread.
        """
        from twisted.python.failure import Failure
        mailbox = Queue.Queue()

        def _run_and_report():
            # Executed by the reactor: whatever the call produces
            # (result or Failure) is put into the queue.
            pending = twisted.internet.defer.maybeDeferred(f, *a, **kw)
            pending.addBoth(mailbox.put)

        reactor.callFromThread(_run_and_report)
        # Blocks the calling thread until the reactor has reported back.
        outcome = mailbox.get()
        if not isinstance(outcome, Failure):
            return outcome
        outcome.raiseException()
        return outcome

    setattr(twisted.internet.threads, 'blockingCallFromThread',
            blockingCallFromThread)
    # The function is only meant to be reachable through
    # twisted.internet.threads, not as a name of this module.
    del blockingCallFromThread
@hasargs
def callTwisted(f, delay=0, block=False):
    """Decorate a function to be called from twisted.  The function is
    expected to return a value, not a deferred.

    The decorated function will do one of several things:

    1) If the twisted reactor is not running, it registers the call
       with reactor.callWhenRunning so that it runs at startup.
    2) If the twisted reactor is running from the current thread (or
       no reactor thread was started through this module), it calls
       the function directly, or through reactor.callLater if a
       positive delay was requested, and returns a deferred.
    3) If the twisted reactor is running from another thread, it
       hands the call over with reactor.callFromThread, or with
       blockingCallFromThread if block is True.

    Parameters:
    delay: Delay passed to reactor.callLater (only honoured if block
           is False and the reactor is running).  The default of 0
           means "no delay".
    block: If True and the reactor is running in another thread, then
           this will block and return the result of the function.

    Return value of the decorated function:
    - reactor running in this thread: a deferred that fires with the
      result of the function.
    - reactor running in another thread, block=True: the result of
      the function (a failure is re-raised as an exception).
    - reactor running in another thread, block=False: whatever
      reactor.callFromThread returns.
    - reactor not running: whatever reactor.callWhenRunning returns.
    NOTE(review): according to the twisted API neither callFromThread
    nor callWhenRunning returns a deferred -- confirm that no caller
    relies on getting one in those two cases.

    If you are going to call something that the twisted reactor needs
    to deal with, you should decorate with this.  This will ensure
    that if the current thread is not the reactor thread, then the
    function will be called with reactor.callFromThread.  Examples
    include calls to:
    - transport.write()
    - transport.loseConnection()
    """
    # Local import: only needed to copy f's name and docstring onto
    # the wrapper.
    import functools

    @functools.wraps(f)
    def threadsafe_f(*v, **k):
        defer = twisted.internet.defer
        if not reactor.running:
            # Reactor is not running.  We must call the function
            # later, when the reactor starts.
            return reactor.callWhenRunning(f, *v, **k)

        in_reactor_thread = (_reactor_thread is None or
                             threading.currentThread() is _reactor_thread)
        if in_reactor_thread:
            # Reactor running in this thread.  Blocking is not an
            # option, but one can use inlineCallbacks to chain calls.
            if delay > 0:
                # Run the function after the delay and forward its
                # result (or failure) to the deferred returned now.
                pending = defer.Deferred()

                def _fire():
                    defer.maybeDeferred(f, *v, **k).chainDeferred(pending)

                reactor.callLater(delay, _fire)
                return pending
            return defer.maybeDeferred(f, *v, **k)

        # Reactor running in a different thread.
        if block:
            return twisted.internet.threads.blockingCallFromThread(
                reactor, f, *v, **k)
        if delay > 0:
            # callLater itself must be invoked from the reactor thread.
            return reactor.callFromThread(
                reactor.callLater, delay, f, *v, **k)
        return reactor.callFromThread(f, *v, **k)
    return threadsafe_f
 
class TwistedThread(threading.Thread):
    """Python Thread that runs the twisted reactor."""

    # Class-wide flag, set to True as soon as any instance enters run().
    # It is never reset, so only one TwistedThread can ever be created
    # after the reactor has been run this way.
    _run = False

    def __init__(self):
        """Start the thread as a daemon (meaning that it will not
        block the closing of the process when the main thread
        exits).

        The thread is started immediately by the constructor.

        Raises TwistedError if a TwistedThread has already run the
        reactor.
        NOTE(review): TwistedError is not defined or imported in the
        visible part of this module -- confirm where it comes from.
        """
        if self._run:
            raise TwistedError("Reactor has already been run "
                               "and restarts are not yet supported.")
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.start()

    def run(self):
        """Run the reactor without signal handlers.  (Signals go to
        the main thread.)"""
        self.__class__._run = True
        reactor.run(installSignalHandlers=0)

    def stop(self):
        """Ask the reactor to stop.

        This does not join the thread, so it may return before the
        reactor has actually stopped.
        """
        if threading.currentThread() is self:
            # Already in the reactor thread: stop directly.
            reactor.stop()
        else:
            # Reactor methods must be invoked from the reactor thread;
            # callFromThread is the thread-safe way to get there.
            reactor.callFromThread(reactor.stop)
  
class RIDict(dict):
    """Implements a remote IDict interface that connects to a
    specified server.

    The instance is itself a dict; updates received from the server
    are merged into it by onUpdate.
    """

    def __init__(self, host='localhost', port=_port):
        """Prepare a client for the server at host:port.

        No connection is made here; call connect() for that.

        NOTE(review): MyCondition and ClientProtocol are not defined or
        imported in the visible part of this module -- confirm where
        they come from.
        """
        # Imported explicitly so the submodule is certain to be loaded.
        from twisted.internet.protocol import ClientCreator
        self.lock = threading.RLock()
        self.host = host
        self.port = port
        self.logger = _logging.getLogger(self.__class__.__name__)
        self._update_condition = MyCondition()
        self._client_creator = ClientCreator(reactor, ClientProtocol)
        # Callbacks registered before a connection exists; merged into
        # the protocol's own callback set by onConnect.
        self.callbacks = MySet()
        self.connected = False
        # Every update passed to onUpdate is also put into this queue.
        self.queue = Queue.Queue()

    #@callTwisted(block=True)
    def connect(self):
        """Try to connect to the host and port given to the constructor.

        Returns the deferred from connectTCP, with onConnect attached
        as callback and onFail as errback.
        """
        deferred = self._client_creator.connectTCP(self.host, self.port)
        deferred.addCallbacks(self.onConnect, self.onFail)
        return deferred

    def onConnect(self, protocol):
        """Callback for a successful connection.

        Copies the callbacks collected so far into the protocol's
        callback set, adopts that set as self.callbacks, registers
        onUpdate in it and remembers the protocol.
        """
        self.logger.info("Connected.")
        protocol.callbacks.update(self.callbacks)
        self.callbacks = protocol.callbacks
        self.callbacks.add(self.onUpdate)
        self._protocol = protocol
        self.connected = True

    def onFail(self, failure):
        """Errback for a failed connection: log the error message and
        raise SocketError with it.

        NOTE(review): SocketError is not defined or imported in the
        visible part of this module -- confirm where it comes from.
        """
        self.logger.error("Connection failed: %s"%
                          failure.getErrorMessage())
        raise SocketError(failure.getErrorMessage())

    def onUpdate(self, idict):
        """Merge an update into this dict and wake up all threads
        blocked in wait_for_update().

        The update is also put into self.queue, and the pds and rrr
        attributes are copied from the protocol.
        """
        self.logger.debug("Updating...")
        self.update(idict)
        self.queue.put(idict)
        self.pds = self._protocol.pds
        self.rrr = self._protocol.rrr
        with self._update_condition:
            # Use get() so that an update without an 'iter' entry
            # cannot raise before the waiting threads are notified.
            self.logger.debug("notify of iteration %s"
                              % (self.get('iter'),))
            self._update_condition.notifyAll()

    @callTwisted
    def subscribe(self, skip=0, delay=0):
        """Subscribe for update notification from the server.

        skip:  Skip this many updates.
        delay: Minimum time delay between updates.  (Could be longer
               if the server does not have any updates.)

        Both values are passed on to the protocol's subscribe method.
        """
        self._protocol.subscribe(skip, delay)

    def wait_for_update(self, timeout=None):
        """Block until an update is received.

        timeout is passed to the wait() method of the update condition.
        """
        with self._update_condition:
            self._update_condition.wait(timeout)
  
def run_reactor_as_thread():
    """Start the twisted reactor in a background TwistedThread.

    The thread is remembered in the module-level _reactor_thread.  If
    a reactor thread is already recorded there, an error is logged and
    no new thread is created.
    """
    global _reactor_thread
    if _reactor_thread:
        # No module-level `_logger` is defined in the visible part of
        # this module, so obtain a logger through _logging.getLogger.
        _logging.getLogger(__name__).error(
            "Reactor thread has already been run.  "
            "Restarts are not supported.")
    else:
        _reactor_thread = TwistedThread()
 
def run_reactor():
    """Run the twisted reactor in the calling thread.

    This simply calls reactor.run() with its default arguments.
    NOTE(review): reactor.run() presumably blocks until the reactor is
    stopped -- see the twisted documentation.
    """
    reactor.run()
 
def stop_reactor():
    """Stop the twisted reactor.

    If a reactor thread is recorded in _reactor_thread, its stop()
    method is called and the record is cleared.  Otherwise
    reactor.stop() is called directly.
    """
    global _reactor_thread
    if _reactor_thread:
        _reactor_thread.stop()
        _reactor_thread = None
    else:
        reactor.stop()