From b82023c97ea8518125220a1646292318603217d3 Mon Sep 17 00:00:00 2001 From: qazal <77887910+Qazalin@users.noreply.github.com> Date: Mon, 7 Oct 2024 08:57:05 +0300 Subject: [PATCH] process replay cleanup to generic _pmap [pr] (#6929) * process replay cleanup to generic _pmap [pr] * delete `COMPARE_SCHEDULE` --- .github/workflows/benchmark.yml | 1 - .../external/process_replay/process_replay.py | 56 +++++-------------- 2 files changed, 15 insertions(+), 42 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index e966af1740bf..c2e136ee368c 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -1,7 +1,6 @@ name: Benchmarks env: # TODO: this rescheduling makes gpt2, mixtral and llama unjitted slower - COMPARE_SCHEDULE: "0" RUN_PROCESS_REPLAY: "1" ASSERT_PROCESS_REPLAY: "0" PYTHONPATH: . diff --git a/test/external/process_replay/process_replay.py b/test/external/process_replay/process_replay.py index 5ee5d9230a2e..1ae9152752ed 100755 --- a/test/external/process_replay/process_replay.py +++ b/test/external/process_replay/process_replay.py @@ -23,7 +23,6 @@ ASSERT_DIFF = int(any(flag in os.getenv("COMMIT_MESSAGE", flag) or flag in os.getenv("PR_TITLE", flag) for flag in ASSERT_FLAGS)) if not getenv("ASSERT_PROCESS_REPLAY", 1): ASSERT_DIFF = 0 SKIP_PROCESS_REPLAY = (k:="[skip_process_replay]") in os.getenv("COMMIT_MESSAGE", "") or k in os.getenv("PR_TITLE", "") -COMPARE_SCHEDULE = getenv("COMPARE_SCHEDULE", 1) if REF == "master": SKIP_PROCESS_REPLAY = True # *** differs @@ -105,7 +104,15 @@ def diff_kernel(offset:int) -> Union[Tuple[int, int], bool]: # *** generic runner for executing fxn across all rows of a table in parallel -def _pmap(row_count:int, fxn:Callable[[int], Union[bool, Tuple[int, int]]], maxtasksperchild:int=16) -> None: +def _pmap(name:str, fxn:Callable[[int], Union[bool, Tuple[int, int]]], maxtasksperchild:int=16) -> None: + conn = db_connection() + cur = conn.cursor() + try: row_count = cur.execute(f"select count(*) from '{name}_{TABLE_NAME}'").fetchone()[0] + except sqlite3.OperationalError: + logging.warning(f"{name}_{TABLE_NAME} isn't accessible in master, did DB_VERSION change?") + return None + conn.commit() + cur.close() with multiprocessing.get_context("spawn").Pool(multiprocessing.cpu_count(), maxtasksperchild=maxtasksperchild) as pool: inputs = list(range(0, row_count, PAGE_SIZE)) ret: List[Union[bool, Tuple[int, int]]] = list(tqdm(pool.imap_unordered(fxn, inputs), total=len(inputs))) @@ -115,37 +122,10 @@ def _pmap(row_count:int, fxn:Callable[[int], Union[bool, Tuple[int, int]]], maxt changed = [bool(x[0] or x[1]) if isinstance(x, tuple) else x for x in ret] insertion, deletions = [x[0] for x in ret if isinstance(x, tuple)], [x[1] for x in ret if isinstance(x, tuple)] logging.info(f"{sum(changed)} kernels changed") - if len(insertion) != 0: logging.info(colored(f"{sum(insertion)} insertions(+)", "green")) - if len(deletions) != 0: logging.info(colored(f"{sum(deletions)} deletions(-)", "red")) + if sum(insertion) != 0: logging.info(colored(f"{sum(insertion)} insertions(+)", "green")) + if sum(deletions) != 0: logging.info(colored(f"{sum(deletions)} deletions(-)", "red")) if any(changed) and ASSERT_DIFF: raise AssertionError("process replay detected changes") -# *** process replay parallel differ runners - -def process_replay_schedule() -> None: - conn = db_connection() - cur = conn.cursor() - try: has_diff = cur.execute(f"select name from sqlite_master where type='table' and name='schedule_diff_{VERSION}'").fetchone() - except sqlite3.OperationalError: - logging.warning(f"schedule_diff_{VERSION} isn't accessible in master, did DB_VERSION change?") - return - if has_diff: - row_count = cur.execute(f"select count(*) from 'schedule_diff_{VERSION}'").fetchone()[0] - if row_count != 0: logging.info("***** schedule diff") - conn.commit() - cur.close() - _pmap(row_count, diff_schedule) - -def process_replay_kernel() -> None: - conn = db_connection() - cur = conn.cursor() - try: row_count = cur.execute(f"select count(*) from 'kernel_{TABLE_NAME}'").fetchone()[0] - except sqlite3.OperationalError: - logging.warning(f"kernel_{TABLE_NAME} isn't accessible in master, did DB_VERSION change?") - return None - conn.commit() - cur.close() - _pmap(row_count, diff_kernel) - # *** main loop if __name__ == "__main__": @@ -153,15 +133,9 @@ def process_replay_kernel() -> None: logging.info("skipping process replay.") exit(0) - if COMPARE_SCHEDULE: - logging.info("***** schedule diff") - try: process_replay_schedule() + for name,fxn in [("schedule", diff_schedule), ("kernel", diff_kernel)]: + logging.info(f"***** {name} diff") + try: _pmap(name, fxn) except Exception as e: if ASSERT_DIFF: raise e - logging.error(f"schedule diff err {e}") - - logging.info("***** kernel diff") - try: process_replay_kernel() - except Exception as e: - if ASSERT_DIFF: raise e - logging.error(f"kernel diff err {e}") + logging.error(f"{name} diff err {e}")