Skip to content

Commit

Permalink
process replay cleanup to generic _pmap [pr] (tinygrad#6929)
Browse files (browse the repository at this point in the history)
* process replay cleanup to generic _pmap [pr]

* delete `COMPARE_SCHEDULE`
  • Loading branch information
Qazalin authored Oct 7, 2024
1 parent 16312b4 commit b82023c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 42 deletions.
1 change: 0 additions & 1 deletion .github/workflows/benchmark.yml
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
name: Benchmarks
env:
# TODO: this rescheduling makes gpt2, mixtral and llama unjitted slower
COMPARE_SCHEDULE: "0"
RUN_PROCESS_REPLAY: "1"
ASSERT_PROCESS_REPLAY: "0"
PYTHONPATH: .
Expand Down
56 changes: 15 additions & 41 deletions test/external/process_replay/process_replay.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -23,7 +23,6 @@
ASSERT_DIFF = int(any(flag in os.getenv("COMMIT_MESSAGE", flag) or flag in os.getenv("PR_TITLE", flag) for flag in ASSERT_FLAGS))
if not getenv("ASSERT_PROCESS_REPLAY", 1): ASSERT_DIFF = 0
SKIP_PROCESS_REPLAY = (k:="[skip_process_replay]") in os.getenv("COMMIT_MESSAGE", "") or k in os.getenv("PR_TITLE", "")
COMPARE_SCHEDULE = getenv("COMPARE_SCHEDULE", 1)
if REF == "master": SKIP_PROCESS_REPLAY = True

# *** differs
Expand Down Expand Up @@ -105,7 +104,15 @@ def diff_kernel(offset:int) -> Union[Tuple[int, int], bool]:

# *** generic runner for executing fxn across all rows of a table in parallel

def _pmap(row_count:int, fxn:Callable[[int], Union[bool, Tuple[int, int]]], maxtasksperchild:int=16) -> None:
def _pmap(name:str, fxn:Callable[[int], Union[bool, Tuple[int, int]]], maxtasksperchild:int=16) -> None:
conn = db_connection()
cur = conn.cursor()
try: row_count = cur.execute(f"select count(*) from '{name}_{TABLE_NAME}'").fetchone()[0]
except sqlite3.OperationalError:
logging.warning(f"{name}_{TABLE_NAME} isn't accessible in master, did DB_VERSION change?")
return None
conn.commit()
cur.close()
with multiprocessing.get_context("spawn").Pool(multiprocessing.cpu_count(), maxtasksperchild=maxtasksperchild) as pool:
inputs = list(range(0, row_count, PAGE_SIZE))
ret: List[Union[bool, Tuple[int, int]]] = list(tqdm(pool.imap_unordered(fxn, inputs), total=len(inputs)))
Expand All @@ -115,53 +122,20 @@ def _pmap(row_count:int, fxn:Callable[[int], Union[bool, Tuple[int, int]]], maxt
changed = [bool(x[0] or x[1]) if isinstance(x, tuple) else x for x in ret]
insertion, deletions = [x[0] for x in ret if isinstance(x, tuple)], [x[1] for x in ret if isinstance(x, tuple)]
logging.info(f"{sum(changed)} kernels changed")
if len(insertion) != 0: logging.info(colored(f"{sum(insertion)} insertions(+)", "green"))
if len(deletions) != 0: logging.info(colored(f"{sum(deletions)} deletions(-)", "red"))
if sum(insertion) != 0: logging.info(colored(f"{sum(insertion)} insertions(+)", "green"))
if sum(deletions) != 0: logging.info(colored(f"{sum(deletions)} deletions(-)", "red"))
if any(changed) and ASSERT_DIFF: raise AssertionError("process replay detected changes")

# *** process replay parallel differ runners

def process_replay_schedule() -> None:
conn = db_connection()
cur = conn.cursor()
try: has_diff = cur.execute(f"select name from sqlite_master where type='table' and name='schedule_diff_{VERSION}'").fetchone()
except sqlite3.OperationalError:
logging.warning(f"schedule_diff_{VERSION} isn't accessible in master, did DB_VERSION change?")
return
if has_diff:
row_count = cur.execute(f"select count(*) from 'schedule_diff_{VERSION}'").fetchone()[0]
if row_count != 0: logging.info("***** schedule diff")
conn.commit()
cur.close()
_pmap(row_count, diff_schedule)

def process_replay_kernel() -> None:
conn = db_connection()
cur = conn.cursor()
try: row_count = cur.execute(f"select count(*) from 'kernel_{TABLE_NAME}'").fetchone()[0]
except sqlite3.OperationalError:
logging.warning(f"kernel_{TABLE_NAME} isn't accessible in master, did DB_VERSION change?")
return None
conn.commit()
cur.close()
_pmap(row_count, diff_kernel)

# *** main loop

if __name__ == "__main__":
if SKIP_PROCESS_REPLAY:
logging.info("skipping process replay.")
exit(0)

if COMPARE_SCHEDULE:
logging.info("***** schedule diff")
try: process_replay_schedule()
for name,fxn in [("schedule", diff_schedule), ("kernel", diff_kernel)]:
logging.info(f"***** {name} diff")
try: _pmap(name, fxn)
except Exception as e:
if ASSERT_DIFF: raise e
logging.error(f"schedule diff err {e}")

logging.info("***** kernel diff")
try: process_replay_kernel()
except Exception as e:
if ASSERT_DIFF: raise e
logging.error(f"kernel diff err {e}")
logging.error(f"{name} diff err {e}")

0 comments on commit b82023c

Please sign in to comment.