From 2fdea0b7b554d572015233696d62c4d3108a7a28 Mon Sep 17 00:00:00 2001 From: AlexWaygood Date: Fri, 1 Dec 2023 18:03:23 +0000 Subject: [PATCH] libregrtest: Don't store `workers` as an attribute on `RunWorkers` instances --- Lib/test/libregrtest/run_workers.py | 45 ++++++++++++++++------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py index 18a0342f0611cf..11b79c4bd29b9f 100644 --- a/Lib/test/libregrtest/run_workers.py +++ b/Lib/test/libregrtest/run_workers.py @@ -470,7 +470,6 @@ def __init__(self, num_workers: int, runtests: RunTests, self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60) else: self.worker_timeout = None - self.workers: list[WorkerThread] | None = None jobs = self.runtests.get_jobs() if jobs is not None: @@ -478,15 +477,15 @@ def __init__(self, num_workers: int, runtests: RunTests, # these worker threads would never get anything to do. self.num_workers = min(self.num_workers, jobs) - def start_workers(self) -> None: - self.workers = [WorkerThread(index, self) - for index in range(1, self.num_workers + 1)] + def start_workers(self) -> list[WorkerThread]: + workers = [WorkerThread(index, self) + for index in range(1, self.num_workers + 1)] jobs = self.runtests.get_jobs() if jobs is not None: tests = count(jobs, 'test') else: tests = 'tests' - nworkers = len(self.workers) + nworkers = len(workers) processes = plural(nworkers, "process", "processes") msg = (f"Run {tests} in parallel using " f"{nworkers} worker {processes}") @@ -495,23 +494,25 @@ def start_workers(self) -> None: % (format_duration(self.timeout), format_duration(self.worker_timeout))) self.log(msg) - for worker in self.workers: + for worker in workers: worker.start() + return workers - def stop_workers(self) -> None: + @staticmethod + def stop_workers(workers: list[WorkerThread]) -> None: start_time = time.monotonic() - for worker in self.workers: + for worker in workers: worker.stop() - for worker in self.workers: + for worker in workers: worker.wait_stopped(start_time) - def _get_result(self) -> QueueOutput | None: + def _get_result(self, workers: list[WorkerThread]) -> QueueOutput | None: pgo = self.runtests.pgo use_faulthandler = (self.timeout is not None) # bpo-46205: check the status of workers every iteration to avoid # waiting forever on an empty queue. - while any(worker.is_alive() for worker in self.workers): + while any(worker.is_alive() for worker in workers): if use_faulthandler: faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, exit=True) @@ -524,7 +525,7 @@ def _get_result(self) -> QueueOutput | None: if not pgo: # display progress - running = get_running(self.workers) + running = get_running(workers) if running: self.log(running) @@ -534,7 +535,9 @@ def _get_result(self) -> QueueOutput | None: except queue.Empty: return None - def display_result(self, mp_result: MultiprocessResult) -> None: + def display_result( + self, mp_result: MultiprocessResult, *, workers: list[WorkerThread] + ) -> None: result = mp_result.result pgo = self.runtests.pgo @@ -545,12 +548,14 @@ def display_result(self, mp_result: MultiprocessResult) -> None: elif (result.duration >= PROGRESS_MIN_TIME and not pgo): text += ' (%s)' % format_duration(result.duration) if not pgo: - running = get_running(self.workers) + running = get_running(workers) if running: text += f' -- {running}' self.display_progress(self.test_index, text) - def _process_result(self, item: QueueOutput) -> TestResult: + def _process_result( + self, item: QueueOutput, *, workers: list[WorkerThread] + ) -> TestResult: """Returns True if test runner must stop.""" if item[0]: # Thread got an exception @@ -564,7 +569,7 @@ def _process_result(self, item: QueueOutput) -> TestResult: mp_result = item[1] result = mp_result.result self.results.accumulate_result(result, self.runtests) - self.display_result(mp_result) + self.display_result(mp_result, workers=workers) # Display worker stdout if not self.runtests.output_on_failure: @@ -583,16 +588,16 @@ def run(self) -> None: fail_fast = self.runtests.fail_fast fail_env_changed = self.runtests.fail_env_changed - self.start_workers() + workers = self.start_workers() self.test_index = 0 try: while True: - item = self._get_result() + item = self._get_result(workers) if item is None: break - result = self._process_result(item) + result = self._process_result(item, workers=workers) if result.must_stop(fail_fast, fail_env_changed): break except KeyboardInterrupt: @@ -605,4 +610,4 @@ def run(self) -> None: # Always ensure that all worker processes are no longer # worker when we exit this function self.pending.stop() - self.stop_workers() + self.stop_workers(workers)