Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-blocking socket support/better handling of HTTP/1.1 connections #176

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e99941d
enhance worker run loop
hexaclock Sep 28, 2018
7989913
non-blocking
hexaclock Sep 28, 2018
5537556
Update threadpool.py
hexaclock Sep 28, 2018
0b77338
just need rlist
hexaclock Sep 30, 2018
1a77b86
Merge pull request #1 from hexaclock/non-blocking
hexaclock Sep 30, 2018
47952c6
lint and refactor run method. possibly fix a test..
Oct 1, 2018
965be2e
fix perms
Oct 2, 2018
c3dd1c7
refactor process_conns and use socket.gettimeout
hexaclock Oct 7, 2018
e89db2c
remove unnecessary variable, better var naming
hexaclock Oct 7, 2018
9c347f3
refactor close_expired_conns
hexaclock Oct 7, 2018
c3bd269
attempt to lower cog complexity
hexaclock Oct 7, 2018
18c429c
oops
hexaclock Oct 7, 2018
161eac6
lines
hexaclock Oct 8, 2018
8aaa977
revert changes to issue template
hexaclock Oct 9, 2018
0a2eea4
return False only when a non-zero timeout is specified
hexaclock Oct 11, 2018
368b607
collapse if branches
hexaclock Oct 17, 2018
00654cc
test_HTTP11_Timeout passes now
Oct 25, 2018
1247199
lint
hexaclock Oct 25, 2018
1be1023
Restore inconsequential newline
jaraco Feb 4, 2019
a686495
Merge branch 'master' into hexaclock-master
jaraco Feb 4, 2019
2014555
Merge branch 'master' into feature/91-non-blocking-sockets
jaraco Feb 4, 2019
1dd9fd2
Rely on Exceptions to trap exceptional conditions in WorkerThread.pro…
jaraco Feb 4, 2019
d83398e
Remove unused parameter
jaraco Feb 4, 2019
14bcf2a
Extract 'if stats enabled' logic into a decorator
jaraco Feb 4, 2019
d1fc2a5
Ran pre-commit
jaraco Feb 14, 2019
05f446a
Satisfy the linter's need for docstrings
jaraco Feb 14, 2019
e558e30
Merge branch 'master' into feature/91-non-blocking-sockets
jaraco Apr 30, 2019
9413ed9
Update changelog.
jaraco Apr 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
v6.7.0
======

- :issue:`91` via :pr:`176`: Server now processes sockets in a
non-blocking manner, improving support for keep-alive connections
on HTTP 1.1 especially when operating under load.

v6.5.5
======

Expand Down
4 changes: 4 additions & 0 deletions cheroot/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class FatalSSLAlert(Exception):
"""Exception raised when the SSL implementation signals a fatal alert."""


class CloseConnection(Exception):
"""Exception raised when client requested the connection to be closed."""


def plat_specific_errors(*errnames):
"""Return error numbers for all errors in errnames on this platform.

Expand Down
8 changes: 4 additions & 4 deletions cheroot/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,15 +1264,13 @@ def communicate(self):
if self.server.stats['Enabled']:
self.requests_seen += 1
if not req.ready:
# Something went wrong in the parsing (and the server has
# probably already made a simple_response). Return and
# let the conn close.
# Allow conn to stay open to support non-blocking sockets
return

request_seen = True
req.respond()
if req.close_connection:
return
raise errors.CloseConnection()
except socket.error as ex:
errnum = ex.args[0]
# sadly SSL sockets return a different (longer) time out string
Expand All @@ -1290,6 +1288,7 @@ def communicate(self):
level=logging.WARNING, traceback=True,
)
self._conditional_error(req, '500 Internal Server Error')
raise
except (KeyboardInterrupt, SystemExit):
raise
except errors.FatalSSLAlert:
Expand All @@ -1301,6 +1300,7 @@ def communicate(self):
repr(ex), level=logging.ERROR, traceback=True,
)
self._conditional_error(req, '500 Internal Server Error')
raise

linger = False

Expand Down
89 changes: 72 additions & 17 deletions cheroot/workers/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import threading
import time
import socket
import select
import functools

from six.moves import queue

Expand Down Expand Up @@ -94,6 +96,65 @@ def __init__(self, server):
}
threading.Thread.__init__(self)

def if_stats(func):
"""Decorate the function to only invoke if stats are enabled."""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
return self.server.stats['Enabled'] and func(self, *args, **kwargs)
return wrapper

@if_stats
def log_start_stats(self):
"""Record the start time."""
self.start_time = time.time()

@if_stats
def log_close_stats(self):
"""On close, record the stats."""
self.requests_seen += self.conn.requests_seen
self.bytes_read += self.conn.rfile.bytes_read
self.bytes_written += self.conn.wfile.bytes_written
self.work_time += time.time() - self.start_time
self.start_time = None

def conn_expired(self, last_active, cur_time):
"""Return True if the connection has expired."""
srv_timeout = self.server.timeout
return cur_time - last_active > srv_timeout

def get_expired_conns(self, conn_socks, cur_time):
"""Generate all expired connections."""
for conn, last_active in tuple(conn_socks.values()):
if self.conn_expired(last_active, cur_time):
yield conn

def close_conns(self, conn_list, conn_socks):
"""Close all connections and associated sockets."""
for conn in conn_list:
conn.communicate() # allow for 408 to be sent
conn.close()
conn_socks.pop(conn.socket)

def process_conns(self, conn_socks):
"""Process connections."""
rlist = []
socks = [sck for sck in conn_socks.keys() if sck.fileno() > -1]
if socks:
rlist = select.select(socks, [], [], 0)[0]
for sock in rlist:
conn, conn_start_time = conn_socks[sock]
self.conn = conn
self.log_start_stats()
try:
conn.communicate()
except Exception:
conn.close()
conn_socks.pop(conn.socket)
else:
conn_socks[conn.socket] = (conn, time.time())
self.log_close_stats()
self.conn = None

def run(self):
"""Process incoming HTTP connections.

Expand All @@ -102,25 +163,19 @@ def run(self):
self.server.stats['Worker Threads'][self.getName()] = self.stats
try:
self.ready = True
conn_socks = {}
while True:
conn = self.server.requests.get()
if conn is _SHUTDOWNREQUEST:
return

self.conn = conn
if self.server.stats['Enabled']:
self.start_time = time.time()
try:
conn.communicate()
finally:
conn.close()
if self.server.stats['Enabled']:
self.requests_seen += self.conn.requests_seen
self.bytes_read += self.conn.rfile.bytes_read
self.bytes_written += self.conn.wfile.bytes_written
self.work_time += time.time() - self.start_time
self.start_time = None
self.conn = None
conn = self.server.requests.get(block=True, timeout=0.01)
except queue.Empty:
pass
else:
if conn is _SHUTDOWNREQUEST:
return
conn_socks[conn.socket] = (conn, time.time())
self.process_conns(conn_socks)
expired_conns = self.get_expired_conns(conn_socks, time.time())
self.close_conns(expired_conns, conn_socks)
except (KeyboardInterrupt, SystemExit) as ex:
self.server.interrupt = ex

Expand Down