Skip to content

Commit

Permalink
[integ-test] improve integration test framework
Browse files Browse the repository at this point in the history
Signed-off-by: Hanwen <[email protected]>
  • Loading branch information
hanwen-cluster committed Nov 8, 2024
1 parent 7bf3d3a commit 45cbade
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions tests/integration-tests/framework/fixture_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
import os
import pickle
import re
import time
from dataclasses import dataclass, field
from inspect import getfullargspec, isgeneratorfunction
Expand Down Expand Up @@ -53,13 +54,15 @@ def __init__(
fixture_func_args: tuple,
fixture_func_kwargs: dict,
xdist_worker_id_and_pid: str,
log_file: str,
):
self.name = name
self.shared_save_location = shared_save_location
self.fixture_func = fixture_func
self.fixture_func_args = fixture_func_args
self.fixture_func_kwargs = fixture_func_kwargs
self.xdist_worker_id_and_pid = xdist_worker_id_and_pid
self.log_file = log_file
self._lock_file = shared_save_location / f"{name}.lock"
self._fixture_file = shared_save_location / f"{name}.fixture"
self._generator = None
Expand Down Expand Up @@ -95,6 +98,19 @@ def _terminate_process(pid: int):
except Exception:
logging.error("Error terminating process %s.", pid)

@staticmethod
def _completed_in_the_past(minutes: int, line: str):
if line is None or "- Completed test" not in line:
return False
regex=r"^(.*?) \- .*"
re_result = re.search(regex, line)
if re_result:
date = re_result.group(1)
try:
return (time.time() - time.mktime(time.strptime(date, "%Y-%m-%d %H:%M:%S,%f"))) / 60 > minutes
except Exception:
return False

def release(self):
"""
Release a shared fixture.
Expand All @@ -119,6 +135,16 @@ def release(self):
)
time.sleep(30)

last_message_of_each_proc = {}
with open(self.log_file, "r") as f:
lines = f.readlines()
for line in lines:
regex=r"^.* \- .* \- (\d+) - .*"
re_result = re.search(regex, line)
if re_result:
pid = re.search(regex, line).group(1)
last_message_of_each_proc[pid] = line

with FileLock(self._lock_file):
for worker in data.currently_using_processes.copy():
pid = int(worker.split(" ")[1])
Expand All @@ -132,6 +158,9 @@ def release(self):
data.counter,
)
self._save_fixture_data(data)
elif self._completed_in_the_past(30, last_message_of_each_proc.get(str(pid))):
logging.warning("%s is sleeping but the test has been finished. Terminating the process, ", worker)
self._terminate_process(pid)
data = self._load_fixture_data()

if time.time() > timeout:
Expand Down Expand Up @@ -222,6 +251,7 @@ def _xdist_session_fixture(request, *args, **kwargs):
fixture_func_args=args,
fixture_func_kwargs=kwargs,
xdist_worker_id_and_pid=f"{xdist_worker_id}: {pid}",
log_file=request.config.getoption("tests_log_file")
)
try:
yield shared_fixture.acquire().fixture_return_value
Expand Down

0 comments on commit 45cbade

Please sign in to comment.