| # Author: Ovidiu Predescu |
| # Date: July 2011 |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| # Note: This module's docs are not currently extracted automatically, |
| # so changes must be made manually to twisted.rst |
| # TODO: refactor doc build process to use an appropriate virtualenv |
| """A Twisted reactor built on the Tornado IOLoop. |
| |
| This module lets you run applications and libraries written for |
| Twisted in a Tornado application. To use it, simply call `install` at |
| the beginning of the application:: |
| |
| import tornado.platform.twisted |
| tornado.platform.twisted.install() |
| from twisted.internet import reactor |
| |
| When the app is ready to start, call `IOLoop.instance().start()` |
| instead of `reactor.run()`. This will allow you to use a mixture of |
| Twisted and Tornado code in the same process. |
| |
| It is also possible to create a non-global reactor by calling |
| `tornado.platform.twisted.TornadoReactor(io_loop)`. However, if |
| the `IOLoop` and reactor are to be short-lived (such as those used in |
| unit tests), additional cleanup may be required. Specifically, it is |
| recommended to call:: |
| |
| reactor.fireSystemEvent('shutdown') |
| reactor.disconnectAll() |
| |
| before closing the `IOLoop`. |
| |
| This module has been tested with Twisted versions 11.0.0 and 11.1.0. |
| """ |
| |
| from __future__ import with_statement, absolute_import |
| |
| import functools |
| import logging |
| import time |
| |
| from twisted.internet.posixbase import PosixReactorBase |
| from twisted.internet.interfaces import \ |
| IReactorFDSet, IDelayedCall, IReactorTime |
| from twisted.python import failure, log |
| from twisted.internet import error |
| |
| from zope.interface import implements |
| |
| import tornado |
| import tornado.ioloop |
| from tornado.stack_context import NullContext |
| from tornado.ioloop import IOLoop |
| |
| |
class TornadoDelayedCall(object):
    """DelayedCall object for Tornado.

    Wraps a single timeout registered on the reactor's ``IOLoop`` and
    exposes it through the Twisted `IDelayedCall` interface.  Instances
    are created by `TornadoReactor.callLater`.

    A call is *pending* from construction until it either fires or is
    cancelled.  As required by `IDelayedCall`, `cancel`, `delay` and
    `reset` raise `twisted.internet.error.AlreadyCalled` or
    `twisted.internet.error.AlreadyCancelled` when the call is no longer
    pending, instead of silently re-registering a timeout for it.
    """
    implements(IDelayedCall)

    def __init__(self, reactor, seconds, f, *args, **kw):
        """Schedule ``f(*args, **kw)`` to run ``seconds`` from now.

        ``reactor`` must provide ``seconds()``, ``_io_loop`` and
        ``_removeDelayedCall()``; these are the only reactor members
        this class touches.
        """
        self._reactor = reactor
        # Bind the arguments now so that _called() needs no extra state.
        self._func = functools.partial(f, *args, **kw)
        # Absolute deadline, on the reactor's clock.
        self._time = self._reactor.seconds() + seconds
        self._timeout = self._reactor._io_loop.add_timeout(self._time,
                                                           self._called)
        self._active = True
        # Distinguishes "cancelled" from "already fired" once inactive.
        self._cancelled = False

    def _check_pending(self):
        """Raise the `IDelayedCall` error matching a non-pending call."""
        if self._cancelled:
            raise error.AlreadyCancelled()
        if not self._active:
            raise error.AlreadyCalled()

    def _reschedule(self):
        """Swap the IOLoop timeout for one at the current ``self._time``."""
        io_loop = self._reactor._io_loop
        io_loop.remove_timeout(self._timeout)
        self._timeout = io_loop.add_timeout(self._time, self._called)

    def _called(self):
        """IOLoop timeout callback: mark the call done and run it."""
        # Deactivate and deregister *before* running the function so the
        # function itself observes this call as no longer pending.
        self._active = False
        self._reactor._removeDelayedCall(self)
        try:
            self._func()
        except Exception:
            # Log ordinary failures and keep going; KeyboardInterrupt and
            # SystemExit are deliberately not caught here so that they
            # propagate to the caller instead of being swallowed.
            logging.error("_called caught exception", exc_info=True)

    def getTime(self):
        """Return the absolute time at which the call is due to run."""
        return self._time

    def cancel(self):
        """Unschedule the call.

        Raises `error.AlreadyCalled` / `error.AlreadyCancelled` if the
        call has already run or has already been cancelled.
        """
        self._check_pending()
        self._active = False
        self._cancelled = True
        self._reactor._io_loop.remove_timeout(self._timeout)
        self._reactor._removeDelayedCall(self)

    def delay(self, seconds):
        """Push the deadline back by ``seconds`` relative to its old value.

        Raises `error.AlreadyCalled` / `error.AlreadyCancelled` if the
        call is no longer pending.
        """
        self._check_pending()
        self._time += seconds
        self._reschedule()

    def reset(self, seconds):
        """Move the deadline to ``seconds`` from the reactor's current time.

        Raises `error.AlreadyCalled` / `error.AlreadyCancelled` if the
        call is no longer pending.
        """
        self._check_pending()
        self._time = self._reactor.seconds() + seconds
        self._reschedule()

    def active(self):
        """Return True while the call has neither run nor been cancelled."""
        return self._active
| |
class TornadoReactor(PosixReactorBase):
    """Twisted reactor built on the Tornado IOLoop.

    Since it is intended to be used in applications where the top-level
    event loop is ``io_loop.start()`` rather than ``reactor.run()``,
    it is implemented a little differently than other Twisted reactors.
    We override `mainLoop` instead of `doIteration` and must implement
    timed call functionality on top of `IOLoop.add_timeout` rather than
    using the implementation in `PosixReactorBase`.
    """
    implements(IReactorTime, IReactorFDSet)

    def __init__(self, io_loop=None):
        """Create a reactor driven by ``io_loop``.

        When ``io_loop`` is omitted the global ``IOLoop.instance()`` is
        used.
        """
        if not io_loop:
            io_loop = tornado.ioloop.IOLoop.instance()
        self._io_loop = io_loop
        self._readers = {}  # map of reader objects to fd
        self._writers = {}  # map of writer objects to fd
        self._fds = {}  # a map of fd to a (reader, writer) tuple
        self._delayedCalls = {}  # used as a set of pending delayed calls
        # The bookkeeping above is set up before the base initializer
        # runs.  NOTE(review): presumably because the base class may call
        # back into the methods below during construction -- confirm.
        PosixReactorBase.__init__(self)

        # IOLoop.start() bypasses some of the reactor initialization.
        # Fire off the necessary events if they weren't already triggered
        # by reactor.run().
        def start_if_necessary():
            if not self._started:
                self.fireSystemEvent('startup')
        self._io_loop.add_callback(start_if_necessary)

    # IReactorTime
    def seconds(self):
        """Return the current time as given by ``time.time()``."""
        return time.time()

    def callLater(self, seconds, f, *args, **kw):
        """Schedule ``f(*args, **kw)`` in ``seconds``; return the call object."""
        dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
        self._delayedCalls[dc] = True
        return dc

    def getDelayedCalls(self):
        """Return a list of the delayed calls that are still active."""
        return [x for x in self._delayedCalls if x._active]

    def _removeDelayedCall(self, dc):
        """Forget ``dc``; a no-op if it is not being tracked."""
        self._delayedCalls.pop(dc, None)

    # IReactorThreads
    def callFromThread(self, f, *args, **kw):
        """See `twisted.internet.interfaces.IReactorThreads.callFromThread`"""
        assert callable(f), "%s is not callable" % f
        p = functools.partial(f, *args, **kw)
        self._io_loop.add_callback(p)

    # We don't need the waker code from the super class, Tornado uses
    # its own waker.
    def installWaker(self):
        """Do nothing; see the comment above."""
        pass

    def wakeUp(self):
        """Do nothing; see the comment above."""
        pass

    # IReactorFDSet
    def _invoke_callback(self, fd, events):
        """IOLoop handler: dispatch ``events`` on ``fd`` to its reader/writer.

        For each registered side, the descriptor's ``doRead``/``doWrite``
        is run when the matching event bit is set.  A descriptor whose
        ``fileno()`` is -1, one whose handler returns a non-None value,
        or one that sees ``IOLoop.ERROR`` is removed and told that its
        connection was lost.
        """
        entry = self._fds.get(fd)
        if entry is None:
            # The fd is no longer registered (for example, it was removed
            # before this callback got to run); there is nobody left to
            # notify, so ignore the event instead of raising KeyError.
            return
        (reader, writer) = entry
        if reader:
            err = None
            if reader.fileno() == -1:
                err = error.ConnectionLost()
            elif events & IOLoop.READ:
                err = log.callWithLogger(reader, reader.doRead)
            if err is None and events & IOLoop.ERROR:
                err = error.ConnectionLost()
            if err is not None:
                self.removeReader(reader)
                reader.readConnectionLost(failure.Failure(err))
        if writer:
            err = None
            if writer.fileno() == -1:
                err = error.ConnectionLost()
            elif events & IOLoop.WRITE:
                err = log.callWithLogger(writer, writer.doWrite)
            if err is None and events & IOLoop.ERROR:
                err = error.ConnectionLost()
            if err is not None:
                self.removeWriter(writer)
                writer.writeConnectionLost(failure.Failure(err))

    def addReader(self, reader):
        """Add a FileDescriptor for notification of data available to read."""
        if reader in self._readers:
            # Don't add the reader if it's already there
            return
        fd = reader.fileno()
        self._readers[reader] = fd
        if fd in self._fds:
            (_, writer) = self._fds[fd]
            self._fds[fd] = (reader, writer)
            if writer:
                # We already registered this fd for write events,
                # update it for read events as well.
                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
        else:
            # NOTE(review): NullContext presumably keeps the caller's
            # stack context from being attached to the handler -- confirm.
            with NullContext():
                self._fds[fd] = (reader, None)
                self._io_loop.add_handler(fd, self._invoke_callback,
                                          IOLoop.READ)

    def addWriter(self, writer):
        """Add a FileDescriptor for notification of data available to write."""
        if writer in self._writers:
            # Don't add the writer if it's already there
            return
        fd = writer.fileno()
        self._writers[writer] = fd
        if fd in self._fds:
            (reader, _) = self._fds[fd]
            self._fds[fd] = (reader, writer)
            if reader:
                # We already registered this fd for read events,
                # update it for write events as well.
                self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
        else:
            # NOTE(review): NullContext presumably keeps the caller's
            # stack context from being attached to the handler -- confirm.
            with NullContext():
                self._fds[fd] = (None, writer)
                self._io_loop.add_handler(fd, self._invoke_callback,
                                          IOLoop.WRITE)

    def removeReader(self, reader):
        """Remove a Selectable for notification of data available to read."""
        if reader in self._readers:
            fd = self._readers.pop(reader)
            (_, writer) = self._fds[fd]
            if writer:
                # We have a writer so we need to update the IOLoop for
                # write events only.
                self._fds[fd] = (None, writer)
                self._io_loop.update_handler(fd, IOLoop.WRITE)
            else:
                # Since we have no writer registered, we remove the
                # entry from _fds and unregister the handler from the
                # IOLoop
                del self._fds[fd]
                self._io_loop.remove_handler(fd)

    def removeWriter(self, writer):
        """Remove a Selectable for notification of data available to write."""
        if writer in self._writers:
            fd = self._writers.pop(writer)
            (reader, _) = self._fds[fd]
            if reader:
                # We have a reader so we need to update the IOLoop for
                # read events only.
                self._fds[fd] = (reader, None)
                self._io_loop.update_handler(fd, IOLoop.READ)
            else:
                # Since we have no reader registered, we remove the
                # entry from the _fds and unregister the handler from
                # the IOLoop.
                del self._fds[fd]
                self._io_loop.remove_handler(fd)

    def removeAll(self):
        """Remove every reader and writer via the base ``_removeAll`` helper."""
        return self._removeAll(self._readers, self._writers)

    def getReaders(self):
        """Return a list (snapshot) of the currently registered readers."""
        return list(self._readers)

    def getWriters(self):
        """Return a list (snapshot) of the currently registered writers."""
        return list(self._writers)

    # The following functions are mainly used in twisted-style test cases;
    # it is expected that most users of the TornadoReactor will call
    # IOLoop.start() instead of Reactor.run().
    def stop(self):
        """Run the base class ``stop()`` and then stop the IOLoop."""
        PosixReactorBase.stop(self)
        self._io_loop.stop()

    def crash(self):
        """Run the base class ``crash()`` and then stop the IOLoop."""
        PosixReactorBase.crash(self)
        self._io_loop.stop()

    def doIteration(self, delay):
        """Unsupported: this reactor overrides `mainLoop` instead."""
        raise NotImplementedError("doIteration")

    def mainLoop(self):
        """Run the IOLoop until it stops, then fire 'shutdown' if stopped."""
        self._io_loop.start()
        # Only a stop() (which sets _stopped) triggers the shutdown
        # event; returning from the loop for another reason does not.
        if self._stopped:
            self.fireSystemEvent("shutdown")
| |
class _TestReactor(TornadoReactor):
    """TornadoReactor variant used by the unit tests.

    It lives in this module rather than in the test module because of
    import-order dependencies with the Twisted reactor test builder.
    """
    def __init__(self):
        # Every test reactor gets a brand-new IOLoop of its own.
        fresh_loop = IOLoop()
        super(_TestReactor, self).__init__(fresh_loop)

    def listenTCP(self, port, factory, backlog=50, interface=''):
        """Like the inherited ``listenTCP``, defaulting to 127.0.0.1."""
        # Falling back to localhost avoids firewall prompts on the mac.
        return super(_TestReactor, self).listenTCP(
            port, factory, backlog=backlog,
            interface=interface or '127.0.0.1')

    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        """Like the inherited ``listenUDP``, defaulting to 127.0.0.1."""
        return super(_TestReactor, self).listenUDP(
            port, protocol,
            interface=interface or '127.0.0.1',
            maxPacketSize=maxPacketSize)
| |
| |
| |
def install(io_loop=None):
    """Install this package as the default Twisted reactor.

    Builds a `TornadoReactor` on ``io_loop`` (the global
    ``IOLoop.instance()`` when none is supplied), registers it through
    ``twisted.internet.main.installReactor`` and returns it.
    """
    loop = io_loop or tornado.ioloop.IOLoop.instance()
    new_reactor = TornadoReactor(loop)
    from twisted.internet.main import installReactor
    installReactor(new_reactor)
    return new_reactor