Skip to content

Commit

Permalink
Merge pull request #516 from pR0Ps/bugfix/workerthread-name
Browse files Browse the repository at this point in the history
Before this patch, the `ThreadPool.start` and `ThreadPool.grow` functions both added
threads to the pool, but both implemented their own versions that were similar,
but not exactly the same.

This change refactors the `ThreadPool.start()` function to use `ThreadPool.grow()`
to create the initial workers, reducing code duplication. It also adds some error
checking that was not previously there, and unit tests to exercise them.
  • Loading branch information
webknjaz authored Mar 17, 2023
2 parents 7aa5759 + 2f16a25 commit 0dcf6a1
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ per-file-ignores =
cheroot/test/test_core.py: C815, DAR101, DAR201, DAR401, I003, I004, N805, N806, S101, WPS110, WPS111, WPS114, WPS121, WPS202, WPS204, WPS226, WPS229, WPS302, WPS306, WPS317, WPS323, WPS324, WPS326, WPS421, WPS422, WPS432, WPS437, WPS442
cheroot/test/test_dispatch.py: DAR101, DAR201, S101, WPS111, WPS121, WPS302, WPS422, WPS430
cheroot/test/test_ssl.py: C818, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, S101, S309, S404, S603, WPS100, WPS110, WPS111, WPS114, WPS121, WPS130, WPS201, WPS202, WPS204, WPS210, WPS211, WPS218, WPS219, WPS222, WPS226, WPS231, WPS300, WPS301, WPS317, WPS318, WPS324, WPS326, WPS335, WPS336, WPS337, WPS352, WPS408, WPS420, WPS421, WPS422, WPS432, WPS436, WPS440, WPS441, WPS442, WPS450, WPS509, WPS510, WPS608
cheroot/test/test_server.py: DAR101, DAR201, DAR301, I001, I003, I004, I005, S101, WPS110, WPS111, WPS118, WPS121, WPS122, WPS130, WPS201, WPS202, WPS210, WPS218, WPS229, WPS300, WPS317, WPS318, WPS324, WPS326, WPS421, WPS422, WPS430, WPS432, WPS433, WPS436, WPS437, WPS442, WPS507, WPS509, WPS608
cheroot/test/test_server.py: DAR101, DAR201, DAR301, I001, I003, I004, I005, S101, WPS110, WPS111, WPS118, WPS121, WPS122, WPS130, WPS201, WPS202, WPS210, WPS218, WPS226, WPS229, WPS300, WPS317, WPS318, WPS324, WPS326, WPS421, WPS422, WPS430, WPS432, WPS433, WPS436, WPS437, WPS442, WPS507, WPS509, WPS608
cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526
cheroot/test/webtest.py: B007, DAR101, DAR201, DAR401, I001, I003, I004, N802, RST303, RST304, S101, S104, WPS100, WPS110, WPS111, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS220, WPS221, WPS223, WPS229, WPS230, WPS231, WPS236, WPS301, WPS306, WPS317, WPS323, WPS326, WPS338, WPS361, WPS414, WPS420, WPS421, WPS422, WPS430, WPS432, WPS433, WPS437, WPS440, WPS501, WPS503, WPS505, WPS601
cheroot/testing.py: B014, C815, DAR101, DAR201, DAR301, I001, I003, S104, WPS100, WPS211, WPS229, WPS301, WPS306, WPS317, WPS414, WPS420, WPS422, WPS430, WPS503, WPS526
Expand Down
89 changes: 89 additions & 0 deletions cheroot/test/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import socket
import tempfile
import threading
import types
import uuid
import urllib.parse # noqa: WPS301

Expand All @@ -17,6 +18,7 @@
from .._compat import bton, ntob
from .._compat import IS_LINUX, IS_MACOS, IS_WINDOWS, SYS_PLATFORM
from ..server import IS_UID_GID_RESOLVABLE, Gateway, HTTPServer
from ..workers.threadpool import ThreadPool
from ..testing import (
ANY_INTERFACE_IPV4,
ANY_INTERFACE_IPV6,
Expand Down Expand Up @@ -440,3 +442,90 @@ def many_open_sockets(request, resource_limit):
# Close our open resources
for test_socket in test_sockets:
test_socket.close()


@pytest.mark.parametrize(
('minthreads', 'maxthreads', 'inited_maxthreads'),
(
(
# NOTE: The docstring only mentions -1 to mean "no max", but other
# NOTE: negative numbers should also work.
1,
-2,
float('inf'),
),
(1, -1, float('inf')),
(1, 1, 1),
(1, 2, 2),
(1, float('inf'), float('inf')),
(2, -2, float('inf')),
(2, -1, float('inf')),
(2, 2, 2),
(2, float('inf'), float('inf')),
),
)
def test_threadpool_threadrange_set(minthreads, maxthreads, inited_maxthreads):
"""Test setting the number of threads in a ThreadPool.
The ThreadPool should properly set the min+max number of the threads to use
in the pool if those limits are valid.
"""
tp = ThreadPool(
server=None,
min=minthreads,
max=maxthreads,
)
assert tp.min == minthreads
assert tp.max == inited_maxthreads


@pytest.mark.parametrize(
('minthreads', 'maxthreads', 'error'),
(
(-1, -1, 'min=-1 must be > 0'),
(-1, 0, 'min=-1 must be > 0'),
(-1, 1, 'min=-1 must be > 0'),
(-1, 2, 'min=-1 must be > 0'),
(0, -1, 'min=0 must be > 0'),
(0, 0, 'min=0 must be > 0'),
(0, 1, 'min=0 must be > 0'),
(0, 2, 'min=0 must be > 0'),
(1, 0, 'Expected an integer or the infinity value for the `max` argument but got 0.'),
(1, 0.5, 'Expected an integer or the infinity value for the `max` argument but got 0.5.'),
(2, 0, 'Expected an integer or the infinity value for the `max` argument but got 0.'),
(2, '1', "Expected an integer or the infinity value for the `max` argument but got '1'."),
(2, 1, 'max=1 must be > min=2'),
),
)
def test_threadpool_invalid_threadrange(minthreads, maxthreads, error):
"""Test that a ThreadPool rejects invalid min/max values.
The ThreadPool should raise an error with the proper message when
initialized with an invalid min+max number of threads.
"""
with pytest.raises((ValueError, TypeError), match=error):
ThreadPool(
server=None,
min=minthreads,
max=maxthreads,
)


def test_threadpool_multistart_validation(monkeypatch):
"""Test for ThreadPool multi-start behavior.
Tests that when calling start() on a ThreadPool multiple times raises a
:exc:`RuntimeError`
"""
# replace _spawn_worker with a function that returns a placeholder to avoid
# actually starting any threads
monkeypatch.setattr(
ThreadPool,
'_spawn_worker',
lambda _: types.SimpleNamespace(ready=True),
)

tp = ThreadPool(server=None)
tp.start()
with pytest.raises(RuntimeError, match='Threadpools can only be started once.'):
tp.start()
54 changes: 33 additions & 21 deletions cheroot/workers/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,33 @@ def __init__(
server (cheroot.server.HTTPServer): web server object
receiving this request
min (int): minimum number of worker threads
max (int): maximum number of worker threads
max (int): maximum number of worker threads (-1/inf for no max)
accepted_queue_size (int): maximum number of active
requests in queue
accepted_queue_timeout (int): timeout for putting request
into queue
:raises ValueError: if the min/max values are invalid
:raises TypeError: if the max is not an integer or inf
"""
if min < 1:
raise ValueError(f'min={min!s} must be > 0')

if max == float('inf'):
pass
elif not isinstance(max, int) or max == 0:
raise TypeError(
'Expected an integer or the infinity value for the `max` '
f'argument but got {max!r}.',
)
elif max < 0:
max = float('inf')

if max < min:
raise ValueError(
f'max={max!s} must be > min={min!s} (or infinity for no max)',
)

self.server = server
self.min = min
self.max = max
Expand All @@ -167,18 +188,13 @@ def __init__(
self._pending_shutdowns = collections.deque()

def start(self):
"""Start the pool of threads."""
for _ in range(self.min):
self._threads.append(WorkerThread(self.server))
for worker in self._threads:
worker.name = (
'CP Server {worker_name!s}'.
format(worker_name=worker.name)
)
worker.start()
for worker in self._threads:
while not worker.ready:
time.sleep(.1)
"""Start the pool of threads.
:raises RuntimeError: if the pool is already started
"""
if self._threads:
raise RuntimeError('Threadpools can only be started once.')
self.grow(self.min)

@property
def idle(self): # noqa: D401; irrelevant for properties
Expand Down Expand Up @@ -206,17 +222,13 @@ def _clear_dead_threads(self):

def grow(self, amount):
"""Spawn new worker threads (not above self.max)."""
if self.max > 0:
budget = max(self.max - len(self._threads), 0)
else:
# self.max <= 0 indicates no maximum
budget = float('inf')

budget = max(self.max - len(self._threads), 0)
n_new = min(amount, budget)

workers = [self._spawn_worker() for i in range(n_new)]
while not all(worker.ready for worker in workers):
time.sleep(.1)
for worker in workers:
while not worker.ready:
time.sleep(.1)
self._threads.extend(workers)

def _spawn_worker(self):
Expand Down

0 comments on commit 0dcf6a1

Please sign in to comment.