From 5bb8e137acce042c7a9e57ab5247cc3e7b30d55c Mon Sep 17 00:00:00 2001
From: "pierre.delaunay"
Date: Wed, 26 Jun 2024 02:37:50 -0400
Subject: [PATCH] Fix CPU default

---
 benchmate/benchmate/monitor.py          |  4 +--
 milabench/_version.py                   |  6 ++--
 milabench/alt_async.py                  | 36 +++++++++----------
 milabench/cli/compare.py                |  2 +-
 milabench/cli/gather.py                 |  2 +-
 milabench/cli/report.py                 |  2 +-
 milabench/cli/run.py                    |  2 +-
 milabench/cli/summary.py                |  2 +-
 milabench/commands/executors.py         | 32 +++++++++--------
 milabench/common.py                     | 21 ++++++-----
 milabench/config.py                     | 13 ++++---
 milabench/log.py                        | 23 +++++++-----
 milabench/multi.py                      | 18 ++++++----
 milabench/report.py                     | 11 ++++--
 milabench/summary.py                    | 14 ++++++--
 milabench/testing.py                    |  2 +-
 scripts/article/run_cuda.sh             | 20 ++++++++---
 tests/integration/test_postgresql.py    |  2 +-
 tests/test_metrics.py                   |  2 +-
 tests/test_scaler.py                    |  2 +-
 tests/test_slurm.py                     |  2 +-
 tests/test_summary.py                   |  2 +-
 tests/test_summary/test_report.txt      |  7 +---
 .../test_report_folder_does_average.txt |  2 +-
 24 files changed, 137 insertions(+), 92 deletions(-)

diff --git a/benchmate/benchmate/monitor.py b/benchmate/benchmate/monitor.py
index e21931458..ad218bff5 100644
--- a/benchmate/benchmate/monitor.py
+++ b/benchmate/benchmate/monitor.py
@@ -74,7 +74,7 @@ def monitor_fn():
             }
             for gpu in get_gpu_info()["gpus"].values()
         }
-        return {"task": "train", "gpudata": data, "time": time.time(), "units": "s"}
+        return {"task": "train", "gpudata": data, "time": time.time()}
 
     monitor = CustomMonitor(0.5, monitor_fn)
 
@@ -82,7 +82,7 @@ def log(data):
         nonlocal monitor
 
         if data_file is not None:
-            data["t"] = time.time()
+            data["time"] = time.time()
             print(json.dumps(data), file=data_file)
 
         while not monitor.results.empty():
diff --git a/milabench/_version.py b/milabench/_version.py
index f09618832..2f6bd5d42 100644
--- a/milabench/_version.py
+++ b/milabench/_version.py
@@ -1,5 +1,5 @@
 """This file is generated, do not modify"""
 
-__tag__ = "v0.1.0-30-g94b27a71"
-__commit__ = "94b27a71145d3ba754a2713aeca60e5a28be4bc5"
-__date__ = "2024-06-25 13:49:52 -0400"
+__tag__ = "v0.1.0-32-ge9e52501"
+__commit__ = "e9e52501ad92d2ee2dac97e66f601a0458404986"
+__date__ = "2024-06-26 02:37:50 -0400"
diff --git a/milabench/alt_async.py b/milabench/alt_async.py
index cd5faef00..8608196d3 100644
--- a/milabench/alt_async.py
+++ b/milabench/alt_async.py
@@ -10,7 +10,7 @@
 from collections import deque
 from functools import wraps
 
-from benchmate.warden import process_cleaner, destroy
+from benchmate.warden import destroy
 
 from voir.proc import run as voir_run
 
@@ -166,29 +166,29 @@ async def wrapped(*args, **kwargs):
     return wrapped
 
+
+
 @feedback_runner
 def run(argv, setsid=None, process_accumulator=None, info={}, **kwargs):
-    with process_cleaner() as processes:
-        if setsid:
-            kwargs["preexec_fn"] = os.setsid
-
-        mx = voir_run(argv, info=info, **kwargs, timeout=0)
-        processes.add_process(*mx.processes)
+    if setsid:
+        kwargs["preexec_fn"] = os.setsid
 
-        if process_accumulator is not None:
-            process_accumulator.extend(mx.processes)
+    mx = voir_run(argv, info=info, **kwargs, timeout=0)
+
+    if process_accumulator is not None:
+        process_accumulator.extend(mx.processes)
 
-        if setsid:
-            for proc in mx.processes:
-                proc.did_setsid = True
+    if setsid:
+        for proc in mx.processes:
+            proc.did_setsid = True
 
-        loop = asyncio.get_running_loop()
-        loop._multiplexers.append(mx)
+    loop = asyncio.get_running_loop()
+    loop._multiplexers.append(mx)
 
-        for entry in mx:
-            if entry and entry.event == "stop":
-                destroy(*mx.processes)
-            yield entry
+    for entry in mx:
+        if entry and entry.event == "stop":
+            destroy(*mx.processes)
+        yield entry
 
 
 def proceed(coro):
diff --git a/milabench/cli/compare.py b/milabench/cli/compare.py
index 59da8e255..b2992857c 100644
--- a/milabench/cli/compare.py
+++ b/milabench/cli/compare.py
@@ -70,6 +70,6 @@ def cli_compare(args=None):
 
     for run in runs:
         all_data = _read_reports(run.path)
-        run.summary = make_summary(all_data.values())
+        run.summary = make_summary(all_data)
 
     compare(runs, args.last, args.metric, args.stat)
diff --git a/milabench/cli/gather.py b/milabench/cli/gather.py
index 3669a74df..d3058d65c 100644
--- a/milabench/cli/gather.py
+++ b/milabench/cli/gather.py
@@ -106,7 +106,7 @@ def gather_cli(args=None):
     data = []
     for run in runs:
         reports = _read_reports(run)
-        summary = make_summary(reports.values(), query=query)
+        summary = make_summary(reports, query=query)
         df = make_dataframe(summary, None, None, query=query)
 
         name = run.split("/")[-1]
diff --git a/milabench/cli/report.py b/milabench/cli/report.py
index cd1a1ca12..9db3e5ca9 100644
--- a/milabench/cli/report.py
+++ b/milabench/cli/report.py
@@ -71,7 +71,7 @@ def cli_report(args=None):
     reports = None
     if args.runs:
         reports = _read_reports(*args.runs)
-        summary = make_summary(reports.values())
+        summary = make_summary(reports)
 
     if args.config:
         from milabench.common import arguments as multipack_args
diff --git a/milabench/cli/run.py b/milabench/cli/run.py
index 96e81eeb8..c59e7e6df 100644
--- a/milabench/cli/run.py
+++ b/milabench/cli/run.py
@@ -112,7 +112,7 @@ def cli_run(args=None):
         reports = _read_reports(*runs)
         assert len(reports) != 0, "No reports found"
 
-        summary = make_summary(reports.values())
+        summary = make_summary(reports)
         assert len(summary) != 0, "No summaries"
 
         make_report(
diff --git a/milabench/cli/summary.py b/milabench/cli/summary.py
index 3a5b9a83c..08390b464 100644
--- a/milabench/cli/summary.py
+++ b/milabench/cli/summary.py
@@ -34,7 +34,7 @@ def cli_summary(args=None):
         args = arguments()
 
     all_data = _read_reports(*args.runs)
-    summary = make_summary(all_data.values())
+    summary = make_summary(all_data)
 
     if args.out is not None:
         with open(args.out, "w") as file:
diff --git a/milabench/commands/executors.py b/milabench/commands/executors.py
index 15b57403b..ce043b3b3 100644
--- a/milabench/commands/executors.py
+++ b/milabench/commands/executors.py
@@ -1,6 +1,8 @@
 import asyncio
 import os
 
+from benchmate.warden import process_cleaner
+
 from ..alt_async import destroy, run
 from ..metadata import machine_metadata
 from ..structs import BenchLogEntry
@@ -60,21 +62,23 @@ async def execute_command(
         pack.phase = phase
 
     timeout_tasks = []
-    for pack, argv, _kwargs in command.commands():
-        await pack.send(event="config", data=pack.config)
-        await pack.send(event="meta", data=machine_metadata(pack))
+    with process_cleaner() as warden:
+        for pack, argv, _kwargs in command.commands():
+            await pack.send(event="config", data=pack.config)
+            await pack.send(event="meta", data=machine_metadata(pack))
 
-        fut = execute(pack, *argv, **{**_kwargs, **kwargs})
-        coro.append(fut)
+            fut = execute(pack, *argv, **{**_kwargs, **kwargs})
+            coro.append(fut)
+            warden.add_process(*pack.processes)
 
-        if timeout:
-            delay = pack.config.get("max_duration", timeout_delay)
-            timeout_task = asyncio.create_task(force_terminate(pack, delay))
-            timeout_tasks.append(timeout_task)
+            if timeout:
+                delay = pack.config.get("max_duration", timeout_delay)
+                timeout_task = asyncio.create_task(force_terminate(pack, delay))
+                timeout_tasks.append(timeout_task)
 
-    results = await asyncio.gather(*coro)
+        results = await asyncio.gather(*coro)
 
-    if timeout:
-        for task in timeout_tasks:
-            task.cancel()
-    return results
+        if timeout:
+            for task in timeout_tasks:
+                task.cancel()
+        return results
diff --git a/milabench/common.py b/milabench/common.py
index f5e90efc5..eab593fbd 100644
--- a/milabench/common.py
+++ b/milabench/common.py
@@ -278,7 +278,10 @@ def _parse_report(pth):
                 traceback.print_exc()
                 bad_lines += 1
 
-    if good_lines == 0:
+    if good_lines == 0 and bad_lines == 0:
+        print(f"Empty file {pth}")
+
+    if good_lines == 0 and bad_lines > 0:
         print(f"Unknow format for file {pth}")
 
     return data
@@ -300,12 +303,14 @@ def _read_reports(*runs):
 def _error_report(reports):
     out = {}
     for r, data in reports.items():
-        agg = aggregate(data)
-        if not agg:
-            continue
-        (success,) = agg["data"]["success"]
-        if not success:
-            out[r] = [line for line in data if "#stdout" in line or "#stderr" in line]
+        try:
+            agg = aggregate(data)
+            (success,) = agg["data"]["success"]
+            if not success:
+                out[r] = [line for line in data if "#stdout" in line or "#stderr" in line]
+        except:
+            pass
+
 
     return out
@@ -362,7 +367,7 @@ def _short_make_report(runs, config):
 
     if runs:
         reports = _read_reports(*runs)
-        summary = make_summary(reports.values())
+        summary = make_summary(reports)
 
     if config:
         config = _get_multipack(CommonArguments(config), return_config=True)
diff --git a/milabench/config.py b/milabench/config.py
index 2003c3af6..f61d1175c 100644
--- a/milabench/config.py
+++ b/milabench/config.py
@@ -13,15 +13,20 @@
 
 config_global = contextvars.ContextVar("config", default=None)
-execution_count = contextvars.ContextVar("count", default=0)
+execution_count = (0, 0)
 
 
-def set_run_count(total):
-    execution_count.set(total)
+def set_run_count(total_run, total_bench):
+    global execution_count
+    execution_count = (total_run, total_bench)
 
 
 def get_run_count():
-    return execution_count.get()
+    return execution_count[0]
+
+
+def get_bench_count():
+    return execution_count[1]
 
 
 def get_base_folder():
diff --git a/milabench/log.py b/milabench/log.py
index 602fd02d9..d38900260 100644
--- a/milabench/log.py
+++ b/milabench/log.py
@@ -220,6 +220,15 @@ def log(self, entry):
            self.file(entry).write(f"{j}\n")
 
 
+def new_progress_bar():
+    progress = Progress(
+        BarColumn(),
+        TextColumn("({task.completed}/{task.total})"),
+    )
+    progress._task = progress.add_task("progress")
+    return progress
+
+
 class DashFormatter(BaseLogger):
     def __init__(self):
         self.panel = Panel("")
@@ -230,7 +239,7 @@ def __init__(self):
         self.early_stop = {}
         # Limit the number of rows to avoid too much clutering
         # This is a soft limit, it only prunes finished runs
-        self.max_rows = 8
+        self.max_rows = 2
         self.prune_delay = 60
         self.current = 0
 
     def _get_global_progress_bar(self):
         progress = self.rows.get("GLOBAL")
         if progress is not None:
             return progress["progress"]
-
-        progress = Progress(
-            BarColumn(),
-            TimeRemainingColumn(),
-            TextColumn("({task.completed}/{task.total})"),
-        )
-        progress._task = progress.add_task("progress")
+        progress = new_progress_bar()
         progress.update(progress._task, completed=self.current, total=get_run_count())
         self.rows["GLOBAL"] = {"progress": progress}
         return progress
@@ -267,12 +270,16 @@ def refresh(self):
         self.live.update(self.make_table())
 
     def start(self):
+        self._update_global(0)
         self.live.__enter__()
 
     def end(self):
         self.live.__exit__(None, None, None)
 
     def __call__(self, entry):
+        if get_run_count():
+            self._get_global_progress_bar()
+
         event = entry.event
         data = entry.data
         tag = entry.tag
diff --git a/milabench/multi.py b/milabench/multi.py
index 12b683793..adb5886cf 100644
--- a/milabench/multi.py
+++ b/milabench/multi.py
@@ -187,9 +187,8 @@ async def do_run(self, repeat=1):
             return
 
         assert is_main_local(setup), "Running benchmarks only works on the main node"
-
-        set_run_count(await self.count_runs(repeat))
-
+        await self.count_runs(repeat)
+
         for index in range(repeat):
             for pack in self.packs.values():
                 try:
@@ -280,7 +279,8 @@ async def do_pin(
         )
 
     async def count_runs(self, repeat):
-        acc = 0
+        total_run = 0
+        total_bench = 0
         for index in range(repeat):
             for pack in self.packs.values():
                 if not await is_system_capable(pack):
 
                 exec_plan = make_execution_plan(pack, index, repeat)
 
+                total_bench += 1
+
                 if isinstance(exec_plan, PerGPU):
-                    acc += len(exec_plan.devices)
+                    total_run += len(exec_plan.devices)
                 else:
-                    acc += 1
-        return acc
+                    total_run += 1
+
+        set_run_count(total_run, total_bench)
+        return total_run
diff --git a/milabench/report.py b/milabench/report.py
index b9a31a6be..0611151c6 100644
--- a/milabench/report.py
+++ b/milabench/report.py
@@ -208,13 +208,18 @@ def _report_pergpu(entries, measure="50"):
 
 def make_dataframe(summary, compare=None, weights=None, query=None):
     if weights is not None:
-        # We overriden the config
+        # We've overriden the config
         required = weights.keys()
         for key in required:
             if key not in summary:
-                print(f"Missing benchmark {key}")
-                summary[key] = {"name": key, "n": 0, "successes": 0, "failures": 0}
+                summary[key] = {
+                    "name": key,
+                    "n": 0,
+                    "successes": 0,
+                    "failures": 0,
+                    "enabled": weights[key]["enabled"]
+                }
 
     if weights is None:
         weights = dict()
diff --git a/milabench/summary.py b/milabench/summary.py
index f1bb6b3e9..01dfcb4ad 100644
--- a/milabench/summary.py
+++ b/milabench/summary.py
@@ -5,9 +5,9 @@
 import numpy as np
 
 from .utils import error_guard
+from .syslog import syslog
 
 
-@error_guard(None)
 def aggregate(run_data):
     """Group all the data inside a dictionary of lists"""
     omnibus = defaultdict(list)
@@ -228,8 +228,18 @@ def _summarize(group, query=tuple([])) -> Summary:
 
 def make_summary(runs, query=tuple([])) -> dict[str, Summary]:
-    aggs = [agg for run in runs if (agg := aggregate(run))]
+    aggs = []
+    for name, run in runs.items():
+        try:
+            if agg := aggregate(run):
+                aggs.append(agg)
+        except AssertionError:
+            syslog("Ignoring run {0}: it looks like it did not finish successfully", name)
+        except Exception as err:
+            syslog("Ignoring run {0}: beause of exception: {1}", name, err)
+
     classified = _classify(aggs)
     merged = {name: _merge(runs) for name, runs in classified.items()}
     summarized = {name: _summarize(agg, query) for name, agg in merged.items()}
     return summarized
+
diff --git a/milabench/testing.py b/milabench/testing.py
index 90c3d4016..9dcd455ee 100644
--- a/milabench/testing.py
+++ b/milabench/testing.py
@@ -123,7 +123,7 @@ def replay_zipfile(path, *validation, sleep=0):
 
     from milabench.config import set_run_count
 
-    set_run_count(total)
+    set_run_count(total, len(data))
 
     with multilogger(*validation) as log:
         for _, streams in data.items():
diff --git a/scripts/article/run_cuda.sh b/scripts/article/run_cuda.sh
index 6cfdc96fb..7081000f6 100644
--- a/scripts/article/run_cuda.sh
+++ b/scripts/article/run_cuda.sh
@@ -5,7 +5,7 @@ set -ex
 export MILABENCH_GPU_ARCH=cuda
 export MILABENCH_WORDIR="$(pwd)/$MILABENCH_GPU_ARCH"
 export MILABENCH_BASE="$MILABENCH_WORDIR/results"
-export MILABENCH_CONFIG="$MILABENCH_WORDIR/milabench/config/standard.yaml"
+
 export MILABENCH_VENV="$MILABENCH_WORDIR/env"
 export BENCHMARK_VENV="$MILABENCH_WORDIR/results/venv/torch"
 
@@ -13,6 +13,12 @@ if [ -z "${MILABENCH_PREPARE}" ]; then
     export MILABENCH_PREPARE=0
 fi
 
+if [ -z "${MILABENCH_SOURCE}" ]; then
+    export MILABENCH_CONFIG="$MILABENCH_WORDIR/milabench/config/standard.yaml"
+else
+    export MILABENCH_CONFIG="$MILABENCH_SOURCE/config/standard.yaml"
+fi
+
 install_prepare() {
     mkdir -p $MILABENCH_WORDIR
     cd $MILABENCH_WORDIR
@@ -21,12 +27,16 @@ install_prepare() {
         virtualenv $MILABENCH_WORDIR/env
     fi
 
-    if [ ! -d "$MILABENCH_WORDIR/milabench" ]; then
-        git clone https://github.com/mila-iqia/milabench.git
+    if [ -z "${MILABENCH_SOURCE}" ]; then
+        if [ ! -d "$MILABENCH_WORDIR/milabench" ]; then
+            git clone https://github.com/mila-iqia/milabench.git
+        fi
+        export MILABENCH_SOURCE="$MILABENCH_WORDIR/milabench"
     fi
-
     . $MILABENCH_WORDIR/env/bin/activate
-    pip install -e $MILABENCH_WORDIR/milabench
+
+    pip install -e $MILABENCH_SOURCE
 
     #
     # Install milabench's benchmarks in their venv
diff --git a/tests/integration/test_postgresql.py b/tests/integration/test_postgresql.py
index f7a904155..5e84debfe 100644
--- a/tests/integration/test_postgresql.py
+++ b/tests/integration/test_postgresql.py
@@ -41,6 +41,6 @@ def test_sqlalchemy_postgresql(runs_folder):
     # -------
     runs = [run_dir]
     reports = _read_reports(*runs)
-    summary = make_summary(reports.values())
+    summary = make_summary(reports)
 
     show_diff(summary, replicated)
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
index c73a237fb..9cae0e9c7 100644
--- a/tests/test_metrics.py
+++ b/tests/test_metrics.py
@@ -24,6 +24,6 @@ def test_sqlalchemy_sqlite(runs_folder):
     # -------
     runs = [run_dir]
     reports = _read_reports(*runs)
-    summary = make_summary(reports.values())
+    summary = make_summary(reports)
 
     show_diff(summary, replicated)
diff --git a/tests/test_scaler.py b/tests/test_scaler.py
index dc4293e93..f00a89793 100644
--- a/tests/test_scaler.py
+++ b/tests/test_scaler.py
@@ -36,7 +36,7 @@ def test_scaler_use_optimized(multipack, config):
 
 @pytest.mark.parametrize("capacity,expected", _values)
 def test_scaler_autoscaler_lerp(multipack, config, capacity, expected):
-    sizer = Sizer(SizerOptions(size=None, autoscale=True), config("scaling"))
+    sizer = Sizer(SizerOptions(size=None, autoscale=True, multiple=None), config("scaling"))
 
     for k, pack in multipack.packs.items():
         assert sizer.size(pack, capacity) == expected
diff --git a/tests/test_slurm.py b/tests/test_slurm.py
index 07c792fe3..67cda16a1 100644
--- a/tests/test_slurm.py
+++ b/tests/test_slurm.py
@@ -1,4 +1,4 @@
-from milabench.slurm import expand_node_list
+from milabench.cli.slurm import expand_node_list
 
 
 def test_slurm_node_expand_0():
diff --git a/tests/test_summary.py b/tests/test_summary.py
index 08cee16b7..4273d6715 100644
--- a/tests/test_summary.py
+++ b/tests/test_summary.py
@@ -90,6 +90,6 @@ def test_summary_full(runs_folder):
     runs = [run]
 
     reports = _read_reports(*runs)
-    summary = make_summary(reports.values())
+    summary = make_summary(reports)
 
     make_report(summary, None)
diff --git a/tests/test_summary/test_report.txt b/tests/test_summary/test_report.txt
index 25790bad4..d6a8c0848 100644
--- a/tests/test_summary/test_report.txt
+++ b/tests/test_summary/test_report.txt
@@ -2,13 +2,8 @@ Source: XXX
 =================
 Benchmark results
 =================
-<<<<<<< HEAD
 bench | fail | n | perf | sem% | std% | peak_memory | score | weight
-benchio | 0 | 4 | 7979.82 | 2.9% | 17.2% | -1 | 7979.82 | 2.00
-=======
-         fail  n     perf  sem%  std%  peak_memory    score  weight
-benchio     0  4  7979.82  2.9% 17.2%          NaN  7979.82    2.00
->>>>>>> cfd32afb45f89bce2899e3dcf2a63bd54844a61d
+benchio | 0 | 4 | 7979.82 | 2.9% | 17.2% | nan | 7979.82 | 2.00
 
 Scores
 ------
diff --git a/tests/test_summary/test_report_folder_does_average.txt b/tests/test_summary/test_report_folder_does_average.txt
index b7ad71e7e..70027ec07 100644
--- a/tests/test_summary/test_report_folder_does_average.txt
+++ b/tests/test_summary/test_report_folder_does_average.txt
@@ -3,7 +3,7 @@ Source: XXX
 Benchmark results
 =================
 bench | fail | n | perf | sem% | std% | peak_memory | score | weight
-benchio | 0 | 6 | 7878.45 | 2.5% | 18.0% | 24456 | 7878.45 | 2.00
+benchio | 0 | 6 | 7878.45 | 2.5% | 18.0% | 24456 | 7878.45 | 2.00
 
 Scores
 ------