From 6f9a8f9a3d38b0a17091cfef9c8ecb9731a3cb0d Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Sat, 4 May 2019 22:01:50 +0100 Subject: [PATCH 01/17] Move code to create connection objects from sockets into separate module. --- cheroot/connections.py | 135 ++++++++++++++++++++++++++++++++++++++ cheroot/server.py | 143 ++++------------------------------------- 2 files changed, 148 insertions(+), 130 deletions(-) create mode 100644 cheroot/connections.py diff --git a/cheroot/connections.py b/cheroot/connections.py new file mode 100644 index 0000000000..4afe40ea2d --- /dev/null +++ b/cheroot/connections.py @@ -0,0 +1,135 @@ +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import io +import six +import socket + +from . import errors +from .makefile import MakeFile + + +try: + import fcntl +except ImportError: + try: + from ctypes import windll, WinError + import ctypes.wintypes + _SetHandleInformation = windll.kernel32.SetHandleInformation + _SetHandleInformation.argtypes = [ + ctypes.wintypes.HANDLE, + ctypes.wintypes.DWORD, + ctypes.wintypes.DWORD, + ] + _SetHandleInformation.restype = ctypes.wintypes.BOOL + except ImportError: + def prevent_socket_inheritance(sock): + """Stub inheritance prevention. + + Dummy function, since neither fcntl nor ctypes are available. + """ + pass + else: + def prevent_socket_inheritance(sock): + """Mark the given socket fd as non-inheritable (Windows).""" + if not _SetHandleInformation(sock.fileno(), 1, 0): + raise WinError() +else: + def prevent_socket_inheritance(sock): + """Mark the given socket fd as non-inheritable (POSIX).""" + fd = sock.fileno() + old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) + fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC) + + +class ConnectionManager: + + def __init__(self, server): + self.server = server + + def from_server_socket(self, server_socket): + try: + s, addr = server_socket.accept() + if self.server.stats['Enabled']: + self.server.stats['Accepts'] += 1 + prevent_socket_inheritance(s) + if hasattr(s, 'settimeout'): + s.settimeout(self.server.timeout) + + mf = MakeFile + ssl_env = {} + # if ssl cert and key are set, we try to be a secure HTTP server + if self.server.ssl_adapter is not None: + try: + s, ssl_env = self.server.ssl_adapter.wrap(s) + except errors.NoSSLError: + msg = ( + 'The client sent a plain HTTP request, but ' + 'this server only speaks HTTPS on this port.' + ) + buf = [ + '%s 400 Bad Request\r\n' % self.server.protocol, + 'Content-Length: %s\r\n' % len(msg), + 'Content-Type: text/plain\r\n\r\n', + msg, + ] + + sock_to_make = s if not six.PY2 else s._sock + wfile = mf(sock_to_make, 'wb', io.DEFAULT_BUFFER_SIZE) + try: + wfile.write(''.join(buf).encode('ISO-8859-1')) + except socket.error as ex: + if ex.args[0] not in errors.socket_errors_to_ignore: + raise + return + if not s: + return + mf = self.server.ssl_adapter.makefile + # Re-apply our timeout since we may have a new socket object + if hasattr(s, 'settimeout'): + s.settimeout(self.server.timeout) + + conn = self.server.ConnectionClass(self.server, s, mf) + + if not isinstance(self.server.bind_addr, six.string_types): + # optional values + # Until we do DNS lookups, omit REMOTE_HOST + if addr is None: # sometimes this can happen + # figure out if AF_INET or AF_INET6. + if len(s.getsockname()) == 2: + # AF_INET + addr = ('0.0.0.0', 0) + else: + # AF_INET6 + addr = ('::', 0) + conn.remote_addr = addr[0] + conn.remote_port = addr[1] + + conn.ssl_env = ssl_env + return conn + + except socket.timeout: + # The only reason for the timeout in start() is so we can + # notice keyboard interrupts on Win32, which don't interrupt + # accept() by default + return + except socket.error as ex: + if self.server.stats['Enabled']: + self.server.stats['Socket Errors'] += 1 + if ex.args[0] in errors.socket_error_eintr: + # I *think* this is right. EINTR should occur when a signal + # is received during the accept() call; all docs say retry + # the call, and I *think* I'm reading it right that Python + # will then go ahead and poll for and handle the signal + # elsewhere. See + # https://github.com/cherrypy/cherrypy/issues/707. + return + if ex.args[0] in errors.socket_errors_nonblocking: + # Just try again. See + # https://github.com/cherrypy/cherrypy/issues/479. + return + if ex.args[0] in errors.socket_errors_to_ignore: + # Our socket was closed. + # See https://github.com/cherrypy/cherrypy/issues/686. + return + raise diff --git a/cheroot/server.py b/cheroot/server.py index 3c00164975..8477de9ea2 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -84,7 +84,7 @@ from six.moves import queue from six.moves import urllib -from . import errors, __version__ +from . import connections, errors, __version__ from ._compat import bton, ntou from ._compat import IS_PPC from .workers import threadpool @@ -1474,39 +1474,6 @@ def _close_kernel_socket(self): self.socket._sock.close() -try: - import fcntl -except ImportError: - try: - from ctypes import windll, WinError - import ctypes.wintypes - _SetHandleInformation = windll.kernel32.SetHandleInformation - _SetHandleInformation.argtypes = [ - ctypes.wintypes.HANDLE, - ctypes.wintypes.DWORD, - ctypes.wintypes.DWORD, - ] - _SetHandleInformation.restype = ctypes.wintypes.BOOL - except ImportError: - def prevent_socket_inheritance(sock): - """Stub inheritance prevention. - - Dummy function, since neither fcntl nor ctypes are available. - """ - pass - else: - def prevent_socket_inheritance(sock): - """Mark the given socket fd as non-inheritable (Windows).""" - if not _SetHandleInformation(sock.fileno(), 1, 0): - raise WinError() -else: - def prevent_socket_inheritance(sock): - """Mark the given socket fd as non-inheritable (POSIX).""" - fd = sock.fileno() - old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) - fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC) - - class HTTPServer: """An HTTP server.""" @@ -1603,6 +1570,7 @@ def __init__( self.requests = threadpool.ThreadPool( self, min=minthreads or 1, max=maxthreads, ) + self.connections = connections.ConnectionManager(self) if not server_name: server_name = self.version @@ -1936,7 +1904,7 @@ def bind_unix_socket(self, bind_addr): def prepare_socket(bind_addr, family, type, proto, nodelay, ssl_adapter): """Create and prepare the socket object.""" sock = socket.socket(family, type, proto) - prevent_socket_inheritance(sock) + connections.prevent_socket_inheritance(sock) host, port = bind_addr[:2] IS_EPHEMERAL_PORT = port == 0 @@ -2012,102 +1980,17 @@ def resolve_real_bind_addr(socket_): def tick(self): """Accept a new connection and put it on the Queue.""" - try: - s, addr = self.socket.accept() - if self.stats['Enabled']: - self.stats['Accepts'] += 1 - if not self.ready: - return - - prevent_socket_inheritance(s) - if hasattr(s, 'settimeout'): - s.settimeout(self.timeout) - - mf = MakeFile - ssl_env = {} - # if ssl cert and key are set, we try to be a secure HTTP server - if self.ssl_adapter is not None: - try: - s, ssl_env = self.ssl_adapter.wrap(s) - except errors.NoSSLError: - msg = ( - 'The client sent a plain HTTP request, but ' - 'this server only speaks HTTPS on this port.' - ) - buf = [ - '%s 400 Bad Request\r\n' % self.protocol, - 'Content-Length: %s\r\n' % len(msg), - 'Content-Type: text/plain\r\n\r\n', - msg, - ] - - sock_to_make = s if not six.PY2 else s._sock - wfile = mf(sock_to_make, 'wb', io.DEFAULT_BUFFER_SIZE) - try: - wfile.write(''.join(buf).encode('ISO-8859-1')) - except socket.error as ex: - if ex.args[0] not in errors.socket_errors_to_ignore: - raise - return - if not s: - return - mf = self.ssl_adapter.makefile - # Re-apply our timeout since we may have a new socket object - if hasattr(s, 'settimeout'): - s.settimeout(self.timeout) - - conn = self.ConnectionClass(self, s, mf) - - if not isinstance( - self.bind_addr, - (six.text_type, six.binary_type), - ): - # optional values - # Until we do DNS lookups, omit REMOTE_HOST - if addr is None: # sometimes this can happen - # figure out if AF_INET or AF_INET6. - if len(s.getsockname()) == 2: - # AF_INET - addr = ('0.0.0.0', 0) - else: - # AF_INET6 - addr = ('::', 0) - conn.remote_addr = addr[0] - conn.remote_port = addr[1] - - conn.ssl_env = ssl_env - - try: - self.requests.put(conn) - except queue.Full: - # Just drop the conn. TODO: write 503 back? - conn.close() - return - except socket.timeout: - # The only reason for the timeout in start() is so we can - # notice keyboard interrupts on Win32, which don't interrupt - # accept() by default + if not self.ready: return - except socket.error as ex: - if self.stats['Enabled']: - self.stats['Socket Errors'] += 1 - if ex.args[0] in errors.socket_error_eintr: - # I *think* this is right. EINTR should occur when a signal - # is received during the accept() call; all docs say retry - # the call, and I *think* I'm reading it right that Python - # will then go ahead and poll for and handle the signal - # elsewhere. See - # https://github.com/cherrypy/cherrypy/issues/707. - return - if ex.args[0] in errors.socket_errors_nonblocking: - # Just try again. See - # https://github.com/cherrypy/cherrypy/issues/479. - return - if ex.args[0] in errors.socket_errors_to_ignore: - # Our socket was closed. - # See https://github.com/cherrypy/cherrypy/issues/686. - return - raise + conn = self.connections.from_server_socket(self.socket) + if not conn: + return + + try: + self.requests.put(conn) + except queue.Full: + # Just drop the conn. TODO: write 503 back? + conn.close() @property def interrupt(self): From 3df76a7675e2ced2b969536d37910ef4250fbcaf Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Mon, 6 May 2019 01:18:54 +0100 Subject: [PATCH 02/17] WorkerThreads now deal with a single request from a connection. If the connection is kept alive, it is returned back to a pool. --- cheroot/connections.py | 46 +++++++++++++++++++++- cheroot/server.py | 61 +++++++++++++++-------------- cheroot/test/test_conn.py | 73 +++++++++++++++++++++++++++++++++++ cheroot/workers/threadpool.py | 8 +++- 4 files changed, 156 insertions(+), 32 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 4afe40ea2d..713cf2383e 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -2,12 +2,16 @@ __metaclass__ = type import io +import select import six import socket +import time from . import errors from .makefile import MakeFile +from collections import OrderedDict + try: import fcntl @@ -46,8 +50,48 @@ class ConnectionManager: def __init__(self, server): self.server = server + self.connections = OrderedDict() + + def put(self, conn): + self.connections[conn] = time.time() + + def expire(self): + if not self.connections: + return + + # Check for too many open connections. + conns = list(self.connections.items()) + if self.server.keep_alive_conn_limit is not None: + [self.close(conn[0]) for conn in conns[:-self.server.keep_alive_conn_limit]] + conns = conns[-self.server.keep_alive_conn_limit:] + + # Check for old connections. + now = time.time() + for (conn, ctime) in conns: + # Oldest connection out of the currently available ones. + if (ctime + self.server.timeout) < now: + self.close(conn) + else: + break + + def get_conn(self, server_socket): + socket_dict = {s.socket.fileno(): s for s in self.connections} + socket_dict[server_socket.fileno()] = server_socket + rlist, _, _ = select.select(list(socket_dict), [], [], 0.1) + if not rlist: + return None + conn = socket_dict[rlist[0]] + if conn is server_socket: + return self._from_server_socket(server_socket) + else: + del self.connections[conn] + return conn + + def close(self, conn): + del self.connections[conn] + conn.close() - def from_server_socket(self, server_socket): + def _from_server_socket(self, server_socket): try: s, addr = server_socket.accept() if self.server.stats['Enabled']: diff --git a/cheroot/server.py b/cheroot/server.py index 8477de9ea2..a62c227380 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1255,30 +1255,27 @@ def __init__(self, server, sock, makefile=MakeFile): ) def communicate(self): - """Read each request and respond appropriately.""" + """Read each request and respond appropriately. + + Returns true if the connection should be kept open.""" request_seen = False try: - while True: - # (re)set req to None so that if something goes wrong in - # the RequestHandlerClass constructor, the error doesn't - # get written to the previous request. - req = None - req = self.RequestHandlerClass(self.server, self) - - # This order of operations should guarantee correct pipelining. - req.parse_request() - if self.server.stats['Enabled']: - self.requests_seen += 1 - if not req.ready: - # Something went wrong in the parsing (and the server has - # probably already made a simple_response). Return and - # let the conn close. - return + req = self.RequestHandlerClass(self.server, self) + + # This order of operations should guarantee correct pipelining. + req.parse_request() + if self.server.stats['Enabled']: + self.requests_seen += 1 + if not req.ready: + # Something went wrong in the parsing (and the server has + # probably already made a simple_response). Return and + # let the conn close. + return - request_seen = True - req.respond() - if req.close_connection: - return + request_seen = True + req.respond() + if not req.close_connection: + return True except socket.error as ex: errnum = ex.args[0] # sadly SSL sockets return a different (longer) time out string @@ -1549,6 +1546,11 @@ class HTTPServer: peercreds_resolve_enabled = False """If True, username/group will be looked up in the OS from peercreds.""" + keep_alive_conn_limit = 10 + """The maximum number of waiting keep-alive connections that will be kept open. + + Default is 10. Set to None to have unlimited connections.""" + def __init__( self, bind_addr, gateway, minthreads=10, maxthreads=-1, server_name=None, @@ -1982,15 +1984,16 @@ def tick(self): """Accept a new connection and put it on the Queue.""" if not self.ready: return - conn = self.connections.from_server_socket(self.socket) - if not conn: - return - try: - self.requests.put(conn) - except queue.Full: - # Just drop the conn. TODO: write 503 back? - conn.close() + conn = self.connections.get_conn(self.socket) + if conn: + try: + self.requests.put(conn) + except queue.Full: + # Just drop the conn. TODO: write 503 back? + conn.close() + + self.connections.expire() @property def interrupt(self): diff --git a/cheroot/test/test_conn.py b/cheroot/test/test_conn.py index 960e718ab0..7c612661aa 100644 --- a/cheroot/test/test_conn.py +++ b/cheroot/test/test_conn.py @@ -116,6 +116,7 @@ def _timeout(req, resp): wsgi_server.max_request_body_size = 1001 wsgi_server.timeout = timeout wsgi_server.server_client = wsgi_server_client + wsgi_server.keep_alive_conn_limit = 2 return wsgi_server @@ -389,6 +390,78 @@ def test_keepalive(test_client, http_server_protocol): test_client.server_instance.protocol = original_server_protocol +def test_keepalive_conn_management(test_client): + """Test management of Keep-Alive connections.""" + + test_client.server_instance.timeout = 2 + + def connection(): + # Initialize a persistent HTTP connection + http_connection = test_client.get_connection() + http_connection.auto_open = False + http_connection.connect() + return http_connection + + def request(conn): + status_line, actual_headers, actual_resp_body = test_client.get( + '/page3', headers=[('Connection', 'Keep-Alive')], + http_conn=conn, protocol='HTTP/1.0', + ) + actual_status = int(status_line[:3]) + assert actual_status == 200 + assert status_line[4:] == 'OK' + assert actual_resp_body == pov.encode() + assert header_has_value('Connection', 'Keep-Alive', actual_headers) + + disconnect_errors = (http_client.BadStatusLine, http_client.NotConnected) + + # Make a new connection. + c1 = connection() + request(c1) + + # Make a second one. + c2 = connection() + request(c2) + + # Reusing the first connection should still work. + request(c1) + + # Creating a new connection should still work. + c3 = connection() + request(c3) + + # Allow a tick. + time.sleep(0.2) + + # That's three connections, we should expect the one used less recently + # to be expired. + with pytest.raises(disconnect_errors): + request(c2) + + # But the oldest created one should still be valid. + # (As well as the newest one). + request(c1) + request(c3) + + # Wait for some of our timeout. + time.sleep(1.0) + + # Refresh the third connection. + request(c3) + + # Wait for the remainder of our timeout, plus one tick. + time.sleep(1.2) + + # First connection should now be expired. + with pytest.raises(disconnect_errors): + request(c1) + + # But the third one should still be valid. + request(c3) + + test_client.server_instance.timeout = timeout + + @pytest.mark.parametrize( 'timeout_before_headers', ( diff --git a/cheroot/workers/threadpool.py b/cheroot/workers/threadpool.py index 2b555fb463..b9182575e7 100644 --- a/cheroot/workers/threadpool.py +++ b/cheroot/workers/threadpool.py @@ -112,10 +112,14 @@ def run(self): is_stats_enabled = self.server.stats['Enabled'] if is_stats_enabled: self.start_time = time.time() + keep_conn_open = False try: - conn.communicate() + keep_conn_open = conn.communicate() finally: - conn.close() + if keep_conn_open: + self.server.connections.put(conn) + else: + conn.close() if is_stats_enabled: self.requests_seen += self.conn.requests_seen self.bytes_read += self.conn.rfile.bytes_read From 01458b71008e41567343aef843b9cb1f49bee8b7 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Tue, 7 May 2019 18:08:31 +0100 Subject: [PATCH 03/17] Makefile objects now have a has_data method - to indicate if there is buffered data. --- cheroot/makefile.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cheroot/makefile.py b/cheroot/makefile.py index 91483eca7c..4ed0f02b32 100644 --- a/cheroot/makefile.py +++ b/cheroot/makefile.py @@ -235,6 +235,7 @@ def readline(self, size=-1): break buf.write(data) return buf.getvalue() + else: # Read until size bytes or \n or EOF seen, whichever comes # first @@ -279,6 +280,10 @@ def readline(self, size=-1): buf_len += n # assert buf_len == buf.tell() return buf.getvalue() + + def has_data(self): + return bool(self._rbuf.getvalue()) + else: def read(self, size=-1): """Read data from the socket to buffer.""" @@ -395,6 +400,9 @@ def readline(self, size=-1): buf_len += n return ''.join(buffers) + def has_data(self): + return bool(self._rbuf) + if not six.PY2: class StreamReader(io.BufferedReader): @@ -411,6 +419,9 @@ def read(self, *args, **kwargs): self.bytes_read += len(val) return val + def has_data(self): + return bool(self.peek(1)) + class StreamWriter(BufferedWriter): """Socket stream writer.""" From daa73bcf2e8fae6d3164047bf1500425b4489a47 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Tue, 7 May 2019 18:11:26 +0100 Subject: [PATCH 04/17] Detect connections which have buffered data for the next request in ConnectionManager. --- cheroot/connections.py | 58 +++++++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 713cf2383e..75ca6ffdca 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -53,7 +53,7 @@ def __init__(self, server): self.connections = OrderedDict() def put(self, conn): - self.connections[conn] = time.time() + self.connections[conn] = time.time(), conn.rfile.has_data() def expire(self): if not self.connections: @@ -67,7 +67,7 @@ def expire(self): # Check for old connections. now = time.time() - for (conn, ctime) in conns: + for (conn, (ctime, _)) in conns: # Oldest connection out of the currently available ones. if (ctime + self.server.timeout) < now: self.close(conn) @@ -75,17 +75,51 @@ def expire(self): break def get_conn(self, server_socket): - socket_dict = {s.socket.fileno(): s for s in self.connections} - socket_dict[server_socket.fileno()] = server_socket - rlist, _, _ = select.select(list(socket_dict), [], [], 0.1) - if not rlist: - return None - conn = socket_dict[rlist[0]] - if conn is server_socket: - return self._from_server_socket(server_socket) + + # Grab file descriptors from sockets, but stop if we find a + # connection which is already marked as ready. + socket_dict = {} + for conn, (tstamp, has_data) in self.connections.items(): + if has_data: + break + socket_dict[conn.socket.fileno()] = conn else: - del self.connections[conn] - return conn + # No ready connection. + conn = None + + # Will require a select call. + if not conn: + ss_fileno = server_socket.fileno() + socket_dict[ss_fileno] = server_socket + rlist, _, _ = select.select(list(socket_dict), [], [], 0.1) + + # No available socket. + if not rlist: + return None + + try: + # See if we have a new connection coming in. + rlist.remove(ss_fileno) + except ValueError: + # No new connection, but reuse existing socket. + conn = socket_dict[rlist.pop()] + else: + conn = server_socket + + # All remaining connections in rlist should be added + # to the ready queue. + if rlist: + socket_dict.update({ + fno: (socket_dict[fno], True) + for fno in rlist + }) + + # New connection. + if conn is server_socket: + return self._from_server_socket(server_socket) + + del self.connections[conn] + return conn def close(self, conn): del self.connections[conn] From e7fc671c20cc5bc654fdce0e37883f922eaf490c Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Tue, 7 May 2019 18:59:17 +0100 Subject: [PATCH 05/17] Make ConnectionManager.close a private method, and make PEP8 changes. --- cheroot/connections.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 75ca6ffdca..1935c3f45a 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -61,16 +61,17 @@ def expire(self): # Check for too many open connections. conns = list(self.connections.items()) - if self.server.keep_alive_conn_limit is not None: - [self.close(conn[0]) for conn in conns[:-self.server.keep_alive_conn_limit]] - conns = conns[-self.server.keep_alive_conn_limit:] + ka_limit = self.server.keep_alive_conn_limit + if ka_limit is not None: + [self._close(conn[0]) for conn in conns[:-ka_limit]] + conns = conns[-ka_limit:] # Check for old connections. now = time.time() for (conn, (ctime, _)) in conns: # Oldest connection out of the currently available ones. if (ctime + self.server.timeout) < now: - self.close(conn) + self._close(conn) else: break @@ -121,7 +122,7 @@ def get_conn(self, server_socket): del self.connections[conn] return conn - def close(self, conn): + def _close(self, conn): del self.connections[conn] conn.close() From 84f6584d9da711ad29dc89ba454818a63cf50a5b Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Tue, 7 May 2019 18:59:35 +0100 Subject: [PATCH 06/17] Add docstrings to ConnectionManager class. --- cheroot/connections.py | 40 +++++++++++++++++++++++++++++++++++++++ cheroot/makefile.py | 3 +++ cheroot/server.py | 5 ++--- cheroot/test/test_conn.py | 9 ++++++--- 4 files changed, 51 insertions(+), 6 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 1935c3f45a..e1d62fbfb8 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -1,3 +1,5 @@ +"""Utilities to manage open connections.""" + from __future__ import absolute_import, division, print_function __metaclass__ = type @@ -47,15 +49,38 @@ def prevent_socket_inheritance(sock): class ConnectionManager: + """Class which manages HTTPConnection objects. + + This is for connections which are being kept-alive for follow-up requests. + """ def __init__(self, server): + """Initialize ConnectionManager object. + + Args: + server (cheroot.server.HTTPServer): web server object + that uses this ConnectionManager instance. + """ self.server = server self.connections = OrderedDict() def put(self, conn): + """Put idle connection into the ConnectionManager to be managed. + + Args: + conn (cheroot.server.HTTPConnection): HTTP connection + to be managed. + """ self.connections[conn] = time.time(), conn.rfile.has_data() def expire(self): + """Expire least recently used connections. + + This happens if there are either too many open connections, or if the + connections have been timed out. + + This should be called periodically. + """ if not self.connections: return @@ -76,7 +101,22 @@ def expire(self): break def get_conn(self, server_socket): + """Return a HTTPConnection object which is ready to be handled. + + A connection returned by this method should be ready for a worker + to handle it. If there are no connections ready, None will be + returned. + + Any connection returned by this method will need to be `put` + back if it should be examined again for another request. + + Args: + server_socket: ServerSocket instance to listen to for new + connections. + Returns: + cheroot.server.HTTPConnection instance, or None. + """ # Grab file descriptors from sockets, but stop if we find a # connection which is already marked as ready. socket_dict = {} diff --git a/cheroot/makefile.py b/cheroot/makefile.py index 4ed0f02b32..928a2f9bee 100644 --- a/cheroot/makefile.py +++ b/cheroot/makefile.py @@ -282,6 +282,7 @@ def readline(self, size=-1): return buf.getvalue() def has_data(self): + """Return true if there is buffered data to read.""" return bool(self._rbuf.getvalue()) else: @@ -401,6 +402,7 @@ def readline(self, size=-1): return ''.join(buffers) def has_data(self): + """Return true if there is buffered data to read.""" return bool(self._rbuf) @@ -420,6 +422,7 @@ def read(self, *args, **kwargs): return val def has_data(self): + """Return true if there is buffered data to read.""" return bool(self.peek(1)) class StreamWriter(BufferedWriter): diff --git a/cheroot/server.py b/cheroot/server.py index a62c227380..143d02e70f 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1257,12 +1257,11 @@ def __init__(self, server, sock, makefile=MakeFile): def communicate(self): """Read each request and respond appropriately. - Returns true if the connection should be kept open.""" + Returns true if the connection should be kept open. + """ request_seen = False try: req = self.RequestHandlerClass(self.server, self) - - # This order of operations should guarantee correct pipelining. req.parse_request() if self.server.stats['Enabled']: self.requests_seen += 1 diff --git a/cheroot/test/test_conn.py b/cheroot/test/test_conn.py index 7c612661aa..b26ffc96df 100644 --- a/cheroot/test/test_conn.py +++ b/cheroot/test/test_conn.py @@ -392,7 +392,6 @@ def test_keepalive(test_client, http_server_protocol): def test_keepalive_conn_management(test_client): """Test management of Keep-Alive connections.""" - test_client.server_instance.timeout = 2 def connection(): @@ -413,7 +412,11 @@ def request(conn): assert actual_resp_body == pov.encode() assert header_has_value('Connection', 'Keep-Alive', actual_headers) - disconnect_errors = (http_client.BadStatusLine, http_client.NotConnected) + disconnect_errors = ( + http_client.BadStatusLine, + http_client.CannotSendRequest, + http_client.NotConnected, + ) # Make a new connection. c1 = connection() @@ -612,7 +615,7 @@ def test_HTTP11_pipelining(test_client): response = conn.response_class(conn.sock, method='GET') # there is a bug in python3 regarding the buffering of # ``conn.sock``. Until that bug get's fixed we will - # monkey patch the ``reponse`` instance. + # monkey patch the ``response`` instance. # https://bugs.python.org/issue23377 if not six.PY2: response.fp = conn.sock.makefile('rb', 0) From 53d32fb0479775f611b814151dc88490dfce02ab Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Wed, 8 May 2019 00:13:24 +0100 Subject: [PATCH 07/17] Fix makefile.StreamReader.has_data. --- cheroot/makefile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cheroot/makefile.py b/cheroot/makefile.py index 928a2f9bee..8a86b3384f 100644 --- a/cheroot/makefile.py +++ b/cheroot/makefile.py @@ -423,7 +423,7 @@ def read(self, *args, **kwargs): def has_data(self): """Return true if there is buffered data to read.""" - return bool(self.peek(1)) + return len(self._read_buf) > self._read_pos class StreamWriter(BufferedWriter): """Socket stream writer.""" From e80001f12d89b480aa2911ee620a73d1c31b2859 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Fri, 10 May 2019 14:11:42 +0100 Subject: [PATCH 08/17] ConnectionManager now sets closeable flag on HTTPConnection for worker threads to close. --- cheroot/connections.py | 35 ++++++++++++++++++----------------- cheroot/server.py | 3 +++ cheroot/workers/threadpool.py | 5 +++++ 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index e1d62fbfb8..3b743b7134 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -84,21 +84,23 @@ def expire(self): if not self.connections: return - # Check for too many open connections. - conns = list(self.connections.items()) + # Look at the first connection - if it can be closed, then do + # that, and wait for get_conn to return it. + conn = next(iter(self.connections)) + if conn.closeable: + return + + # Too many connections? ka_limit = self.server.keep_alive_conn_limit - if ka_limit is not None: - [self._close(conn[0]) for conn in conns[:-ka_limit]] - conns = conns[-ka_limit:] - - # Check for old connections. - now = time.time() - for (conn, (ctime, _)) in conns: - # Oldest connection out of the currently available ones. - if (ctime + self.server.timeout) < now: - self._close(conn) - else: - break + if ka_limit is not None and len(self.connections) > ka_limit: + conn.closeable = True + return + + # Connection too old? + ctime, _ = self.connections[conn] + if (ctime + self.server.timeout) < time.time(): + conn.closeable = True + return def get_conn(self, server_socket): """Return a HTTPConnection object which is ready to be handled. @@ -121,7 +123,7 @@ def get_conn(self, server_socket): # connection which is already marked as ready. socket_dict = {} for conn, (tstamp, has_data) in self.connections.items(): - if has_data: + if conn.closeable or has_data: break socket_dict[conn.socket.fileno()] = conn else: @@ -162,9 +164,8 @@ def get_conn(self, server_socket): del self.connections[conn] return conn - def _close(self, conn): del self.connections[conn] - conn.close() + return conn def _from_server_socket(self, server_socket): try: diff --git a/cheroot/server.py b/cheroot/server.py index 143d02e70f..5e8f40bec3 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1227,6 +1227,9 @@ class HTTPConnection: peercreds_enabled = False peercreds_resolve_enabled = False + # Fields set by ConnectionManager. + closeable = False + def __init__(self, server, sock, makefile=MakeFile): """Initialize HTTPConnection instance. diff --git a/cheroot/workers/threadpool.py b/cheroot/workers/threadpool.py index b9182575e7..a4b0863444 100644 --- a/cheroot/workers/threadpool.py +++ b/cheroot/workers/threadpool.py @@ -108,6 +108,11 @@ def run(self): if conn is _SHUTDOWNREQUEST: return + # Just close the connection and move on. + if conn.closeable: + conn.close() + continue + self.conn = conn is_stats_enabled = self.server.stats['Enabled'] if is_stats_enabled: From cdfa8cd4209eb3d8988805ef55be13315eb0324a Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Fri, 10 May 2019 14:13:48 +0100 Subject: [PATCH 09/17] Less nesting of code in ConnectionManager.get_conn. --- cheroot/connections.py | 63 +++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 3b743b7134..2086ba31e5 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -130,39 +130,40 @@ def get_conn(self, server_socket): # No ready connection. conn = None + # We have a connection ready for use. + if conn: + del self.connections[conn] + return conn + # Will require a select call. - if not conn: - ss_fileno = server_socket.fileno() - socket_dict[ss_fileno] = server_socket - rlist, _, _ = select.select(list(socket_dict), [], [], 0.1) - - # No available socket. - if not rlist: - return None - - try: - # See if we have a new connection coming in. - rlist.remove(ss_fileno) - except ValueError: - # No new connection, but reuse existing socket. - conn = socket_dict[rlist.pop()] - else: - conn = server_socket - - # All remaining connections in rlist should be added - # to the ready queue. - if rlist: - socket_dict.update({ - fno: (socket_dict[fno], True) - for fno in rlist - }) - - # New connection. - if conn is server_socket: - return self._from_server_socket(server_socket) + ss_fileno = server_socket.fileno() + socket_dict[ss_fileno] = server_socket + rlist, _, _ = select.select(list(socket_dict), [], [], 0.1) - del self.connections[conn] - return conn + # No available socket. + if not rlist: + return None + + try: + # See if we have a new connection coming in. + rlist.remove(ss_fileno) + except ValueError: + # No new connection, but reuse existing socket. + conn = socket_dict[rlist.pop()] + else: + conn = server_socket + + # All remaining connections in rlist should be added + # to the ready queue. + if rlist: + socket_dict.update({ + fno: (socket_dict[fno], True) + for fno in rlist + }) + + # New connection. + if conn is server_socket: + return self._from_server_socket(server_socket) del self.connections[conn] return conn From 2febf2d15f9b29dc83a3099ff33e4f5d3dd7c7c8 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Fri, 10 May 2019 14:22:55 +0100 Subject: [PATCH 10/17] Try to detect invalid file descriptors in ConnectionManager, and clean them up. --- cheroot/connections.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 2086ba31e5..56c83e1b74 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -4,6 +4,7 @@ __metaclass__ = type import io +import os import select import six import socket @@ -138,10 +139,29 @@ def get_conn(self, server_socket): # Will require a select call. ss_fileno = server_socket.fileno() socket_dict[ss_fileno] = server_socket - rlist, _, _ = select.select(list(socket_dict), [], [], 0.1) - - # No available socket. - if not rlist: + try: + rlist, _, _ = select.select(list(socket_dict), [], [], 0.1) + # No available socket. + if not rlist: + return None + except OSError: + # Mark any connection which no longer appears valid. + for fno, conn in list(socket_dict.items()): + # If the server socket is invalid, we'll just ignore it and + # wait to be shutdown. + if fno == ss_fileno: + continue + try: + os.fstat(fno) + except OSError: + # Socket is invalid, close the connection. + conn.closeable = True + else: + # Connection is fine, move to the end so that + # closeable connections appear at the front. + self.connections[fno] = self.connections.pop(fno) + + # Wait for the next tick to occur. return None try: From d78b4041d1ff04b1b86aac134820d7fedda52df3 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Fri, 10 May 2019 14:32:02 +0100 Subject: [PATCH 11/17] ConnectionManager now records the time a connection was last used as a property on HTTPConnection. --- cheroot/connections.py | 8 ++++---- cheroot/server.py | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 56c83e1b74..904559b747 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -72,7 +72,8 @@ def put(self, conn): conn (cheroot.server.HTTPConnection): HTTP connection to be managed. """ - self.connections[conn] = time.time(), conn.rfile.has_data() + conn.last_used = time.time() + self.connections[conn] = conn.rfile.has_data() def expire(self): """Expire least recently used connections. @@ -98,8 +99,7 @@ def expire(self): return # Connection too old? - ctime, _ = self.connections[conn] - if (ctime + self.server.timeout) < time.time(): + if (conn.last_used + self.server.timeout) < time.time(): conn.closeable = True return @@ -123,7 +123,7 @@ def get_conn(self, server_socket): # Grab file descriptors from sockets, but stop if we find a # connection which is already marked as ready. socket_dict = {} - for conn, (tstamp, has_data) in self.connections.items(): + for conn, has_data in self.connections.items(): if conn.closeable or has_data: break socket_dict[conn.socket.fileno()] = conn diff --git a/cheroot/server.py b/cheroot/server.py index 5e8f40bec3..a1218c0826 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1229,6 +1229,7 @@ class HTTPConnection: # Fields set by ConnectionManager. closeable = False + last_used = None def __init__(self, server, sock, makefile=MakeFile): """Initialize HTTPConnection instance. From 08de8648ec5b1460200ec07c8ae84f273a0b5e61 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Fri, 10 May 2019 17:38:19 +0100 Subject: [PATCH 12/17] ConnectionManager now sets a flag on HTTPConnection indicating if there is data ready to read. --- cheroot/connections.py | 19 ++++++++----------- cheroot/server.py | 1 + 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 904559b747..0abe9562c9 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -63,7 +63,7 @@ def __init__(self, server): that uses this ConnectionManager instance. """ self.server = server - self.connections = OrderedDict() + self.connections = OrderedDict() # using this as a set def put(self, conn): """Put idle connection into the ConnectionManager to be managed. @@ -73,7 +73,8 @@ def put(self, conn): to be managed. """ conn.last_used = time.time() - self.connections[conn] = conn.rfile.has_data() + conn.ready_with_data = conn.rfile.has_data() + self.connections[conn] = None def expire(self): """Expire least recently used connections. @@ -123,8 +124,8 @@ def get_conn(self, server_socket): # Grab file descriptors from sockets, but stop if we find a # connection which is already marked as ready. socket_dict = {} - for conn, has_data in self.connections.items(): - if conn.closeable or has_data: + for conn in self.connections: + if conn.closeable or conn.ready_with_data: break socket_dict[conn.socket.fileno()] = conn else: @@ -173,13 +174,9 @@ def get_conn(self, server_socket): else: conn = server_socket - # All remaining connections in rlist should be added - # to the ready queue. - if rlist: - socket_dict.update({ - fno: (socket_dict[fno], True) - for fno in rlist - }) + # All remaining connections in rlist should be marked as ready. + for fno in rlist: + socket_dict[fno].ready_with_data = True # New connection. if conn is server_socket: diff --git a/cheroot/server.py b/cheroot/server.py index a1218c0826..572611d95b 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1230,6 +1230,7 @@ class HTTPConnection: # Fields set by ConnectionManager. closeable = False last_used = None + ready_with_data = False def __init__(self, server, sock, makefile=MakeFile): """Initialize HTTPConnection instance. From 4103fd247344e554bfb455bd0803f720a7980843 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Fri, 10 May 2019 17:42:14 +0100 Subject: [PATCH 13/17] Minor style tweaks after code review. --- cheroot/connections.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 0abe9562c9..cdcf48fdde 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -6,15 +6,14 @@ import io import os import select -import six import socket import time +from collections import OrderedDict from . import errors from .makefile import MakeFile -from collections import OrderedDict - +import six try: import fcntl @@ -115,7 +114,7 @@ def get_conn(self, server_socket): back if it should be examined again for another request. Args: - server_socket: ServerSocket instance to listen to for new + server_socket (socket.socket): Socket to listen to for new connections. Returns: cheroot.server.HTTPConnection instance, or None. From e43c25d1b52432d60400c2110f8dcad639a1b36f Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Thu, 23 May 2019 13:25:06 +0100 Subject: [PATCH 14/17] Use a list internally instead of OrderedDict in ConnectionManager, as it is thread-safe. --- cheroot/connections.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index cdcf48fdde..a4638fc8fd 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -8,7 +8,6 @@ import select import socket import time -from collections import OrderedDict from . import errors from .makefile import MakeFile @@ -62,7 +61,7 @@ def __init__(self, server): that uses this ConnectionManager instance. """ self.server = server - self.connections = OrderedDict() # using this as a set + self.connections = [] def put(self, conn): """Put idle connection into the ConnectionManager to be managed. @@ -73,7 +72,7 @@ def put(self, conn): """ conn.last_used = time.time() conn.ready_with_data = conn.rfile.has_data() - self.connections[conn] = None + self.connections.append(conn) def expire(self): """Expire least recently used connections. @@ -88,7 +87,7 @@ def expire(self): # Look at the first connection - if it can be closed, then do # that, and wait for get_conn to return it. - conn = next(iter(self.connections)) + conn = self.connections[0] if conn.closeable: return @@ -133,7 +132,7 @@ def get_conn(self, server_socket): # We have a connection ready for use. if conn: - del self.connections[conn] + self.connections.remove(conn) return conn # Will require a select call. @@ -154,12 +153,11 @@ def get_conn(self, server_socket): try: os.fstat(fno) except OSError: - # Socket is invalid, close the connection. + # Socket is invalid, close the connection, insert at + # the front. + self.connections.remove(conn) + self.connections.insert(0, conn) conn.closeable = True - else: - # Connection is fine, move to the end so that - # closeable connections appear at the front. - self.connections[fno] = self.connections.pop(fno) # Wait for the next tick to occur. return None @@ -181,7 +179,7 @@ def get_conn(self, server_socket): if conn is server_socket: return self._from_server_socket(server_socket) - del self.connections[conn] + self.connections.remove(conn) return conn def _from_server_socket(self, server_socket): From 91cbc296e7fb1b7946c50277b8d6c2e71a6a10a8 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Thu, 23 May 2019 13:26:07 +0100 Subject: [PATCH 15/17] When the server shuts down, get ConnectionManager to close all of its connections too. --- cheroot/connections.py | 6 ++++++ cheroot/server.py | 1 + 2 files changed, 7 insertions(+) diff --git a/cheroot/connections.py b/cheroot/connections.py index a4638fc8fd..77af343224 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -268,3 +268,9 @@ def _from_server_socket(self, server_socket): # See https://github.com/cherrypy/cherrypy/issues/686. return raise + + def close(self): + """Close all monitored connections.""" + for conn in self.connections[:]: + conn.close() + self.connections = [] diff --git a/cheroot/server.py b/cheroot/server.py index 572611d95b..9f95f976b9 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -2059,6 +2059,7 @@ def stop(self): sock.close() self.socket = None + self.connections.close() self.requests.stop(self.shutdown_timeout) From 65d81255408735c4d45995b3640a5d7e5ccd3c6e Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Wed, 2 Oct 2019 23:26:18 +0100 Subject: [PATCH 16/17] Port "Fix UNIX abstract sockets support" change lost when rebasing. --- cheroot/connections.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 77af343224..943ac65ad2 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -226,7 +226,10 @@ def _from_server_socket(self, server_socket): conn = self.server.ConnectionClass(self.server, s, mf) - if not isinstance(self.server.bind_addr, six.string_types): + if not isinstance( + self.server.bind_addr, + (six.text_type, six.binary_type), + ): # optional values # Until we do DNS lookups, omit REMOTE_HOST if addr is None: # sometimes this can happen From 17b4ebb68265edc3f60df34ee3c98c9544fee572 Mon Sep 17 00:00:00 2001 From: Allan Crooks Date: Thu, 3 Oct 2019 13:12:11 +0100 Subject: [PATCH 17/17] Return False from HTTPConnection.communicate. --- cheroot/server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cheroot/server.py b/cheroot/server.py index 9f95f976b9..e354a4d406 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1274,7 +1274,7 @@ def communicate(self): # Something went wrong in the parsing (and the server has # probably already made a simple_response). Return and # let the conn close. - return + return False request_seen = True req.respond() @@ -1308,6 +1308,7 @@ def communicate(self): repr(ex), level=logging.ERROR, traceback=True, ) self._conditional_error(req, '500 Internal Server Error') + return False linger = False