From 1fad24ca7e02b65e9591c21ee3a0d28ff33e65b9 Mon Sep 17 00:00:00 2001 From: "pierre.delaunay" Date: Fri, 14 Jun 2024 10:37:45 -0400 Subject: [PATCH] Add metric gathering utility for batch x worker run --- milabench/cli/gather.py | 124 +++++++++++++++++++++++++++++++++ milabench/cli/matrix.py | 17 +++-- milabench/commands/__init__.py | 2 +- milabench/common.py | 2 +- milabench/config.py | 3 - milabench/log.py | 15 ++-- milabench/report.py | 114 +++++++++++++++++++----------- milabench/sizer.py | 38 ++++++++-- milabench/summary.py | 31 ++++++++- milabench/system.py | 20 +++--- 10 files changed, 290 insertions(+), 76 deletions(-) create mode 100644 milabench/cli/gather.py diff --git a/milabench/cli/gather.py b/milabench/cli/gather.py new file mode 100644 index 000000000..3669a74df --- /dev/null +++ b/milabench/cli/gather.py @@ -0,0 +1,124 @@ +import argparse +import os +import re +from dataclasses import dataclass, field + +import pandas as pd + +from ..common import _read_reports +from ..report import make_dataframe, pandas_to_string +from ..summary import make_summary + + +def default_tags(): + return [ + "worker=w([a-z0-9]*)", + "multiple=m([0-9]*)", + "power=p([0-9]*)", + "capacity=c([A-Za-z0-9]*(Go)?)", + ] + + +# fmt: off +@dataclass +class Arguments: + runs: str + tags: list = field(default_factory=default_tags) +# fmt: on + + +def arguments(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--runs", + type=str, + help="Run folder", + default="/home/mila/d/delaunap/batch_x_worker/", + ) + parser.add_argument( + "--tags", + type=str, + help="Tags defined in run names", + default=default_tags(), + ) + return parser.parse_args() # Arguments() + + +def get_config(reports): + k = list(reports.keys())[0] + config = None + for line in reports[k]: + if line["event"] == "config": + config = line["data"] + break + return config + + +def extract_tags(name, tags): + for tag, pat in tags.items(): + if m := pat.search(name): + value = m.group(1) + yield tag, value + else: + print(f"{tag} not found in {name}") + yield tag, "NA" + + +def gather_cli(args=None): + """Gather metrics from runs inside a folder in a neat format. + It can extract tags/flags from the runname and create new columns to uniquely identify runs. + + Examples + -------- + + >>> python -m milabench.cli.gather --runs /home/mila/d/delaunap/batch_x_worker/ + bench | fail | n | perf | sem% | std% | peak_memory | score | weight | elapsed | name | worker | multiple | power | capacity + brax | 0 | 1 | 722480.33 | 0.7% | 5.2% | 6448 | 722480.33 | 1.00 | 94 | w16-m8-c4Go | 16 | 8 | NA | 4Go + dlrm | 0 | 1 | 350641.30 | 0.6% | 4.6% | 7624 | 350641.30 | 1.00 | 124 | w16-m8-c4Go | 16 | 8 | NA | 4Go + .... 
+ brax | 0 | 1 | 723867.42 | 0.6% | 4.5% | 6448 | 723867.42 | 1.00 | 94 | w2-m8-c8Go | 2 | 8 | NA | 8Go + dlrm | 0 | 1 | 403113.36 | 0.7% | 5.1% | 7420 | 403113.36 | 1.00 | 258 | w2-m8-c8Go | 2 | 8 | NA | 8Go + bf16 | 0 | 8 | 293.08 | 0.3% | 7.5% | 5688 | 2361.09 | 0.00 | 18 | w2-m8-c8Go | 2 | 8 | NA | 8Go + fp16 | 0 | 8 | 290.58 | 0.2% | 4.9% | 5688 | 2335.63 | 0.00 | 29 | w2-m8-c8Go | 2 | 8 | NA | 8Go + + """ + if args is None: + args = arguments() + + runs = [] + for folder in os.listdir(args.runs): + if folder.startswith("prepare"): + continue + + if folder.startswith("install"): + continue + + path = f"{args.runs}/{folder}" + if os.path.isdir(path): + runs.append(path) + + tags = dict() + for tag in args.tags: + name, regex = tag.split("=") + tags[name] = re.compile(regex) + + query = ("batch_size", "elapsed") + data = [] + for run in runs: + reports = _read_reports(run) + summary = make_summary(reports.values(), query=query) + df = make_dataframe(summary, None, None, query=query) + + name = run.split("/")[-1] + df["name"] = name.split(".", maxsplit=1)[0] + for tag, value in extract_tags(name, tags): + df[tag] = value + + data.append(df) + + gathered = pd.concat(data) + print(pandas_to_string(gathered)) + + +if __name__ == "__main__": + gather_cli() diff --git a/milabench/cli/matrix.py b/milabench/cli/matrix.py index 670a16faf..b2e78a510 100644 --- a/milabench/cli/matrix.py +++ b/milabench/cli/matrix.py @@ -1,12 +1,18 @@ -from dataclasses import dataclass import sys +from dataclasses import dataclass import yaml from coleo import Option, tooled -from ..system import build_system_config -from ..common import deduce_arch, build_config, get_base_defaults, merge, is_selected +from ..common import ( + build_config, + deduce_arch, + get_base_defaults, + is_selected, + merge, +) from ..sizer import resolve_argv, scale_argv +from ..system import build_system_config # fmt: off @@ -78,13 +84,14 @@ def cli_matrix_run(args=None): def resolve_args(conf, argv): from ..pack import Package + pack = Package(conf) args = [] for k, v in argv.items(): args.append(k) args.append(v) - + sized_args = scale_argv(pack, args) final_args = resolve_argv(pack, sized_args) @@ -94,7 +101,7 @@ def resolve_args(conf, argv): argv[k] = final_args[i + 1] i += 2 continue - + print(f"Missing resolved argument {k}") return argv diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py index d4fae32d5..9e2ca1d77 100644 --- a/milabench/commands/__init__.py +++ b/milabench/commands/__init__.py @@ -666,7 +666,7 @@ def _argv(self, **_) -> List: resolver = new_argument_resolver(self.pack) - cpu_per_process = resolver(str(self.pack.config['argv']['--cpus_per_gpu'])) + cpu_per_process = resolver(str(self.pack.config["argv"]["--cpus_per_gpu"])) return [ # -- Run the command in the right venv # This could be inside the SSH Command diff --git a/milabench/common.py b/milabench/common.py index b6ef8c1e3..429895ef7 100644 --- a/milabench/common.py +++ b/milabench/common.py @@ -15,7 +15,6 @@ from milabench.alt_async import proceed from milabench.utils import available_layers, blabla, multilogger -from .system import build_system_config from .config import build_config from .fs import XPath from .log import TerminalFormatter @@ -23,6 +22,7 @@ from .multi import MultiPackage from .report import make_report from .summary import aggregate, make_summary +from .system import build_system_config def get_pack(defn): diff --git a/milabench/config.py b/milabench/config.py index d549fc3e5..585dee48f 100644 --- 
a/milabench/config.py +++ b/milabench/config.py @@ -4,14 +4,12 @@ from copy import deepcopy import psutil -from copy import deepcopy import yaml from omegaconf import OmegaConf from .fs import XPath from .merge import merge - config_global = contextvars.ContextVar("config", default=None) @@ -112,7 +110,6 @@ def build_matrix_bench(all_configs): for name, bench_config in all_configs.items(): for k, v in expand_matrix(name, bench_config): - if k in expanded_config: raise ValueError("Bench name is not unique") diff --git a/milabench/log.py b/milabench/log.py index f012f672e..bed8aac3e 100644 --- a/milabench/log.py +++ b/milabench/log.py @@ -145,10 +145,11 @@ def __call__(self, entry): pass elif event == "config": + def _show(k, entry): if k in ("meta", "system"): return - + if isinstance(entry, dict): for k2, v in entry.items(): _show(f"{k}.{k2}", v) @@ -302,9 +303,9 @@ def on_data(self, entry, data, row): load = int(data.get("load", 0) * 100) currm, totalm = data.get("memory", [0, 0]) temp = int(data.get("temperature", 0)) - row[f"gpu:{gpuid}"] = ( - f"{load}% load | {currm:.0f}/{totalm:.0f} MB | {temp}C" - ) + row[ + f"gpu:{gpuid}" + ] = f"{load}% load | {currm:.0f}/{totalm:.0f} MB | {temp}C" row["gpu_load"] = f"{load}%" row["gpu_mem"] = f"{currm:.0f}/{totalm:.0f} MB" row["gpu_temp"] = f"{temp}C" @@ -378,9 +379,9 @@ def on_data(self, entry, data, row): load = int(data.get("load", 0) * 100) currm, totalm = data.get("memory", [0, 0]) temp = int(data.get("temperature", 0)) - row[f"gpu:{gpuid}"] = ( - f"{load}% load | {currm:.0f}/{totalm:.0f} MB | {temp}C" - ) + row[ + f"gpu:{gpuid}" + ] = f"{load}% load | {currm:.0f}/{totalm:.0f} MB | {temp}C" else: task = data.pop("task", "") units = data.pop("units", "") diff --git a/milabench/report.py b/milabench/report.py index 8488482e4..42a701a16 100644 --- a/milabench/report.py +++ b/milabench/report.py @@ -13,7 +13,7 @@ @error_guard({}) -def _make_row(summary, compare, weights): +def _make_row(summary, compare, weights, query=None): mkey = "train_rate" metric = "mean" row = {} @@ -55,6 +55,12 @@ def _make_row(summary, compare, weights): row["weight"] = weights.get("weight", summary.get("weight", 0)) # ---- + if query is not None: + extra = summary.get("extra", dict()) + for q in query: + if v := extra.get(q): + row[q] = v + return row @@ -185,33 +191,50 @@ def _report_pergpu(entries, measure="50"): } -def make_dataframe(summary, compare=None, weights=None): +def make_dataframe(summary, compare=None, weights=None, query=None): if weights is None: weights = dict() all_keys = list( - sorted( - { - *(summary.keys() if summary else []), - *(compare.keys() if compare else []), - *(weights.keys() if weights else []), - } - ) + { + *(summary.keys() if summary else []), + *(compare.keys() if compare else []), + *(weights.keys() if weights else []), + } ) + def sort_by(key): + """Group similar runs together""" + if weights: + return weights[key]["group"] + + if summary: + return summary[key]["group"] + + return key + + # Sort by name first so bench with similar names are together + # we want bench in the same group with similar names to be close + all_keys = sorted(all_keys) + + # Sort by group so bench are grouped together + # we want flops bench to be close together no matter what their names are + all_keys = sorted(all_keys, key=sort_by) + df = DataFrame( { key: _make_row( summary.get(key, {}), compare and compare.get(key, {}), weights and weights.get(key, {}), + query=query, ) for key in all_keys } ).transpose() # Reorder columns - df = df[sorted(df.columns, 
key=lambda k: columns_order.get(k, 0))] + df = df[sorted(df.columns, key=lambda k: columns_order.get(k, 2000))] return df @@ -301,37 +324,6 @@ def _score(column): out.finalize() -def pandas_to_string(df, formatters): - """Default stdout printer does not insert a column sep which makes it hard to retranscribe results elsewhere. - to_csv does not align the output. - """ - from collections import defaultdict - - columns = df.columns.tolist() - - sep = " | " - lines = [] - col_size = defaultdict(int) - - for index, row in df.iterrows(): - line = [f"{index:<30}"] - for col, val in zip(columns, row): - fmt = formatters.get(col) - val = fmt(val) - col_size[col] = max(col_size[col], len(val)) - line.append(val) - - lines.append(sep.join(line)) - - def fmtcol(col): - size = col_size[col] - return f"{col:>{size}}" - - header = sep.join([f"{'bench':<30}"] + [fmtcol(col) for col in columns]) - - return "\n".join([header] + lines) - - _formatters = { "fail": "{:4.0f}".format, "n": "{:3.0f}".format, @@ -347,8 +339,10 @@ def fmtcol(col): "sem%": "{:6.1%}".format, "iqr%": "{:6.1%}".format, "score": "{:10.2f}".format, - "weight": "{:5.2f}".format, + "weight": "{:6.2f}".format, "peak_memory": "{:11.0f}".format, + "elapsed": "{:5.0f}".format, + "batch_size": "{:3.0f}".format, 0: "{:.0%}".format, 1: "{:.0%}".format, 2: "{:.0%}".format, @@ -368,6 +362,42 @@ def fmtcol(col): } +def pandas_to_string(df, formatters=_formatters): + """Default stdout printer does not insert a column sep which makes it hard to retranscribe results elsewhere. + to_csv does not align the output. + """ + from collections import defaultdict + + columns = df.columns.tolist() + + sep = " | " + lines = [] + col_size = defaultdict(int) + + for index, row in df.iterrows(): + line = [f"{index:<30}"] + for col, val in zip(columns, row): + fmt = formatters.get(col) + + if fmt is not None: + val = fmt(val) + col_size[col] = max(col_size[col], len(val)) + else: + val = str(val) + + line.append(val) + + lines.append(sep.join(line)) + + def fmtcol(col): + size = col_size[col] + return f"{col:>{size}}" + + header = sep.join([f"{'bench':<30}"] + [fmtcol(col) for col in columns]) + + return "\n".join([header] + lines) + + _table_style = H.style( """ body { diff --git a/milabench/sizer.py b/milabench/sizer.py index 341fbc75f..55ddcdb0d 100644 --- a/milabench/sizer.py +++ b/milabench/sizer.py @@ -2,14 +2,13 @@ import multiprocessing import os from copy import deepcopy -import multiprocessing import numpy as np import yaml from voir.instruments.gpu import get_gpu_info from .merge import merge -from .system import system_global, SizerOptions, CPUOptions +from .system import CPUOptions, SizerOptions, system_global from .validation.validation import ValidationLayer ROOT = os.path.dirname(__file__) @@ -70,7 +69,7 @@ def benchscaling(self, benchmark): # benchmark config if isinstance(benchmark, dict) and "name" in benchmark: - return benchmark + return self.scaling_config.get(benchmark["name"]) # pack return self.scaling_config.get(benchmark.config["name"]) @@ -143,6 +142,27 @@ def size(self, benchmark, capacity): return None + def find_batch_size(self, benchmark, event): + config = self.benchscaling(benchmark) + + if config is None: + return None + + argname = config.get("arg") + if argname is None: + return -1 + + if "event" in event: + event = event["data"] + + argv = event["command"] + + for i, arg in enumerate(argv): + if str(arg).endswith(argname): + return int(argv[i + 1]) + + return -1 + def argv(self, benchmark, capacity, argv): """Find the batch 
size and override it with a new value""" @@ -183,6 +203,11 @@ def batch_sizer() -> Sizer: return sizer +def get_batch_size(config, start_event): + sizer = batch_sizer() + return sizer.find_batch_size(config, start_event) + + def scale_argv(pack, argv): sizer = batch_sizer() @@ -276,8 +301,8 @@ def on_end(self, entry): def report(self, *args): if self.filepath is not None: newdata = self.memory - - if os.path.exists(self.filepath): + + if os.path.exists(self.filepath): with open(self.filepath, "r") as fp: previous_data = yaml.safe_load(fp) newdata = merge(previous_data, self.memory) @@ -290,7 +315,7 @@ def new_argument_resolver(pack): context = deepcopy(system_global.get()) arch = context.get("arch", "cpu") - if hasattr(pack, 'config'): + if hasattr(pack, "config"): device_count = len(pack.config.get("devices", [0])) else: device_count = len(get_gpu_info()["gpus"]) @@ -301,6 +326,7 @@ def new_argument_resolver(pack): device_count = 1 options = CPUOptions() + def auto(value, default): if options.enabled: return value diff --git a/milabench/summary.py b/milabench/summary.py index 0bc4a8d4e..946f6e6ef 100644 --- a/milabench/summary.py +++ b/milabench/summary.py @@ -135,8 +135,28 @@ def _metrics(xs): return metrics +@error_guard(dict()) +def augment(group, query=tuple([])): + """Optional augmentation steps that will add additional data. + Usually extracted from the run itself + """ + data = {} + + if "batch_size" in query: + from .sizer import get_batch_size + + data["batch_size"] = get_batch_size(group["config"], group["start"]) + + if "elapsed" in query: + start_time = group["start"]["time"] + end_time = group["end"]["time"] + data["elapsed"] = end_time - start_time + + return data + + @error_guard(None) -def _summarize(group): +def _summarize(group, query): agg = group["data"] gpudata = defaultdict(lambda: defaultdict(list)) @@ -152,8 +172,12 @@ def _summarize(group): per_gpu[device].append(tr) config = group["config"] + + additional = augment(group, query) + return { "name": config["name"], + "group": config["group"], "n": len(agg["success"]), "successes": sum(agg["success"]), "failures": sum(not x for x in agg["success"]), @@ -170,12 +194,13 @@ def _summarize(group): for device, data in gpudata.items() }, "weight": config.get("weight", 0), + "extra": additional, } -def make_summary(runs): +def make_summary(runs, query=None): aggs = [agg for run in runs if (agg := aggregate(run))] classified = _classify(aggs) merged = {name: _merge(runs) for name, runs in classified.items()} - summarized = {name: _summarize(agg) for name, agg in merged.items()} + summarized = {name: _summarize(agg, query) for name, agg in merged.items()} return summarized diff --git a/milabench/system.py b/milabench/system.py index e9fe8226c..c470bfbe6 100644 --- a/milabench/system.py +++ b/milabench/system.py @@ -1,7 +1,7 @@ import contextvars -from dataclasses import dataclass, field import os import socket +from dataclasses import dataclass, field import psutil import yaml @@ -27,20 +27,23 @@ def getenv(name, expected_type): def print_once(*args, **kwargs): printed = 0 + def _print(): nonlocal printed if printed == 0: print(*args, **kwargs) printed += 1 + return _print + warn_no_config = print_once("No system config found, using defaults") def option(name, etype, default=None): options = dict() system = system_global.get() - + if system: options = system.get("options", dict()) else: @@ -59,7 +62,7 @@ def option(name, etype, default=None): if final_value is None: return None - + try: value = etype(final_value) 
lookup[frags[-1]] = value @@ -75,6 +78,7 @@ def is_autoscale_enabled(): def default_save_location(): from pathlib import Path + return Path.home() / "new_scaling.yaml" @@ -97,13 +101,13 @@ class CPUOptions: cpu_max: int = option("cpu.max", int, 16) # min number of CPU per GPU - cpu_min: int = option("cpu.min", int, 2) + cpu_min: int = option("cpu.min", int, 2) # reserved CPU cores (i.e not available for the benchmark) - reserved_cores: int = option("cpu.reserved_cores", int, 0) + reserved_cores: int = option("cpu.reserved_cores", int, 0) # Number of workers (ignores cpu_max and cpu_min) - n_workers: int = option("cpu.n_workers", int) + n_workers: int = option("cpu.n_workers", int) @dataclass @@ -130,8 +134,8 @@ class Nodes: class SystemConfig: arch: str = getenv("MILABENCH_GPU_ARCH", str) sshkey: str = None - docker_image: str = None - nodes : list[Nodes] = field(default_factory=list) + docker_image: str = None + nodes: list[Nodes] = field(default_factory=list) gpu: GPUConfig = None options: Options = None
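
Editor's addendum (not part of the patch): a minimal, self-contained sketch of how the default tag patterns added in milabench/cli/gather.py turn a run-folder name such as "w16-m8-c4Go" into the extra columns shown in the gather_cli docstring. The helper names mirror the ones introduced by the patch; the "not found" print from extract_tags is omitted for brevity, and the expected output is the one implied by the docstring example table.

import re

def default_tags():
    # Same "column=regex" pairs as the patch; group(1) carries the value.
    return [
        "worker=w([a-z0-9]*)",
        "multiple=m([0-9]*)",
        "power=p([0-9]*)",
        "capacity=c([A-Za-z0-9]*(Go)?)",
    ]

def extract_tags(name, tags):
    # tags: column name -> compiled pattern; unmatched tags fall back to "NA".
    for tag, pat in tags.items():
        if m := pat.search(name):
            yield tag, m.group(1)
        else:
            yield tag, "NA"

# Build the column -> compiled-regex mapping the same way gather_cli does.
tags = {}
for spec in default_tags():
    column, regex = spec.split("=")
    tags[column] = re.compile(regex)

print(dict(extract_tags("w16-m8-c4Go", tags)))
# {'worker': '16', 'multiple': '8', 'power': 'NA', 'capacity': '4Go'}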