Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request based workers #199

Merged
merged 17 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
6f9a8f9
Move code to create connection objects from sockets into separate mod…
the-allanc May 4, 2019
3df76a7
WorkerThreads now deal with a single request from a connection.
the-allanc May 6, 2019
01458b7
Makefile objects now have a has_data method - to indicate if there is…
the-allanc May 7, 2019
daa73bc
Detect connections which have buffered data for the next request in C…
the-allanc May 7, 2019
e7fc671
Make ConnectionManager.close a private method, and make PEP8 changes.
the-allanc May 7, 2019
84f6584
Add docstrings to ConnectionManager class.
the-allanc May 7, 2019
53d32fb
Fix makefile.StreamReader.has_data.
the-allanc May 7, 2019
e80001f
ConnectionManager now sets closeable flag on HTTPConnection for worke…
the-allanc May 10, 2019
cdfa8cd
Less nesting of code in ConnectionManager.get_conn.
the-allanc May 10, 2019
2febf2d
Try to detect invalid file descriptors in ConnectionManager, and clea…
the-allanc May 10, 2019
d78b404
ConnectionManager now records the time a connection was last used as …
the-allanc May 10, 2019
08de864
ConnectionManager now sets a flag on HTTPConnection indicating if the…
the-allanc May 10, 2019
4103fd2
Minor style tweaks after code review.
the-allanc May 10, 2019
e43c25d
Use a list internally instead of OrderedDict in ConnectionManager, as…
the-allanc May 23, 2019
91cbc29
When the server shuts down, get ConnectionManager to close all of its…
the-allanc May 23, 2019
65d8125
Port "Fix UNIX abstract sockets support" change lost when rebasing.
the-allanc Oct 2, 2019
17b4ebb
Return False from HTTPConnection.communicate.
the-allanc Oct 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions cheroot/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
"""Utilities to manage open connections."""

from __future__ import absolute_import, division, print_function
__metaclass__ = type

import io
import os
import select
import socket
import time

from . import errors
from .makefile import MakeFile

import six

# Pick an implementation of ``prevent_socket_inheritance`` based on what
# the platform offers: ``fcntl`` (POSIX) first, then the Win32 API via
# ``ctypes``, and finally a no-op stub when neither can be imported.
try:
    import fcntl
except ImportError:
    try:
        from ctypes import windll, WinError
        import ctypes.wintypes
        _SetHandleInformation = windll.kernel32.SetHandleInformation
        _SetHandleInformation.argtypes = [
            ctypes.wintypes.HANDLE,
            ctypes.wintypes.DWORD,
            ctypes.wintypes.DWORD,
        ]
        _SetHandleInformation.restype = ctypes.wintypes.BOOL
    except ImportError:
        def prevent_socket_inheritance(sock):
            """Stub inheritance prevention.

            Dummy function, since neither fcntl nor ctypes are available.
            """
            pass
    else:
        def prevent_socket_inheritance(sock):
            """Mark the given socket fd as non-inheritable (Windows).

            Raises:
                WindowsError: if ``SetHandleInformation`` reports failure.
            """
            # Arguments are (handle, mask, flags): mask 1 selects
            # HANDLE_FLAG_INHERIT and flags 0 clears it.
            if not _SetHandleInformation(sock.fileno(), 1, 0):
                raise WinError()
else:
    def prevent_socket_inheritance(sock):
        """Mark the given socket fd as non-inheritable (POSIX)."""
        fd = sock.fileno()
        # Preserve whatever descriptor flags are already set and only
        # add close-on-exec.
        old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)


class ConnectionManager:
    """Class which manages HTTPConnection objects.

    This is for connections which are being kept-alive for follow-up requests.
    """

    def __init__(self, server):
        """Initialize ConnectionManager object.

        Args:
            server (cheroot.server.HTTPServer): web server object
                that uses this ConnectionManager instance.
        """
        self.server = server
        # Idle connections, oldest first (``put`` appends to the end).
        self.connections = []

    def put(self, conn):
        """Put idle connection into the ConnectionManager to be managed.

        Args:
            conn (cheroot.server.HTTPConnection): HTTP connection
                to be managed.
        """
        conn.last_used = time.time()
        # Data already buffered in the reader will not make the socket
        # readable again, so remember it here for ``get_conn``.
        conn.ready_with_data = conn.rfile.has_data()
        self.connections.append(conn)

    def expire(self):
        """Expire least recently used connections.

        This happens if there are either too many open connections, or if the
        connections have been timed out.

        At most one connection (the first in the list) is flagged as
        closeable per call, so this should be called periodically.
        """
        if not self.connections:
            return

        # Look at the first connection - if it is already flagged as
        # closeable, there is nothing to do until get_conn returns it.
        conn = self.connections[0]
        if conn.closeable:
            return

        # Too many connections?
        ka_limit = self.server.keep_alive_conn_limit
        if ka_limit is not None and len(self.connections) > ka_limit:
            conn.closeable = True
            return

        # Connection too old?
        if (conn.last_used + self.server.timeout) < time.time():
            conn.closeable = True
            return

    def get_conn(self, server_socket):
        """Return a HTTPConnection object which is ready to be handled.

        A connection returned by this method should be ready for a worker
        to handle it. If there are no connections ready, None will be
        returned.

        Any connection returned by this method will need to be `put`
        back if it should be examined again for another request.

        Args:
            server_socket (socket.socket): Socket to listen to for new
                connections.
        Returns:
            cheroot.server.HTTPConnection instance, or None.

        """
        # Grab file descriptors from sockets, but stop if we find a
        # connection which is already marked as ready.
        socket_dict = {}
        for conn in self.connections:
            if conn.closeable or conn.ready_with_data:
                break
            fno = conn.socket.fileno()
            if fno < 0:
                # The socket has already been closed. Passing a negative
                # descriptor to select() would raise ValueError, so hand
                # the connection back flagged as closeable instead.
                conn.closeable = True
                break
            socket_dict[fno] = conn
        else:
            # No ready connection.
            conn = None

        # We have a connection ready for use.
        if conn is not None:
            self.connections.remove(conn)
            return conn

        # Will require a select call.
        ss_fileno = server_socket.fileno()
        socket_dict[ss_fileno] = server_socket
        try:
            rlist, _, _ = select.select(list(socket_dict), [], [], 0.1)
        except (OSError, select.error):
            # ``select.error`` is only an alias of OSError on Python 3.3+;
            # on Python 2 it is a separate class and must be named.
            # Mark any connection which no longer appears valid.
            for fno, conn in socket_dict.items():
                # If the server socket is invalid, we'll just ignore it and
                # wait to be shutdown.
                if fno == ss_fileno:
                    continue
                try:
                    os.fstat(fno)
                except OSError:
                    # Socket is invalid, close the connection, insert at
                    # the front.
                    self.connections.remove(conn)
                    self.connections.insert(0, conn)
                    conn.closeable = True

            # Wait for the next tick to occur.
            return None

        # No available socket.
        if not rlist:
            return None

        try:
            # See if we have a new connection coming in.
            rlist.remove(ss_fileno)
        except ValueError:
            # No new connection, but reuse existing socket.
            conn = socket_dict[rlist.pop()]
        else:
            conn = server_socket

        # All remaining connections in rlist should be marked as ready.
        for fno in rlist:
            socket_dict[fno].ready_with_data = True

        # New connection.
        if conn is server_socket:
            return self._from_server_socket(server_socket)

        self.connections.remove(conn)
        return conn

    def _from_server_socket(self, server_socket):
        """Accept a client on the listening socket and wrap it.

        Args:
            server_socket (socket.socket): listening socket that has a
                connection waiting to be accepted.
        Returns:
            An instance of ``self.server.ConnectionClass``, or None when
            the accept timed out, the client spoke plain HTTP to an SSL
            port (a 400 response is written first), the SSL adapter
            returned no socket, or a retryable/ignorable socket error
            occurred.
        Raises:
            socket.error: for any socket error not listed in the
                ``errors`` module's EINTR / non-blocking / ignore sets.
        """
        try:
            s, addr = server_socket.accept()
            if self.server.stats['Enabled']:
                self.server.stats['Accepts'] += 1
            prevent_socket_inheritance(s)
            if hasattr(s, 'settimeout'):
                s.settimeout(self.server.timeout)

            mf = MakeFile
            ssl_env = {}
            # if ssl cert and key are set, we try to be a secure HTTP server
            if self.server.ssl_adapter is not None:
                try:
                    s, ssl_env = self.server.ssl_adapter.wrap(s)
                except errors.NoSSLError:
                    msg = (
                        'The client sent a plain HTTP request, but '
                        'this server only speaks HTTPS on this port.'
                    )
                    buf = [
                        '%s 400 Bad Request\r\n' % self.server.protocol,
                        'Content-Length: %s\r\n' % len(msg),
                        'Content-Type: text/plain\r\n\r\n',
                        msg,
                    ]

                    sock_to_make = s if not six.PY2 else s._sock
                    wfile = mf(sock_to_make, 'wb', io.DEFAULT_BUFFER_SIZE)
                    try:
                        wfile.write(''.join(buf).encode('ISO-8859-1'))
                    except socket.error as ex:
                        if ex.args[0] not in errors.socket_errors_to_ignore:
                            raise
                    return
                if not s:
                    return
                mf = self.server.ssl_adapter.makefile
                # Re-apply our timeout since we may have a new socket object
                if hasattr(s, 'settimeout'):
                    s.settimeout(self.server.timeout)

            conn = self.server.ConnectionClass(self.server, s, mf)

            if not isinstance(
                    self.server.bind_addr,
                    (six.text_type, six.binary_type),
            ):
                # optional values
                # Until we do DNS lookups, omit REMOTE_HOST
                if addr is None:  # sometimes this can happen
                    # figure out if AF_INET or AF_INET6.
                    if len(s.getsockname()) == 2:
                        # AF_INET
                        addr = ('0.0.0.0', 0)
                    else:
                        # AF_INET6
                        addr = ('::', 0)
                conn.remote_addr = addr[0]
                conn.remote_port = addr[1]

            conn.ssl_env = ssl_env
            return conn

        except socket.timeout:
            # The only reason for the timeout in start() is so we can
            # notice keyboard interrupts on Win32, which don't interrupt
            # accept() by default
            return
        except socket.error as ex:
            if self.server.stats['Enabled']:
                self.server.stats['Socket Errors'] += 1
            if ex.args[0] in errors.socket_error_eintr:
                # I *think* this is right. EINTR should occur when a signal
                # is received during the accept() call; all docs say retry
                # the call, and I *think* I'm reading it right that Python
                # will then go ahead and poll for and handle the signal
                # elsewhere. See
                # https://github.com/cherrypy/cherrypy/issues/707.
                return
            if ex.args[0] in errors.socket_errors_nonblocking:
                # Just try again. See
                # https://github.com/cherrypy/cherrypy/issues/479.
                return
            if ex.args[0] in errors.socket_errors_to_ignore:
                # Our socket was closed.
                # See https://github.com/cherrypy/cherrypy/issues/686.
                return
            raise

    def close(self):
        """Close all monitored connections."""
        # Iterate over a copy so the list can change while closing.
        for conn in self.connections[:]:
            conn.close()
        self.connections = []
14 changes: 14 additions & 0 deletions cheroot/makefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def readline(self, size=-1):
break
buf.write(data)
return buf.getvalue()

jaraco marked this conversation as resolved.
Show resolved Hide resolved
else:
# Read until size bytes or \n or EOF seen, whichever comes
# first
Expand Down Expand Up @@ -279,6 +280,11 @@ def readline(self, size=-1):
buf_len += n
# assert buf_len == buf.tell()
return buf.getvalue()

def has_data(self):
    """Return true if there is buffered data to read.

    The answer is the truthiness of ``self._rbuf.getvalue()``, i.e.
    whether the read buffer currently holds any content.
    """
    return bool(self._rbuf.getvalue())

else:
def read(self, size=-1):
"""Read data from the socket to buffer."""
Expand Down Expand Up @@ -395,6 +401,10 @@ def readline(self, size=-1):
buf_len += n
return ''.join(buffers)

def has_data(self):
    """Return true if there is buffered data to read.

    The answer is simply the truthiness of ``self._rbuf``.
    """
    # NOTE(review): presumably ``_rbuf`` is a plain string here, unlike
    # the variant above that calls ``getvalue()`` - confirm against the
    # enclosing class.
    return bool(self._rbuf)


if not six.PY2:
class StreamReader(io.BufferedReader):
Expand All @@ -411,6 +421,10 @@ def read(self, *args, **kwargs):
self.bytes_read += len(val)
return val

def has_data(self):
    """Return true if there is buffered data to read.

    True when ``self._read_buf`` is longer than ``self._read_pos``.
    """
    # NOTE(review): ``_read_buf`` / ``_read_pos`` are not defined in the
    # visible part of this class; presumably they are the base reader's
    # internal buffer and read offset - confirm which ``io``
    # implementation this module imports.
    return len(self._read_buf) > self._read_pos

class StreamWriter(BufferedWriter):
"""Socket stream writer."""

Expand Down
Loading