| #!/usr/bin/env python |
| # |
| # Copyright 2011 Facebook |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| """Miscellaneous network utility code.""" |
| |
| import errno |
| import logging |
| import os |
| import socket |
| import stat |
| |
| from tornado import process |
| from tornado.ioloop import IOLoop |
| from tornado.iostream import IOStream, SSLIOStream |
| from tornado.platform.auto import set_close_exec |
| |
| try: |
| import ssl # Python 2.6+ |
| except ImportError: |
| ssl = None |
| |
| class TCPServer(object): |
| r"""A non-blocking, single-threaded TCP server. |
| |
| To use `TCPServer`, define a subclass which overrides the `handle_stream` |
| method. |
| |
| `TCPServer` can serve SSL traffic with Python 2.6+ and OpenSSL. |
| To make this server serve SSL traffic, send the ssl_options dictionary |
| argument with the arguments required for the `ssl.wrap_socket` method, |
| including "certfile" and "keyfile":: |
| |
| TCPServer(ssl_options={ |
| "certfile": os.path.join(data_dir, "mydomain.crt"), |
| "keyfile": os.path.join(data_dir, "mydomain.key"), |
| }) |
| |
| `TCPServer` initialization follows one of three patterns: |
| |
| 1. `listen`: simple single-process:: |
| |
| server = TCPServer() |
| server.listen(8888) |
| IOLoop.instance().start() |
| |
| 2. `bind`/`start`: simple multi-process:: |
| |
| server = TCPServer() |
| server.bind(8888) |
| server.start(0) # Forks multiple sub-processes |
| IOLoop.instance().start() |
| |
| When using this interface, an `IOLoop` must *not* be passed |
| to the `TCPServer` constructor. `start` will always start |
| the server on the default singleton `IOLoop`. |
| |
| 3. `add_sockets`: advanced multi-process:: |
| |
| sockets = bind_sockets(8888) |
| tornado.process.fork_processes(0) |
| server = TCPServer() |
| server.add_sockets(sockets) |
| IOLoop.instance().start() |
| |
| The `add_sockets` interface is more complicated, but it can be |
| used with `tornado.process.fork_processes` to give you more |
| flexibility in when the fork happens. `add_sockets` can |
| also be used in single-process servers if you want to create |
| your listening sockets in some way other than |
| `bind_sockets`. |
| """ |
| def __init__(self, io_loop=None, ssl_options=None): |
| self.io_loop = io_loop |
| self.ssl_options = ssl_options |
| self._sockets = {} # fd -> socket object |
| self._pending_sockets = [] |
| self._started = False |
| |
| def listen(self, port, address=""): |
| """Starts accepting connections on the given port. |
| |
| This method may be called more than once to listen on multiple ports. |
| `listen` takes effect immediately; it is not necessary to call |
| `TCPServer.start` afterwards. It is, however, necessary to start |
| the `IOLoop`. |
| """ |
| sockets = bind_sockets(port, address=address) |
| self.add_sockets(sockets) |
| |
| def add_sockets(self, sockets): |
| """Makes this server start accepting connections on the given sockets. |
| |
| The ``sockets`` parameter is a list of socket objects such as |
| those returned by `bind_sockets`. |
| `add_sockets` is typically used in combination with that |
| method and `tornado.process.fork_processes` to provide greater |
| control over the initialization of a multi-process server. |
| """ |
| if self.io_loop is None: |
| self.io_loop = IOLoop.instance() |
| |
| for sock in sockets: |
| self._sockets[sock.fileno()] = sock |
| add_accept_handler(sock, self._handle_connection, |
| io_loop=self.io_loop) |
| |
| def add_socket(self, socket): |
| """Singular version of `add_sockets`. Takes a single socket object.""" |
| self.add_sockets([socket]) |
| |
| def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128): |
| """Binds this server to the given port on the given address. |
| |
| To start the server, call `start`. If you want to run this server |
| in a single process, you can call `listen` as a shortcut to the |
| sequence of `bind` and `start` calls. |
| |
| Address may be either an IP address or hostname. If it's a hostname, |
| the server will listen on all IP addresses associated with the |
| name. Address may be an empty string or None to listen on all |
| available interfaces. Family may be set to either ``socket.AF_INET`` |
| or ``socket.AF_INET6`` to restrict to ipv4 or ipv6 addresses, otherwise |
| both will be used if available. |
| |
| The ``backlog`` argument has the same meaning as for |
| `socket.listen`. |
| |
| This method may be called multiple times prior to `start` to listen |
| on multiple ports or interfaces. |
| """ |
| sockets = bind_sockets(port, address=address, family=family, |
| backlog=backlog) |
| if self._started: |
| self.add_sockets(sockets) |
| else: |
| self._pending_sockets.extend(sockets) |
| |
| def start(self, num_processes=1): |
| """Starts this server in the IOLoop. |
| |
| By default, we run the server in this process and do not fork any |
| additional child process. |
| |
| If num_processes is ``None`` or <= 0, we detect the number of cores |
| available on this machine and fork that number of child |
| processes. If num_processes is given and > 1, we fork that |
| specific number of sub-processes. |
| |
| Since we use processes and not threads, there is no shared memory |
| between any server code. |
| |
| Note that multiple processes are not compatible with the autoreload |
| module (or the ``debug=True`` option to `tornado.web.Application`). |
| When using multiple processes, no IOLoops can be created or |
| referenced until after the call to ``TCPServer.start(n)``. |
| """ |
| assert not self._started |
| self._started = True |
| if num_processes != 1: |
| process.fork_processes(num_processes) |
| sockets = self._pending_sockets |
| self._pending_sockets = [] |
| self.add_sockets(sockets) |
| |
| def stop(self): |
| """Stops listening for new connections. |
| |
| Requests currently in progress may still continue after the |
| server is stopped. |
| """ |
| for fd, sock in self._sockets.iteritems(): |
| self.io_loop.remove_handler(fd) |
| sock.close() |
| |
| def handle_stream(self, stream, address): |
| """Override to handle a new `IOStream` from an incoming connection.""" |
| raise NotImplementedError() |
| |
| def _handle_connection(self, connection, address): |
| if self.ssl_options is not None: |
| assert ssl, "Python 2.6+ and OpenSSL required for SSL" |
| try: |
| connection = ssl.wrap_socket(connection, |
| server_side=True, |
| do_handshake_on_connect=False, |
| **self.ssl_options) |
| except ssl.SSLError, err: |
| if err.args[0] == ssl.SSL_ERROR_EOF: |
| return connection.close() |
| else: |
| raise |
| except socket.error, err: |
| if err.args[0] == errno.ECONNABORTED: |
| return connection.close() |
| else: |
| raise |
| try: |
| if self.ssl_options is not None: |
| stream = SSLIOStream(connection, io_loop=self.io_loop) |
| else: |
| stream = IOStream(connection, io_loop=self.io_loop) |
| self.handle_stream(stream, address) |
| except Exception: |
| logging.error("Error in connection callback", exc_info=True) |
| |
| |
| def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128): |
| """Creates listening sockets bound to the given port and address. |
| |
| Returns a list of socket objects (multiple sockets are returned if |
| the given address maps to multiple IP addresses, which is most common |
| for mixed IPv4 and IPv6 use). |
| |
| Address may be either an IP address or hostname. If it's a hostname, |
| the server will listen on all IP addresses associated with the |
| name. Address may be an empty string or None to listen on all |
| available interfaces. Family may be set to either socket.AF_INET |
| or socket.AF_INET6 to restrict to ipv4 or ipv6 addresses, otherwise |
| both will be used if available. |
| |
| The ``backlog`` argument has the same meaning as for |
| ``socket.listen()``. |
| """ |
| sockets = [] |
| if address == "": |
| address = None |
| flags = socket.AI_PASSIVE |
| if hasattr(socket, "AI_ADDRCONFIG"): |
| # AI_ADDRCONFIG ensures that we only try to bind on ipv6 |
| # if the system is configured for it, but the flag doesn't |
| # exist on some platforms (specifically WinXP, although |
| # newer versions of windows have it) |
| flags |= socket.AI_ADDRCONFIG |
| for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, |
| 0, flags)): |
| af, socktype, proto, canonname, sockaddr = res |
| sock = socket.socket(af, socktype, proto) |
| set_close_exec(sock.fileno()) |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| if af == socket.AF_INET6: |
| # On linux, ipv6 sockets accept ipv4 too by default, |
| # but this makes it impossible to bind to both |
| # 0.0.0.0 in ipv4 and :: in ipv6. On other systems, |
| # separate sockets *must* be used to listen for both ipv4 |
| # and ipv6. For consistency, always disable ipv4 on our |
| # ipv6 sockets and use a separate ipv4 socket when needed. |
| # |
| # Python 2.x on windows doesn't have IPPROTO_IPV6. |
| if hasattr(socket, "IPPROTO_IPV6"): |
| sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) |
| sock.setblocking(0) |
| sock.bind(sockaddr) |
| sock.listen(backlog) |
| sockets.append(sock) |
| return sockets |
| |
| if hasattr(socket, 'AF_UNIX'): |
| def bind_unix_socket(file, mode=0600, backlog=128): |
| """Creates a listening unix socket. |
| |
| If a socket with the given name already exists, it will be deleted. |
| If any other file with that name exists, an exception will be |
| raised. |
| |
| Returns a socket object (not a list of socket objects like |
| `bind_sockets`) |
| """ |
| sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| set_close_exec(sock.fileno()) |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| sock.setblocking(0) |
| try: |
| st = os.stat(file) |
| except OSError, err: |
| if err.errno != errno.ENOENT: |
| raise |
| else: |
| if stat.S_ISSOCK(st.st_mode): |
| os.remove(file) |
| else: |
| raise ValueError("File %s exists and is not a socket", file) |
| sock.bind(file) |
| os.chmod(file, mode) |
| sock.listen(backlog) |
| return sock |
| |
| def add_accept_handler(sock, callback, io_loop=None): |
| """Adds an ``IOLoop`` event handler to accept new connections on ``sock``. |
| |
| When a connection is accepted, ``callback(connection, address)`` will |
| be run (``connection`` is a socket object, and ``address`` is the |
| address of the other end of the connection). Note that this signature |
| is different from the ``callback(fd, events)`` signature used for |
| ``IOLoop`` handlers. |
| """ |
| if io_loop is None: |
| io_loop = IOLoop.instance() |
| def accept_handler(fd, events): |
| while True: |
| try: |
| connection, address = sock.accept() |
| except socket.error, e: |
| if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): |
| return |
| raise |
| callback(connection, address) |
| io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ) |