| """Lowest-common-denominator implementations of platform functionality.""" |
| from __future__ import absolute_import, division, with_statement |
| |
| import errno |
| import socket |
| |
| from tornado.platform import interface |
| from tornado.util import b |
| |
| |
class Waker(interface.Waker):
    """Create an OS independent asynchronous pipe.

    For use on platforms that don't have os.pipe() (or where pipes cannot
    be passed to select()), but do have sockets.  This includes Windows
    and Jython.

    The "pipe" is a pair of connected loopback TCP sockets:
    ``self.writer`` is the connecting end and ``self.reader`` is the
    accepted end.  Both are switched to non-blocking mode.
    """
    def __init__(self):
        """Create the connected, non-blocking writer/reader socket pair.

        Raises socket.error if the writer fails to connect for any reason
        other than WSAEADDRINUSE, or (with the message
        "Cannot bind trigger!") if it still cannot connect after 10
        attempts.  In both cases the sockets opened so far are closed
        before the exception propagates.
        """
        # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py

        # Imported locally: only needed to fetch the active exception in
        # a way that parses on both Python 2 and Python 3.
        import sys

        self.writer = socket.socket()
        # Disable buffering -- pulling the trigger sends 1 byte,
        # and we want that sent immediately, to wake up ASAP.
        self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        count = 0
        while True:
            count += 1
            # Bind to a local port; for efficiency, let the OS pick
            # a free port for us.
            # Unfortunately, stress tests showed that we may not
            # be able to connect to that port ("Address already in
            # use") despite that the OS picked it.  This appears
            # to be a race bug in the Windows socket implementation.
            # So we loop until a connect() succeeds (almost always
            # on the first try).  See the long thread at
            # http://mail.zope.org/pipermail/zope/2005-July/160433.html
            # for hideous details.
            a = socket.socket()
            a.bind(("127.0.0.1", 0))
            a.listen(1)
            connect_address = a.getsockname()  # assigned (host, port) pair
            try:
                self.writer.connect(connect_address)
                break    # success
            except socket.error:
                # Read the error code without binding the exception to a
                # local name: "except X, e" is Python-2-only syntax, and
                # exception objects cannot be indexed on Python 3.
                exc_args = sys.exc_info()[1].args
                code = exc_args[0] if exc_args else None
                retryable = (hasattr(errno, 'WSAEADDRINUSE') and
                             code == errno.WSAEADDRINUSE)
                if not retryable:
                    # "Address already in use" is the only error
                    # I've seen on two WinXP Pro SP2 boxes, under
                    # Pythons 2.3.5 and 2.4.1.
                    # Anything else is fatal: release both sockets
                    # before propagating the original error.
                    a.close()
                    self.writer.close()
                    raise
                # (10048, 'Address already in use')
                # assert count <= 2 # never triggered in Tim's tests
                if count >= 10:  # I've never seen it go above 2
                    a.close()
                    self.writer.close()
                    raise socket.error("Cannot bind trigger!")
                # Close `a` and try again.  Note:  I originally put a short
                # sleep() here, but it didn't appear to help or hurt.
                a.close()

        # The writer is connected; accept the peer end as the reader.
        self.reader, addr = a.accept()
        self.reader.setblocking(0)
        self.writer.setblocking(0)
        # The listening socket is no longer needed once the pair exists.
        a.close()
        self.reader_fd = self.reader.fileno()

    def fileno(self):
        """Return the file descriptor of the reader socket."""
        return self.reader.fileno()

    def wake(self):
        """Send one byte on the writer socket.

        IOError and socket.error are deliberately ignored (best effort).
        """
        try:
            self.writer.send(b("x"))
        except (IOError, socket.error):
            pass

    def consume(self):
        """Drain the reader socket.

        Reads in chunks of up to 1024 bytes until recv() returns an
        empty result or raises IOError/socket.error, which is swallowed.
        """
        try:
            while True:
                result = self.reader.recv(1024)
                if not result:
                    break
        except (IOError, socket.error):
            pass

    def close(self):
        """Close both the reader and the writer sockets."""
        self.reader.close()
        self.writer.close()