From 733dac01b0dc3047efc9027dba177d7116e47c50 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Fri, 21 Jun 2024 10:23:10 +0530 Subject: [PATCH] GH-120804: Remove SafeChildWatcher, FastChildWatcher and MultiLoopChildWatcher from asyncio (#120805) Remove SafeChildWatcher, FastChildWatcher and MultiLoopChildWatcher from asyncio. These child watchers have been deprecated since Python 3.12. The tests are also removed and some more tests will be added after the rewrite of child watchers. --- Lib/asyncio/unix_events.py | 325 +---------- Lib/test/test_asyncio/test_events.py | 9 +- Lib/test/test_asyncio/test_streams.py | 46 -- Lib/test/test_asyncio/test_subprocess.py | 29 - Lib/test/test_asyncio/test_unix_events.py | 636 ---------------------- 5 files changed, 4 insertions(+), 1041 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 41ccf1b78fb93b..9a2e300259ee8c 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -28,9 +28,9 @@ __all__ = ( 'SelectorEventLoop', - 'AbstractChildWatcher', 'SafeChildWatcher', - 'FastChildWatcher', 'PidfdChildWatcher', - 'MultiLoopChildWatcher', 'ThreadedChildWatcher', + 'AbstractChildWatcher', + 'PidfdChildWatcher', + 'ThreadedChildWatcher', 'DefaultEventLoopPolicy', 'EventLoop', ) @@ -1062,325 +1062,6 @@ def _sig_chld(self): }) -class SafeChildWatcher(BaseChildWatcher): - """'Safe' child watcher implementation. - - This implementation avoids disrupting other code spawning processes by - polling explicitly each process in the SIGCHLD handler instead of calling - os.waitpid(-1). - - This is a safe solution but it has a significant overhead when handling a - big number of children (O(n) each time SIGCHLD is raised) - """ - - def __init__(self): - super().__init__() - warnings._deprecated("SafeChildWatcher", - "{name!r} is deprecated as of Python 3.12 and will be " - "removed in Python {remove}.", - remove=(3, 14)) - - def close(self): - self._callbacks.clear() - super().close() - - def __enter__(self): - return self - - def __exit__(self, a, b, c): - pass - - def add_child_handler(self, pid, callback, *args): - self._callbacks[pid] = (callback, args) - - # Prevent a race condition in case the child is already terminated. - self._do_waitpid(pid) - - def remove_child_handler(self, pid): - try: - del self._callbacks[pid] - return True - except KeyError: - return False - - def _do_waitpid_all(self): - - for pid in list(self._callbacks): - self._do_waitpid(pid) - - def _do_waitpid(self, expected_pid): - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, os.WNOHANG) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - "Unknown child process pid %d, will report returncode 255", - pid) - else: - if pid == 0: - # The child process is still alive. - return - - returncode = waitstatus_to_exitcode(status) - if self._loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - - try: - callback, args = self._callbacks.pop(pid) - except KeyError: # pragma: no cover - # May happen if .remove_child_handler() is called - # after os.waitpid() returns. - if self._loop.get_debug(): - logger.warning("Child watcher got an unexpected pid: %r", - pid, exc_info=True) - else: - callback(pid, returncode, *args) - - -class FastChildWatcher(BaseChildWatcher): - """'Fast' child watcher implementation. - - This implementation reaps every terminated processes by calling - os.waitpid(-1) directly, possibly breaking other code spawning processes - and waiting for their termination. - - There is no noticeable overhead when handling a big number of children - (O(1) each time a child terminates). - """ - def __init__(self): - super().__init__() - self._lock = threading.Lock() - self._zombies = {} - self._forks = 0 - warnings._deprecated("FastChildWatcher", - "{name!r} is deprecated as of Python 3.12 and will be " - "removed in Python {remove}.", - remove=(3, 14)) - - def close(self): - self._callbacks.clear() - self._zombies.clear() - super().close() - - def __enter__(self): - with self._lock: - self._forks += 1 - - return self - - def __exit__(self, a, b, c): - with self._lock: - self._forks -= 1 - - if self._forks or not self._zombies: - return - - collateral_victims = str(self._zombies) - self._zombies.clear() - - logger.warning( - "Caught subprocesses termination from unknown pids: %s", - collateral_victims) - - def add_child_handler(self, pid, callback, *args): - assert self._forks, "Must use the context manager" - - with self._lock: - try: - returncode = self._zombies.pop(pid) - except KeyError: - # The child is running. - self._callbacks[pid] = callback, args - return - - # The child is dead already. We can fire the callback. - callback(pid, returncode, *args) - - def remove_child_handler(self, pid): - try: - del self._callbacks[pid] - return True - except KeyError: - return False - - def _do_waitpid_all(self): - # Because of signal coalescing, we must keep calling waitpid() as - # long as we're able to reap a child. - while True: - try: - pid, status = os.waitpid(-1, os.WNOHANG) - except ChildProcessError: - # No more child processes exist. - return - else: - if pid == 0: - # A child process is still alive. - return - - returncode = waitstatus_to_exitcode(status) - - with self._lock: - try: - callback, args = self._callbacks.pop(pid) - except KeyError: - # unknown child - if self._forks: - # It may not be registered yet. - self._zombies[pid] = returncode - if self._loop.get_debug(): - logger.debug('unknown process %s exited ' - 'with returncode %s', - pid, returncode) - continue - callback = None - else: - if self._loop.get_debug(): - logger.debug('process %s exited with returncode %s', - pid, returncode) - - if callback is None: - logger.warning( - "Caught subprocess termination from unknown pid: " - "%d -> %d", pid, returncode) - else: - callback(pid, returncode, *args) - - -class MultiLoopChildWatcher(AbstractChildWatcher): - """A watcher that doesn't require running loop in the main thread. - - This implementation registers a SIGCHLD signal handler on - instantiation (which may conflict with other code that - install own handler for this signal). - - The solution is safe but it has a significant overhead when - handling a big number of processes (*O(n)* each time a - SIGCHLD is received). - """ - - # Implementation note: - # The class keeps compatibility with AbstractChildWatcher ABC - # To achieve this it has empty attach_loop() method - # and doesn't accept explicit loop argument - # for add_child_handler()/remove_child_handler() - # but retrieves the current loop by get_running_loop() - - def __init__(self): - self._callbacks = {} - self._saved_sighandler = None - warnings._deprecated("MultiLoopChildWatcher", - "{name!r} is deprecated as of Python 3.12 and will be " - "removed in Python {remove}.", - remove=(3, 14)) - - def is_active(self): - return self._saved_sighandler is not None - - def close(self): - self._callbacks.clear() - if self._saved_sighandler is None: - return - - handler = signal.getsignal(signal.SIGCHLD) - if handler != self._sig_chld: - logger.warning("SIGCHLD handler was changed by outside code") - else: - signal.signal(signal.SIGCHLD, self._saved_sighandler) - self._saved_sighandler = None - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def add_child_handler(self, pid, callback, *args): - loop = events.get_running_loop() - self._callbacks[pid] = (loop, callback, args) - - # Prevent a race condition in case the child is already terminated. - self._do_waitpid(pid) - - def remove_child_handler(self, pid): - try: - del self._callbacks[pid] - return True - except KeyError: - return False - - def attach_loop(self, loop): - # Don't save the loop but initialize itself if called first time - # The reason to do it here is that attach_loop() is called from - # unix policy only for the main thread. - # Main thread is required for subscription on SIGCHLD signal - if self._saved_sighandler is not None: - return - - self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) - if self._saved_sighandler is None: - logger.warning("Previous SIGCHLD handler was set by non-Python code, " - "restore to default handler on watcher close.") - self._saved_sighandler = signal.SIG_DFL - - # Set SA_RESTART to limit EINTR occurrences. - signal.siginterrupt(signal.SIGCHLD, False) - - def _do_waitpid_all(self): - for pid in list(self._callbacks): - self._do_waitpid(pid) - - def _do_waitpid(self, expected_pid): - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, os.WNOHANG) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - "Unknown child process pid %d, will report returncode 255", - pid) - debug_log = False - else: - if pid == 0: - # The child process is still alive. - return - - returncode = waitstatus_to_exitcode(status) - debug_log = True - try: - loop, callback, args = self._callbacks.pop(pid) - except KeyError: # pragma: no cover - # May happen if .remove_child_handler() is called - # after os.waitpid() returns. - logger.warning("Child watcher got an unexpected pid: %r", - pid, exc_info=True) - else: - if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is closed", loop, pid) - else: - if debug_log and loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - def _sig_chld(self, signum, frame): - try: - self._do_waitpid_all() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: - logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) - - class ThreadedChildWatcher(AbstractChildWatcher): """Threaded child watcher implementation. diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 88c85a36b5d448..06eb4d3841a0d2 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2214,7 +2214,7 @@ def setUp(self): super().setUp() with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) - watcher = asyncio.SafeChildWatcher() + watcher = asyncio.ThreadedChildWatcher() watcher.attach_loop(self.loop) asyncio.set_child_watcher(watcher) @@ -2833,13 +2833,6 @@ def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) - if sys.platform != 'win32': - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - watcher = asyncio.SafeChildWatcher() - watcher.attach_loop(self.loop) - asyncio.set_child_watcher(watcher) - def tearDown(self): try: if sys.platform != 'win32': diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index ae943f39869815..d32b7ff251885d 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -822,52 +822,6 @@ async def client(addr): self.assertEqual(msg1, b"hello world 1!\n") self.assertEqual(msg2, b"hello world 2!\n") - @unittest.skipIf(sys.platform == 'win32', "Don't have pipes") - @requires_subprocess() - def test_read_all_from_pipe_reader(self): - # See asyncio issue 168. This test is derived from the example - # subprocess_attach_read_pipe.py, but we configure the - # StreamReader's limit so that twice it is less than the size - # of the data writer. Also we must explicitly attach a child - # watcher to the event loop. - - code = """\ -import os, sys -fd = int(sys.argv[1]) -os.write(fd, b'data') -os.close(fd) -""" - rfd, wfd = os.pipe() - args = [sys.executable, '-c', code, str(wfd)] - - pipe = open(rfd, 'rb', 0) - reader = asyncio.StreamReader(loop=self.loop, limit=1) - protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) - transport, _ = self.loop.run_until_complete( - self.loop.connect_read_pipe(lambda: protocol, pipe)) - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - watcher = asyncio.SafeChildWatcher() - watcher.attach_loop(self.loop) - try: - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - asyncio.set_child_watcher(watcher) - create = asyncio.create_subprocess_exec( - *args, - pass_fds={wfd}, - ) - proc = self.loop.run_until_complete(create) - self.loop.run_until_complete(proc.wait()) - finally: - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - asyncio.set_child_watcher(None) - - os.close(wfd) - data = self.loop.run_until_complete(reader.read(-1)) - self.assertEqual(data, b'data') - def test_streamreader_constructor_without_loop(self): with self.assertRaisesRegex(RuntimeError, 'no current event loop'): asyncio.StreamReader() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index cf1a1985338e40..27ae766a19413b 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -631,15 +631,6 @@ async def kill_running(): # the transport was not notified yet self.assertFalse(killed) - # Unlike SafeChildWatcher, FastChildWatcher does not pop the - # callbacks if waitpid() is called elsewhere. Let's clear them - # manually to avoid a warning when the watcher is detached. - if (sys.platform != 'win32' and - isinstance(self, SubprocessFastWatcherTests)): - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - asyncio.get_child_watcher()._callbacks.clear() - async def _test_popen_error(self, stdin): if sys.platform == 'win32': target = 'asyncio.windows_utils.Popen' @@ -908,26 +899,6 @@ class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, def _get_watcher(self): return unix_events.ThreadedChildWatcher() - class SubprocessSafeWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - def _get_watcher(self): - with self.assertWarns(DeprecationWarning): - return unix_events.SafeChildWatcher() - - class MultiLoopChildWatcherTests(test_utils.TestCase): - - def test_warns(self): - with self.assertWarns(DeprecationWarning): - unix_events.MultiLoopChildWatcher() - - class SubprocessFastWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - def _get_watcher(self): - with self.assertWarns(DeprecationWarning): - return unix_events.FastChildWatcher() - @unittest.skipUnless( unix_events.can_use_pidfd(), "operating system does not support pidfds", diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 9452213c685851..42fb54a4c3a7a5 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1138,578 +1138,6 @@ def test_not_implemented(self): NotImplementedError, watcher.__exit__, f, f, f) -class BaseChildWatcherTests(unittest.TestCase): - - def test_not_implemented(self): - f = mock.Mock() - watcher = unix_events.BaseChildWatcher() - self.assertRaises( - NotImplementedError, watcher._do_waitpid, f) - - -class ChildWatcherTestsMixin: - - ignore_warnings = mock.patch.object(log.logger, "warning") - - def setUp(self): - super().setUp() - self.loop = self.new_test_loop() - self.running = False - self.zombies = {} - - with mock.patch.object( - self.loop, "add_signal_handler") as self.m_add_signal_handler: - self.watcher = self.create_watcher() - self.watcher.attach_loop(self.loop) - - def waitpid(self, pid, flags): - if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1: - self.assertGreater(pid, 0) - try: - if pid < 0: - return self.zombies.popitem() - else: - return pid, self.zombies.pop(pid) - except KeyError: - pass - if self.running: - return 0, 0 - else: - raise ChildProcessError() - - def add_zombie(self, pid, status): - self.zombies[pid] = status - - def waitstatus_to_exitcode(self, status): - if status > 32768: - return status - 32768 - elif 32700 < status < 32768: - return status - 32768 - else: - return status - - def test_create_watcher(self): - self.m_add_signal_handler.assert_called_once_with( - signal.SIGCHLD, self.watcher._sig_chld) - - def waitpid_mocks(func): - def wrapped_func(self): - def patch(target, wrapper): - return mock.patch(target, wraps=wrapper, - new_callable=mock.Mock) - - with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \ - patch('os.waitpid', self.waitpid) as m_waitpid: - func(self, m_waitpid) - return wrapped_func - - @waitpid_mocks - def test_sigchld(self, m_waitpid): - # register a child - callback = mock.Mock() - - with self.watcher: - self.running = True - self.watcher.add_child_handler(42, callback, 9, 10, 14) - - self.assertFalse(callback.called) - - # child is running - self.watcher._sig_chld() - - self.assertFalse(callback.called) - - # child terminates (returncode 12) - self.running = False - self.add_zombie(42, EXITCODE(12)) - self.watcher._sig_chld() - - callback.assert_called_once_with(42, 12, 9, 10, 14) - - callback.reset_mock() - - # ensure that the child is effectively reaped - self.add_zombie(42, EXITCODE(13)) - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback.called) - - # sigchld called again - self.zombies.clear() - self.watcher._sig_chld() - - self.assertFalse(callback.called) - - @waitpid_mocks - def test_sigchld_two_children(self, m_waitpid): - callback1 = mock.Mock() - callback2 = mock.Mock() - - # register child 1 - with self.watcher: - self.running = True - self.watcher.add_child_handler(43, callback1, 7, 8) - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # register child 2 - with self.watcher: - self.watcher.add_child_handler(44, callback2, 147, 18) - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # children are running - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # child 1 terminates (signal 3) - self.add_zombie(43, SIGNAL(3)) - self.watcher._sig_chld() - - callback1.assert_called_once_with(43, -3, 7, 8) - self.assertFalse(callback2.called) - - callback1.reset_mock() - - # child 2 still running - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # child 2 terminates (code 108) - self.add_zombie(44, EXITCODE(108)) - self.running = False - self.watcher._sig_chld() - - callback2.assert_called_once_with(44, 108, 147, 18) - self.assertFalse(callback1.called) - - callback2.reset_mock() - - # ensure that the children are effectively reaped - self.add_zombie(43, EXITCODE(14)) - self.add_zombie(44, EXITCODE(15)) - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # sigchld called again - self.zombies.clear() - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - @waitpid_mocks - def test_sigchld_two_children_terminating_together(self, m_waitpid): - callback1 = mock.Mock() - callback2 = mock.Mock() - - # register child 1 - with self.watcher: - self.running = True - self.watcher.add_child_handler(45, callback1, 17, 8) - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # register child 2 - with self.watcher: - self.watcher.add_child_handler(46, callback2, 1147, 18) - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # children are running - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # child 1 terminates (code 78) - # child 2 terminates (signal 5) - self.add_zombie(45, EXITCODE(78)) - self.add_zombie(46, SIGNAL(5)) - self.running = False - self.watcher._sig_chld() - - callback1.assert_called_once_with(45, 78, 17, 8) - callback2.assert_called_once_with(46, -5, 1147, 18) - - callback1.reset_mock() - callback2.reset_mock() - - # ensure that the children are effectively reaped - self.add_zombie(45, EXITCODE(14)) - self.add_zombie(46, EXITCODE(15)) - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - @waitpid_mocks - def test_sigchld_race_condition(self, m_waitpid): - # register a child - callback = mock.Mock() - - with self.watcher: - # child terminates before being registered - self.add_zombie(50, EXITCODE(4)) - self.watcher._sig_chld() - - self.watcher.add_child_handler(50, callback, 1, 12) - - callback.assert_called_once_with(50, 4, 1, 12) - callback.reset_mock() - - # ensure that the child is effectively reaped - self.add_zombie(50, SIGNAL(1)) - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback.called) - - @waitpid_mocks - def test_sigchld_replace_handler(self, m_waitpid): - callback1 = mock.Mock() - callback2 = mock.Mock() - - # register a child - with self.watcher: - self.running = True - self.watcher.add_child_handler(51, callback1, 19) - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # register the same child again - with self.watcher: - self.watcher.add_child_handler(51, callback2, 21) - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - # child terminates (signal 8) - self.running = False - self.add_zombie(51, SIGNAL(8)) - self.watcher._sig_chld() - - callback2.assert_called_once_with(51, -8, 21) - self.assertFalse(callback1.called) - - callback2.reset_mock() - - # ensure that the child is effectively reaped - self.add_zombie(51, EXITCODE(13)) - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - - @waitpid_mocks - def test_sigchld_remove_handler(self, m_waitpid): - callback = mock.Mock() - - # register a child - with self.watcher: - self.running = True - self.watcher.add_child_handler(52, callback, 1984) - - self.assertFalse(callback.called) - - # unregister the child - self.watcher.remove_child_handler(52) - - self.assertFalse(callback.called) - - # child terminates (code 99) - self.running = False - self.add_zombie(52, EXITCODE(99)) - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback.called) - - @waitpid_mocks - def test_sigchld_unknown_status(self, m_waitpid): - callback = mock.Mock() - - # register a child - with self.watcher: - self.running = True - self.watcher.add_child_handler(53, callback, -19) - - self.assertFalse(callback.called) - - # terminate with unknown status - self.zombies[53] = 1178 - self.running = False - self.watcher._sig_chld() - - callback.assert_called_once_with(53, 1178, -19) - - callback.reset_mock() - - # ensure that the child is effectively reaped - self.add_zombie(53, EXITCODE(101)) - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback.called) - - @waitpid_mocks - def test_remove_child_handler(self, m_waitpid): - callback1 = mock.Mock() - callback2 = mock.Mock() - callback3 = mock.Mock() - - # register children - with self.watcher: - self.running = True - self.watcher.add_child_handler(54, callback1, 1) - self.watcher.add_child_handler(55, callback2, 2) - self.watcher.add_child_handler(56, callback3, 3) - - # remove child handler 1 - self.assertTrue(self.watcher.remove_child_handler(54)) - - # remove child handler 2 multiple times - self.assertTrue(self.watcher.remove_child_handler(55)) - self.assertFalse(self.watcher.remove_child_handler(55)) - self.assertFalse(self.watcher.remove_child_handler(55)) - - # all children terminate - self.add_zombie(54, EXITCODE(0)) - self.add_zombie(55, EXITCODE(1)) - self.add_zombie(56, EXITCODE(2)) - self.running = False - with self.ignore_warnings: - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - callback3.assert_called_once_with(56, 2, 3) - - @waitpid_mocks - def test_sigchld_unhandled_exception(self, m_waitpid): - callback = mock.Mock() - - # register a child - with self.watcher: - self.running = True - self.watcher.add_child_handler(57, callback) - - # raise an exception - m_waitpid.side_effect = ValueError - - with mock.patch.object(log.logger, - 'error') as m_error: - - self.assertEqual(self.watcher._sig_chld(), None) - self.assertTrue(m_error.called) - - @waitpid_mocks - def test_sigchld_child_reaped_elsewhere(self, m_waitpid): - # register a child - callback = mock.Mock() - - with self.watcher: - self.running = True - self.watcher.add_child_handler(58, callback) - - self.assertFalse(callback.called) - - # child terminates - self.running = False - self.add_zombie(58, EXITCODE(4)) - - # waitpid is called elsewhere - os.waitpid(58, os.WNOHANG) - - m_waitpid.reset_mock() - - # sigchld - with self.ignore_warnings: - self.watcher._sig_chld() - - if isinstance(self.watcher, asyncio.FastChildWatcher): - # here the FastChildWatcher enters a deadlock - # (there is no way to prevent it) - self.assertFalse(callback.called) - else: - callback.assert_called_once_with(58, 255) - - @waitpid_mocks - def test_sigchld_unknown_pid_during_registration(self, m_waitpid): - # register two children - callback1 = mock.Mock() - callback2 = mock.Mock() - - with self.ignore_warnings, self.watcher: - self.running = True - # child 1 terminates - self.add_zombie(591, EXITCODE(7)) - # an unknown child terminates - self.add_zombie(593, EXITCODE(17)) - - self.watcher._sig_chld() - - self.watcher.add_child_handler(591, callback1) - self.watcher.add_child_handler(592, callback2) - - callback1.assert_called_once_with(591, 7) - self.assertFalse(callback2.called) - - @waitpid_mocks - def test_set_loop(self, m_waitpid): - # register a child - callback = mock.Mock() - - with self.watcher: - self.running = True - self.watcher.add_child_handler(60, callback) - - # attach a new loop - old_loop = self.loop - self.loop = self.new_test_loop() - patch = mock.patch.object - - with patch(old_loop, "remove_signal_handler") as m_old_remove, \ - patch(self.loop, "add_signal_handler") as m_new_add: - - self.watcher.attach_loop(self.loop) - - m_old_remove.assert_called_once_with( - signal.SIGCHLD) - m_new_add.assert_called_once_with( - signal.SIGCHLD, self.watcher._sig_chld) - - # child terminates - self.running = False - self.add_zombie(60, EXITCODE(9)) - self.watcher._sig_chld() - - callback.assert_called_once_with(60, 9) - - @waitpid_mocks - def test_set_loop_race_condition(self, m_waitpid): - # register 3 children - callback1 = mock.Mock() - callback2 = mock.Mock() - callback3 = mock.Mock() - - with self.watcher: - self.running = True - self.watcher.add_child_handler(61, callback1) - self.watcher.add_child_handler(62, callback2) - self.watcher.add_child_handler(622, callback3) - - # detach the loop - old_loop = self.loop - self.loop = None - - with mock.patch.object( - old_loop, "remove_signal_handler") as m_remove_signal_handler: - - with self.assertWarnsRegex( - RuntimeWarning, 'A loop is being detached'): - self.watcher.attach_loop(None) - - m_remove_signal_handler.assert_called_once_with( - signal.SIGCHLD) - - # child 1 & 2 terminate - self.add_zombie(61, EXITCODE(11)) - self.add_zombie(62, SIGNAL(5)) - - # SIGCHLD was not caught - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - self.assertFalse(callback3.called) - - # attach a new loop - self.loop = self.new_test_loop() - - with mock.patch.object( - self.loop, "add_signal_handler") as m_add_signal_handler: - - self.watcher.attach_loop(self.loop) - - m_add_signal_handler.assert_called_once_with( - signal.SIGCHLD, self.watcher._sig_chld) - callback1.assert_called_once_with(61, 11) # race condition! - callback2.assert_called_once_with(62, -5) # race condition! - self.assertFalse(callback3.called) - - callback1.reset_mock() - callback2.reset_mock() - - # child 3 terminates - self.running = False - self.add_zombie(622, EXITCODE(19)) - self.watcher._sig_chld() - - self.assertFalse(callback1.called) - self.assertFalse(callback2.called) - callback3.assert_called_once_with(622, 19) - - @waitpid_mocks - def test_close(self, m_waitpid): - # register two children - callback1 = mock.Mock() - - with self.watcher: - self.running = True - # child 1 terminates - self.add_zombie(63, EXITCODE(9)) - # other child terminates - self.add_zombie(65, EXITCODE(18)) - self.watcher._sig_chld() - - self.watcher.add_child_handler(63, callback1) - self.watcher.add_child_handler(64, callback1) - - self.assertEqual(len(self.watcher._callbacks), 1) - if isinstance(self.watcher, asyncio.FastChildWatcher): - self.assertEqual(len(self.watcher._zombies), 1) - - with mock.patch.object( - self.loop, - "remove_signal_handler") as m_remove_signal_handler: - - self.watcher.close() - - m_remove_signal_handler.assert_called_once_with( - signal.SIGCHLD) - self.assertFalse(self.watcher._callbacks) - if isinstance(self.watcher, asyncio.FastChildWatcher): - self.assertFalse(self.watcher._zombies) - - -class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): - def create_watcher(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - return asyncio.SafeChildWatcher() - - -class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): - def create_watcher(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - return asyncio.FastChildWatcher() - - class PolicyTests(unittest.TestCase): def create_policy(self): @@ -1739,70 +1167,6 @@ def test_get_default_child_watcher(self, m_can_use_pidfd): with self.assertWarns(DeprecationWarning): self.assertIs(watcher, policy.get_child_watcher()) - def test_get_child_watcher_after_set(self): - policy = self.create_policy() - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - watcher = asyncio.FastChildWatcher() - policy.set_child_watcher(watcher) - - self.assertIs(policy._watcher, watcher) - with self.assertWarns(DeprecationWarning): - self.assertIs(watcher, policy.get_child_watcher()) - - def test_get_child_watcher_thread(self): - - def f(): - policy.set_event_loop(policy.new_event_loop()) - - self.assertIsInstance(policy.get_event_loop(), - asyncio.AbstractEventLoop) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - watcher = policy.get_child_watcher() - - self.assertIsInstance(watcher, asyncio.SafeChildWatcher) - self.assertIsNone(watcher._loop) - - policy.get_event_loop().close() - - policy = self.create_policy() - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - policy.set_child_watcher(asyncio.SafeChildWatcher()) - - th = threading.Thread(target=f) - th.start() - th.join() - - def test_child_watcher_replace_mainloop_existing(self): - policy = self.create_policy() - loop = policy.new_event_loop() - policy.set_event_loop(loop) - - # Explicitly setup SafeChildWatcher, - # default ThreadedChildWatcher has no _loop property - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - watcher = asyncio.SafeChildWatcher() - policy.set_child_watcher(watcher) - watcher.attach_loop(loop) - - self.assertIs(watcher._loop, loop) - - new_loop = policy.new_event_loop() - policy.set_event_loop(new_loop) - - self.assertIs(watcher._loop, new_loop) - - policy.set_event_loop(None) - - self.assertIs(watcher._loop, None) - - loop.close() - new_loop.close() - - class TestFunctional(unittest.TestCase): def setUp(self):