Skip to content

Commit

Permalink
libregrtest: Don't store workers as an attribute on RunWorkers in…
Browse files Browse the repository at this point in the history
…stances
  • Loading branch information
AlexWaygood committed Dec 1, 2023
1 parent 05a370a commit 2fdea0b
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,23 +470,22 @@ def __init__(self, num_workers: int, runtests: RunTests,
self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers: list[WorkerThread] | None = None

jobs = self.runtests.get_jobs()
if jobs is not None:
# Don't spawn more threads than the number of jobs:
# these worker threads would never get anything to do.
self.num_workers = min(self.num_workers, jobs)

def start_workers(self) -> None:
self.workers = [WorkerThread(index, self)
for index in range(1, self.num_workers + 1)]
def start_workers(self) -> list[WorkerThread]:
workers = [WorkerThread(index, self)
for index in range(1, self.num_workers + 1)]
jobs = self.runtests.get_jobs()
if jobs is not None:
tests = count(jobs, 'test')
else:
tests = 'tests'
nworkers = len(self.workers)
nworkers = len(workers)
processes = plural(nworkers, "process", "processes")
msg = (f"Run {tests} in parallel using "
f"{nworkers} worker {processes}")
Expand All @@ -495,23 +494,25 @@ def start_workers(self) -> None:
% (format_duration(self.timeout),
format_duration(self.worker_timeout)))
self.log(msg)
for worker in self.workers:
for worker in workers:
worker.start()
return workers

def stop_workers(self) -> None:
@staticmethod
def stop_workers(workers: list[WorkerThread]) -> None:
start_time = time.monotonic()
for worker in self.workers:
for worker in workers:
worker.stop()
for worker in self.workers:
for worker in workers:
worker.wait_stopped(start_time)

def _get_result(self) -> QueueOutput | None:
def _get_result(self, workers: list[WorkerThread]) -> QueueOutput | None:
pgo = self.runtests.pgo
use_faulthandler = (self.timeout is not None)

# bpo-46205: check the status of workers every iteration to avoid
# waiting forever on an empty queue.
while any(worker.is_alive() for worker in self.workers):
while any(worker.is_alive() for worker in workers):
if use_faulthandler:
faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
exit=True)
Expand All @@ -524,7 +525,7 @@ def _get_result(self) -> QueueOutput | None:

if not pgo:
# display progress
running = get_running(self.workers)
running = get_running(workers)
if running:
self.log(running)

Expand All @@ -534,7 +535,9 @@ def _get_result(self) -> QueueOutput | None:
except queue.Empty:
return None

def display_result(self, mp_result: MultiprocessResult) -> None:
def display_result(
self, mp_result: MultiprocessResult, *, workers: list[WorkerThread]
) -> None:
result = mp_result.result
pgo = self.runtests.pgo

Expand All @@ -545,12 +548,14 @@ def display_result(self, mp_result: MultiprocessResult) -> None:
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
if not pgo:
running = get_running(self.workers)
running = get_running(workers)
if running:
text += f' -- {running}'
self.display_progress(self.test_index, text)

def _process_result(self, item: QueueOutput) -> TestResult:
def _process_result(
self, item: QueueOutput, *, workers: list[WorkerThread]
) -> TestResult:
"""Returns True if test runner must stop."""
if item[0]:
# Thread got an exception
Expand All @@ -564,7 +569,7 @@ def _process_result(self, item: QueueOutput) -> TestResult:
mp_result = item[1]
result = mp_result.result
self.results.accumulate_result(result, self.runtests)
self.display_result(mp_result)
self.display_result(mp_result, workers=workers)

# Display worker stdout
if not self.runtests.output_on_failure:
Expand All @@ -583,16 +588,16 @@ def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed

self.start_workers()
workers = self.start_workers()

self.test_index = 0
try:
while True:
item = self._get_result()
item = self._get_result(workers)
if item is None:
break

result = self._process_result(item)
result = self._process_result(item, workers=workers)
if result.must_stop(fail_fast, fail_env_changed):
break
except KeyboardInterrupt:
Expand All @@ -605,4 +610,4 @@ def run(self) -> None:
# Always ensure that all worker processes are no longer
# worker when we exit this function
self.pending.stop()
self.stop_workers()
self.stop_workers(workers)

0 comments on commit 2fdea0b

Please sign in to comment.