diff --git a/CHANGES.rst b/CHANGES.rst index 63b47b8ca5..a7de7a59e5 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,10 @@ +v6.7.0 +====== + +- :issue:`91` via :pr:`176`: Server now processes sockets in a + non-blocking manner, improving support for keep-alive connections + on HTTP 1.1 especially when operating under load. + v6.5.5 ====== diff --git a/cheroot/errors.py b/cheroot/errors.py index 809287319d..e0cc8c495b 100644 --- a/cheroot/errors.py +++ b/cheroot/errors.py @@ -22,6 +22,10 @@ class FatalSSLAlert(Exception): """Exception raised when the SSL implementation signals a fatal alert.""" +class CloseConnection(Exception): + """Exception raised when client requested the connection to be closed.""" + + def plat_specific_errors(*errnames): """Return error numbers for all errors in errnames on this platform. diff --git a/cheroot/server.py b/cheroot/server.py index 08590b109a..65e9365646 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1264,15 +1264,13 @@ def communicate(self): if self.server.stats['Enabled']: self.requests_seen += 1 if not req.ready: - # Something went wrong in the parsing (and the server has - # probably already made a simple_response). Return and - # let the conn close. + # Allow conn to stay open to support non-blocking sockets return request_seen = True req.respond() if req.close_connection: - return + raise errors.CloseConnection() except socket.error as ex: errnum = ex.args[0] # sadly SSL sockets return a different (longer) time out string @@ -1290,6 +1288,7 @@ def communicate(self): level=logging.WARNING, traceback=True, ) self._conditional_error(req, '500 Internal Server Error') + raise except (KeyboardInterrupt, SystemExit): raise except errors.FatalSSLAlert: @@ -1301,6 +1300,7 @@ def communicate(self): repr(ex), level=logging.ERROR, traceback=True, ) self._conditional_error(req, '500 Internal Server Error') + raise linger = False diff --git a/cheroot/workers/threadpool.py b/cheroot/workers/threadpool.py index 494f76f7ca..90ee498d3c 100644 --- a/cheroot/workers/threadpool.py +++ b/cheroot/workers/threadpool.py @@ -7,6 +7,8 @@ import threading import time import socket +import select +import functools from six.moves import queue @@ -94,6 +96,65 @@ def __init__(self, server): } threading.Thread.__init__(self) + def if_stats(func): + """Decorate the function to only invoke if stats are enabled.""" + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + return self.server.stats['Enabled'] and func(self, *args, **kwargs) + return wrapper + + @if_stats + def log_start_stats(self): + """Record the start time.""" + self.start_time = time.time() + + @if_stats + def log_close_stats(self): + """On close, record the stats.""" + self.requests_seen += self.conn.requests_seen + self.bytes_read += self.conn.rfile.bytes_read + self.bytes_written += self.conn.wfile.bytes_written + self.work_time += time.time() - self.start_time + self.start_time = None + + def conn_expired(self, last_active, cur_time): + """Return True if the connection has expired.""" + srv_timeout = self.server.timeout + return cur_time - last_active > srv_timeout + + def get_expired_conns(self, conn_socks, cur_time): + """Generate all expired connections.""" + for conn, last_active in tuple(conn_socks.values()): + if self.conn_expired(last_active, cur_time): + yield conn + + def close_conns(self, conn_list, conn_socks): + """Close all connections and associated sockets.""" + for conn in conn_list: + conn.communicate() # allow for 408 to be sent + conn.close() + conn_socks.pop(conn.socket) + + def process_conns(self, conn_socks): + """Process connections.""" + rlist = [] + socks = [sck for sck in conn_socks.keys() if sck.fileno() > -1] + if socks: + rlist = select.select(socks, [], [], 0)[0] + for sock in rlist: + conn, conn_start_time = conn_socks[sock] + self.conn = conn + self.log_start_stats() + try: + conn.communicate() + except Exception: + conn.close() + conn_socks.pop(conn.socket) + else: + conn_socks[conn.socket] = (conn, time.time()) + self.log_close_stats() + self.conn = None + def run(self): """Process incoming HTTP connections. @@ -102,25 +163,19 @@ def run(self): self.server.stats['Worker Threads'][self.getName()] = self.stats try: self.ready = True + conn_socks = {} while True: - conn = self.server.requests.get() - if conn is _SHUTDOWNREQUEST: - return - - self.conn = conn - if self.server.stats['Enabled']: - self.start_time = time.time() try: - conn.communicate() - finally: - conn.close() - if self.server.stats['Enabled']: - self.requests_seen += self.conn.requests_seen - self.bytes_read += self.conn.rfile.bytes_read - self.bytes_written += self.conn.wfile.bytes_written - self.work_time += time.time() - self.start_time - self.start_time = None - self.conn = None + conn = self.server.requests.get(block=True, timeout=0.01) + except queue.Empty: + pass + else: + if conn is _SHUTDOWNREQUEST: + return + conn_socks[conn.socket] = (conn, time.time()) + self.process_conns(conn_socks) + expired_conns = self.get_expired_conns(conn_socks, time.time()) + self.close_conns(expired_conns, conn_socks) except (KeyboardInterrupt, SystemExit) as ex: self.server.interrupt = ex