Skip to content

Commit

Permalink
PR fixes/updates
Browse files Browse the repository at this point in the history
  • Loading branch information
csm10495 committed Dec 17, 2024
1 parent 5d58e50 commit 0db381b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 29 deletions.
14 changes: 7 additions & 7 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,15 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.

.. method:: terminate_workers(signal=signal.SIGTERM)

Attempt to terminate all living worker processes immediately by sending each
of them the given signal. If the signal is not specified, the default signal
:data:`signal.SIGTERM` is used.
Attempt to terminate all living worker processes immediately by sending
each of them the given signal. If the signal is not specified, the default
signal :data:`signal.SIGTERM` is used.

After calling this, the caller should no longer submit tasks to the executor.
It is also recommended to still call :meth:`Executor.shutdown` to ensure that all
other resources associated with the executor are freed.
After calling this method the caller should no longer submit tasks to the
executor. It is also recommended to still call :meth:`Executor.shutdown`
to ensure that all other resources associated with the executor are freed.

.. versionadded:: 3.14
.. versionadded:: next

.. _processpoolexecutor-example:

Expand Down
1 change: 1 addition & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ concurrent.futures
incompatible *fork* start method you must explicitly request it by
supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`.
(Contributed by Gregory P. Smith in :gh:`84559`.)

* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
a way to terminate all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)
Expand Down
52 changes: 31 additions & 21 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import multiprocessing
import os
import queue
import signal
import sys
import threading
import time
Expand Down Expand Up @@ -239,47 +240,56 @@ def test_process_pool_executor_terminate_workers(self):

executor.terminate_workers()

try:
q.get(timeout=1)
raise RuntimeError("Queue should not have gotten a second value")
except queue.Empty:
pass
self.assertRaises(queue.Empty, q.get, timeout=1)


def test_process_pool_executor_terminate_workers_dead_workers(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
try:
executor.submit(os._exit, 1).result()
except BrokenProcessPool:
# BrokenProcessPool will be raised by our call to .result() since the worker will die
pass
future = executor.submit(os._exit, 1)
self.assertRaises(BrokenProcessPool, future.result)

# The worker has been killed already, terminate_workers should basically no-op
executor.terminate_workers()
# Patching in here instead of at the function level since we only want
# to patch it for this function call, not other parts of the flow.
with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill:
executor.terminate_workers()

def test_process_pool_executor_terminate_workers_not_started_yet(self):
mock_kill.assert_not_called()

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_not_started_yet(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
# The worker has not been started yet, terminate_workers should basically no-op
executor.terminate_workers()

mock_kill.assert_not_called()

def test_process_pool_executor_terminate_workers_stops_pool(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(time.sleep, 0).result()

executor.terminate_workers()

try:
executor.submit(time.sleep, 0).result()
raise RuntimeError("Should have raised BrokenProcessPool")
except BrokenProcessPool:
pass
future = executor.submit(time.sleep, 0)
self.assertRaises(BrokenProcessPool, future.result)

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(time.sleep, 0).result()
future = executor.submit(time.sleep, 0)
future.result()

executor.terminate_workers(signal.SIGKILL)

worker_process = list(executor._processes.values())[0]
mock_kill.assert_called_once_with(worker_process.pid, signal.SIGKILL)

def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 0)
future.result()

executor.terminate_workers(9)
mock_kill.assert_called_once_with(list(executor._processes.values())[0].pid, 9)
# 'potatoes' isn't a valid signal, so os.kill will raise a TypeError
self.assertRaises(TypeError, executor.terminate_workers, 'potatoes')


create_executor_tests(globals(), ProcessPoolExecutorTest,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
Add a ``terminate_workers`` method to ``ProcessPoolExecutor`` to allow a way to attempt to force kill the worker processes.
Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
a way to terminate all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)

0 comments on commit 0db381b

Please sign in to comment.