diff --git a/benchmate/benchmate/datagen.py b/benchmate/benchmate/datagen.py
index d3eacecf3..6cfd74225 100644
--- a/benchmate/benchmate/datagen.py
+++ b/benchmate/benchmate/datagen.py
@@ -1,28 +1,23 @@
 #!/usr/bin/env python
 import argparse
-from collections import defaultdict
+import json
 import multiprocessing
 import os
+from collections import defaultdict
 from pathlib import Path
-import json
-import warnings
-
-warnings.filterwarnings('ignore')
 
 import torch
-
 from tqdm import tqdm
 
 
 def write(args):
-    import torch
     import torchvision.transforms as transforms
 
-    offset, outdir, size = args
+    offset, outdir, size = args
 
     img = torch.randn(*size)
-    target = offset % 1000 # torch.randint(0, 1000, size=(1,), dtype=torch.long)[0]
+    target = offset % 1000  # torch.randint(0, 1000, size=(1,), dtype=torch.long)[0]
 
     img = transforms.ToPILImage()(img)
     class_val = int(target)
@@ -35,14 +30,16 @@ def write(args):
     img.save(image_path)
 
 
-def generate(image_size, n, outdir, start = 0):
+def generate(image_size, n, outdir, start=0):
     work_items = []
     for i in range(n):
-        work_items.append([
-            start + i,
-            outdir,
-            image_size,
-        ])
+        work_items.append(
+            [
+                start + i,
+                outdir,
+                image_size,
+            ]
+        )
 
     n_worker = min(multiprocessing.cpu_count(), 8)
     with multiprocessing.Pool(n_worker) as pool:
@@ -53,7 +50,7 @@ def generate(image_size, n, outdir, start = 0):
 def count_images(path):
     count = defaultdict(int)
     for root, _, files in tqdm(os.walk(path)):
-        split = root.split('/')[-2]
+        split = root.split("/")[-2]
         count[split] += len(files)
     return count
 
@@ -71,7 +68,12 @@ def generate_sets(root, sets, shape):
 
         if current_count < count:
             print(f"Generating {split} (current {current_count}) (target: {count})")
-            generate(shape, count - current_count, os.path.join(root, split), start=current_count)
+            generate(
+                shape,
+                count - current_count,
+                os.path.join(root, split),
+                start=current_count,
+            )
 
     with open(sentinel, "w") as fp:
         json.dump(sets, fp)
@@ -92,9 +94,9 @@ def generate_fakeimagenet():
     total_images = args.batch_size * args.batch_count
     size_spec = {
-        "train": total_images,
-        "val": int(total_images * args.val),
-        "test": int(total_images * args.test)
+        "train": total_images,
+        "val": int(total_images * args.val),
+        "test": int(total_images * args.test),
     }
 
     generate_sets(dest, size_spec, args.image_size)
@@ -102,4 +104,4 @@
 
 
 if __name__ == "__main__":
-    generate_fakeimagenet()
\ No newline at end of file
+    generate_fakeimagenet()
diff --git a/benchmate/benchmate/dataloader.py b/benchmate/benchmate/dataloader.py
index 869834cc6..0e3caf84c 100644
--- a/benchmate/benchmate/dataloader.py
+++ b/benchmate/benchmate/dataloader.py
@@ -3,9 +3,9 @@
 
 import torch
 import torch.cuda.amp
+import torchcompat.core as accelerator
 import torchvision.datasets as datasets
 import torchvision.transforms as transforms
-import torchcompat.core as accelerator
 from torch.utils.data.distributed import DistributedSampler
 
 
@@ -23,14 +23,14 @@ def generate_tensors(batch_size, shapes, device):
     tensors = []
     if len(shapes[0]) == 2:
         tensors = dict()
-
+
     for kshape in shapes:
         if len(kshape) == 2:
             key, shape = kshape
             tensors[key] = torch.randn((batch_size, *shape), device=device)
         else:
-            tensors.append(torch.randn((batch_size, *kshape), device=device))
-
+            tensors.append(torch.randn((batch_size, *kshape), device=device))
+
     return tensors
 
 
@@ -70,7 +70,7 @@ def __iter__(self):
         if self.fixed_batch:
             for _ in range(self.n):
                 yield self.tensors
-
+
         else:
             for _ in range(self.n):
                 yield [torch.rand_like(t) for t in self.tensors]
@@ -80,25 +80,25 @@ def __len__(self):
 
 
 def dali(folder, batch_size, num_workers, rank=0, world_size=1):
-    from nvidia.dali.pipeline import pipeline_def
-    import nvidia.dali.types as types
     import nvidia.dali.fn as fn
+    import nvidia.dali.types as types
+    from nvidia.dali.pipeline import pipeline_def
     from nvidia.dali.plugin.pytorch import DALIGenericIterator
 
     @pipeline_def(num_threads=num_workers, device_id=0)
     def get_dali_pipeline():
         images, labels = fn.readers.file(
-            file_root=folder,
-            random_shuffle=True,
+            file_root=folder,
+            random_shuffle=True,
             name="Reader",
             shard_id=rank,
             num_shards=world_size,
         )
-
+
         # decode data on the GPU
         images = fn.decoders.image_random_crop(
-            images,
-            device="mixed",
+            images,
+            device="mixed",
             output_type=types.RGB,
         )
         # the rest of processing happens on the GPU as well
@@ -109,14 +109,14 @@ def get_dali_pipeline():
             crop_w=224,
             mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
             std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
-            mirror=fn.random.coin_flip()
+            mirror=fn.random.coin_flip(),
         )
         return images, labels
 
     train_data = DALIGenericIterator(
         [get_dali_pipeline(batch_size=batch_size)],
-        ['data', 'label'],
-        reader_name='Reader'
+        ["data", "label"],
+        reader_name="Reader",
     )
 
     class Adapter:
@@ -130,10 +130,10 @@ def set_epoch(epoch):
 
         def __len__(self):
             return len(self.iter)
-
+
         def __iter__(self):
             for data in self.iter:
-                x, y = data[0]['data'], data[0]['label']
+                x, y = data[0]["data"], data[0]["label"]
                 yield x, torch.squeeze(y, dim=1).type(torch.LongTensor)
 
     return Adapter(train_data)
@@ -141,7 +141,7 @@ def __iter__(self):
 
 def pytorch_fakedataset(folder, batch_size, num_workers):
     train = FakeImageClassification((3, 224, 224), batch_size, 60)
-
+
     return torch.utils.data.DataLoader(
         train,
         batch_size=batch_size,
@@ -152,7 +152,9 @@
 def image_transforms():
-    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
+    normalize = transforms.Normalize(
+        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
+    )
     data_transforms = transforms.Compose(
         [
             transforms.RandomResizedCrop(224),
@@ -163,11 +165,9 @@ def image_transforms():
     )
     return data_transforms
 
-def pytorch(folder, batch_size, num_workers, distributed=False):
-    train = datasets.ImageFolder(
-        folder,
-        image_transforms()
-    )
+
+def pytorch(folder, batch_size, num_workers, distributed=False, epochs=60):
+    train = datasets.ImageFolder(folder, image_transforms())
 
     kwargs = {"shuffle": True}
     if distributed:
@@ -179,9 +179,7 @@ def pytorch(folder, batch_size, num_workers, distributed=False):
     # we reduce the standard deviation
     if False:
         kwargs["sampler"] = torch.utils.data.RandomSampler(
-            train,
-            replacement=True,
-            num_samples=len(train) * args.epochs
+            train, replacement=True, num_samples=len(train) * epochs
         )
         kwargs["shuffle"] = False
 
@@ -197,15 +195,13 @@ def pytorch(folder, batch_size, num_workers, distributed=False):
 def synthetic(model, batch_size, fixed_batch):
     return SyntheticData(
         tensors=generate_tensor_classification(
-            model,
-            batch_size,
-            (3, 244, 244),
-            device=accelerator.fetch_device(0)
+            model, batch_size, (3, 244, 244), device=accelerator.fetch_device(0)
         ),
         n=1000,
         fixed_batch=fixed_batch,
     )
 
+
 def synthetic_fixed(*args):
     return synthetic(*args, fixed_batch=True)
 
@@ -216,20 +212,28 @@ def synthetic_random(*args):
 
 def dataloader_arguments(parser: argparse.ArgumentParser):
     parser.add_argument(
-        "--batch-size", type=int, default=16,
+        "--batch-size",
+        type=int,
+        default=16,
         help="input batch size for training (default: 16)",
16)", ) parser.add_argument( - "--loader", type=str, help="Dataloader implementation (dali, pytorch, synthetic_fixed, synthetic_random)", - default="pytorch" + "--loader", + type=str, + help="Dataloader implementation (dali, pytorch, synthetic_fixed, synthetic_random)", + default="pytorch", ) parser.add_argument( - "--num-workers", type=int, default=8, + "--num-workers", + type=int, + default=8, help="number of workers for data loading", ) - parser.add_argument( - "--data", type=str, default=os.environ.get("MILABENCH_DIR_DATA", None), - help="data directory" + parser.add_argument( + "--data", + type=str, + default=os.environ.get("MILABENCH_DIR_DATA", None), + help="data directory", ) @@ -243,24 +247,14 @@ def data_folder(args): def imagenet_dataloader(args, model, rank=0, world_size=1): if args.loader == "synthetic_random": - return synthetic( - model=model, - batch_size=args.batch_size, - fixed_batch=False - ) - + return synthetic(model=model, batch_size=args.batch_size, fixed_batch=False) + if args.loader == "synthetic_fixed": - return synthetic( - model=model, - batch_size=args.batch_size, - fixed_batch=True - ) - + return synthetic(model=model, batch_size=args.batch_size, fixed_batch=True) + if args.loader == "pytorch_fakedataset": return pytorch_fakedataset( - None, - batch_size=args.batch_size, - num_workers=args.num_workers + None, batch_size=args.batch_size, num_workers=args.num_workers ) folder = os.path.join(data_folder(args), "train") diff --git a/benchmate/benchmate/dataset.py b/benchmate/benchmate/dataset.py index 89ac90e4c..5983e7b99 100644 --- a/benchmate/benchmate/dataset.py +++ b/benchmate/benchmate/dataset.py @@ -1,4 +1,3 @@ - import os from collections import defaultdict @@ -11,6 +10,7 @@ def transform_images(transform_x, transform_y=no_transform): def _(args): print(args) return transform_x(args[0]), transform_y(args[1]) + return _ @@ -18,6 +18,7 @@ def transform_celebA(transform_x): def _(args): print(args) return transform_x(args["image"]) + return _ @@ -33,7 +34,6 @@ def __getitem__(self, item): return self.transforms(self.dataset[item]) - class ImageNetAsFrames: def __init__(self, folder) -> None: self.clip = defaultdict(list) @@ -42,9 +42,9 @@ def __init__(self, folder) -> None: video = self.clip[clip_id] for frame in files: video.append(frame) - + def __getitem__(self, item): return self.clip[item] - + def __len__(self): return len(self.clip) diff --git a/benchmate/benchmate/metrics.py b/benchmate/benchmate/metrics.py index 975443e23..b6ca483c7 100644 --- a/benchmate/benchmate/metrics.py +++ b/benchmate/benchmate/metrics.py @@ -58,14 +58,14 @@ def append(self, *args, **kwargs): def record(self, *args, **kwargs): """Record data for a future metric. - + No synchronization here. """ self.append(*args, **kwargs) def materialize(self, *args, **kwargs): """Transform raw data into a metric. - + Synchronization happens here. 
""" return *args, kwargs @@ -93,7 +93,7 @@ def __init__(self): self._end = None def start(self): - self._start = - time.time() + self._start = -time.time() def end(self): self._end = time.time() @@ -101,7 +101,7 @@ def end(self): def __enter__(self): self.start() return self - + def __exit__(self, *args): self.end() @@ -123,7 +123,7 @@ def end(self): def __enter__(self): self.start() return self - + def __exit__(self, *args): self.end() @@ -205,20 +205,20 @@ def __init__( raise_stop_program=False, batch_size_fn=None, ): - self.loader = loader # original iterator - self.events = [] # accumulated events to be pushed + self.loader = loader # original iterator + self.events = [] # accumulated events to be pushed - self.task = "train" # voir task usually train but could be validation/test - self.total_obs = 0 # Number of "pushed" observations - self.event_fn = event_fn # function to create a device event - self.early_stop = earlystop # Number of observation to target - self.unit = 1000 # device timer is ms - - self.message_push = push # How to push the metrics usually voir or stdout + self.task = "train" # voir task usually train but could be validation/test + self.total_obs = 0 # Number of "pushed" observations + self.event_fn = event_fn # function to create a device event + self.early_stop = earlystop # Number of observation to target + self.unit = 1000 # device timer is ms + + self.message_push = push # How to push the metrics usually voir or stdout # Number of times we broke out of the iterator for early stopping # we should really only do this onece - self.break_count = 0 + self.break_count = 0 self.batch_size_fn = batch_size_fn # Multi-GPU setup @@ -227,13 +227,15 @@ def __init__( self.world_size = 1 # Options - self.raise_stop_program = raise_stop_program # Does TimedIterator raise StopProgram + self.raise_stop_program = ( + raise_stop_program # Does TimedIterator raise StopProgram + ) self.profile_instrumentation = False self.overhead = [] self.previous_overhead = 0 self.loader_init_time = [] self.sub_overhead = 0 - + if not TORCH_ERROR and dist.is_initialized(): self.rank = rank assert ( @@ -261,7 +263,7 @@ def wrapped(self, iterator): start = self.event_fn(enable_timing=True) start.record() self.previous_overhead = 0 - + for data in iterator: yield data @@ -281,15 +283,15 @@ def wrapped(self, iterator): break start = end - + # Note: first step does not have overhead because end event is recorded # before the overhead starts # Note: It is not sure if the CPU overhead impacst the device at all # since we avoid sync it is possible the device is working during - # the overhead section and that the effective overhead ends up being minimal + # the overhead section and that the effective overhead ends up being minimal self.previous_overhead = ct.elapsed() self.overhead.append(self.previous_overhead) - + self._push() self.earlystop() @@ -335,7 +337,7 @@ def _push_time_steps(self): elapsed = (start.elapsed_time(end)) / self.unit rate = self.batch_size(bs) / elapsed self.log_rate(rate) - + self.total_obs += len(self.events) self.events = [] @@ -349,7 +351,7 @@ def _push_profile_metrics(self): self.previous_overhead = 0 self.overhead = [] self.loader_init_time = [] - + def _push(self): """Push all the accumulated metrics""" @@ -381,5 +383,3 @@ def log_progress(self): def message(self, **kwargs): if self.rank is None or self.rank == 0: self.message_push(**kwargs) - - diff --git a/benchmate/benchmate/monitor.py b/benchmate/benchmate/monitor.py index bb8e0a437..0d9e3d8b7 100644 --- 
+++ b/benchmate/benchmate/monitor.py
@@ -1,17 +1,16 @@
-import sys
 import json
-import time
-import sys
 import multiprocessing
+import sys
+import time
 
-
-from voir.smuggle import SmuggleWriter
 from voir.instruments.gpu import get_gpu_info
 from voir.instruments.utils import Monitor
+from voir.smuggle import SmuggleWriter
 
 
 def milabench_sys_monitor():
     data_file = SmuggleWriter(sys.stdout)
+
     def mblog(data):
         if data_file is not None:
             print(json.dumps(data), file=data_file)
@@ -32,8 +31,6 @@ def monitor_fn():
 
     monitor.start()
 
-
-
 def _worker(state, queue, func, delay):
     while state["running"]:
         queue.put(func())
@@ -47,7 +44,8 @@ def __init__(self, delay, func):
         self.state["running"] = True
         self.results = multiprocessing.Queue()
         self.process = multiprocessing.Process(
-            target=_worker, args=(self.state, self.results, func, delay),
+            target=_worker,
+            args=(self.state, self.results, func, delay),
         )
 
     def start(self):
@@ -57,6 +55,7 @@ def stop(self):
         self.state["running"] = False
         self.process.join()
 
+
 def setupvoir():
     # set up a metrics smuggler on stdout and a GPU monitor
     data_file = SmuggleWriter(sys.stdout)
@@ -65,7 +64,10 @@ def monitor_fn():
         data = {
             gpu["device"]: {
-                "memory": [gpu["memory"]["used"], gpu["memory"]["total"],],
+                "memory": [
+                    gpu["memory"]["used"],
+                    gpu["memory"]["total"],
+                ],
                 "load": gpu["utilization"]["compute"],
                 "temperature": gpu["temperature"],
                 "power": gpu["power"],
diff --git a/benchmate/benchmate/observer.py b/benchmate/benchmate/observer.py
index e00b1fec4..5ead66a5b 100644
--- a/benchmate/benchmate/observer.py
+++ b/benchmate/benchmate/observer.py
@@ -1,6 +1,6 @@
 from voir.helpers import current_overseer
 
-from .metrics import LazyLossPusher, give_push, sumggle_push, TimedIterator
+from .metrics import LazyLossPusher, TimedIterator, give_push, sumggle_push
 
 
 class BenchObserver:
@@ -33,7 +33,7 @@ def __init__(
         self.stdout = stdout
         self.task = "train"
         self.losses = LazyLossPusher(self.task)
-
+
         self.pusher = give_push()
         if self.stdout:
             self.pusher = sumggle_push()
@@ -41,26 +41,29 @@ def on_iterator_stop_iterator(self):
         """Called when the timed iterator stops, used to do extra work when timers are off"""
         self.losses.push(self.pusher)
-
+
     def record_loss(self, loss):
         self.losses.record(loss)
         return loss
-
+
     def override_return_value(self, function, override):
         import ptera
+
         refstring = ptera.refstring(function)
-
+
         ov = current_overseer.get()
         if ov is not None:
             probe = ov.probe(f"{refstring}() as retval", overridable=True)
-            probe['retval'].override(override)
+            probe["retval"].override(override)
         else:
             raise RuntimeError("Not running through voir")
-
+
     def loader(self, loader):
         """Wrap a dataloader or an iterable, which enables accurate measurement of time spent in the loop's body"""
-        self.wrapped = TimedIterator(loader, *self.args, push=self.pusher, **self.kwargs)
+        self.wrapped = TimedIterator(
+            loader, *self.args, push=self.pusher, **self.kwargs
+        )
         self.wrapped.task = self.task
         self.wrapped.on_iterator_stop_iterator = self.on_iterator_stop_iterator
         return self.wrapped
diff --git a/milabench/cli/__init__.py b/milabench/cli/__init__.py
index be7341e01..b6cec01d9 100644
--- a/milabench/cli/__init__.py
+++ b/milabench/cli/__init__.py
@@ -7,6 +7,7 @@
 from .dev import cli_dev
 from .install import cli_install
 from .machine import cli_machine
+from .matrix import cli_matrix_run
 from .pin import cli_pin
 from .pip import cli_pip
 from .pr import cli_write_report_to_pr
@@ -18,7 +19,6 @@
 from .slurm import cli_slurm_system
 from .sql import cli_sqlsetup
 from .summary import cli_summary
-from .matrix import cli_matrix_run
 
 
 class Main:
diff --git a/milabench/cli/compare.py b/milabench/cli/compare.py
index 0edaf3d72..59da8e255 100644
--- a/milabench/cli/compare.py
+++ b/milabench/cli/compare.py
@@ -3,7 +3,7 @@
 
 from coleo import Option, tooled
 
-from ..common import Option, _read_reports
+from ..common import _read_reports
 from ..compare import compare, fetch_runs
 from ..summary import make_summary
diff --git a/milabench/cli/dev.py b/milabench/cli/dev.py
index ef493cd80..8be86cbff 100644
--- a/milabench/cli/dev.py
+++ b/milabench/cli/dev.py
@@ -5,7 +5,7 @@
 
 from coleo import Option, tooled
 
-from ..common import Option, get_multipack, selection_keys
+from ..common import get_multipack, selection_keys
 
 
 # fmt: off
diff --git a/milabench/cli/install.py b/milabench/cli/install.py
index 03eb429e5..00977aea3 100644
--- a/milabench/cli/install.py
+++ b/milabench/cli/install.py
@@ -4,7 +4,7 @@
 
 from milabench.utils import validation_layers
 
-from ..common import Option, get_multipack, run_with_loggers
+from ..common import get_multipack, run_with_loggers
 from ..log import DataReporter, TerminalFormatter, TextReporter
diff --git a/milabench/cli/matrix.py b/milabench/cli/matrix.py
index 9248aeb1b..732db2b2b 100644
--- a/milabench/cli/matrix.py
+++ b/milabench/cli/matrix.py
@@ -1,10 +1,15 @@
 from dataclasses import dataclass
 
 from coleo import Option, tooled
-import yaml
-import sys
 
-from ..common import deduce_arch, build_config, build_system_config, get_base_defaults, merge, is_selected
+from ..common import (
+    build_config,
+    build_system_config,
+    deduce_arch,
+    get_base_defaults,
+    is_selected,
+    merge,
+)
 
 
 # fmt: off
@@ -33,19 +38,18 @@ def arguments():
     return Arguments(base, system, config, select, exclude)
 
 
-
 def clean_config(config, args):
     disabled_benches = []
 
     if args.select:
         args.select = set(args.select.split(","))
 
-    if args.exclude:
+    if args.exclude:
         args.exclude = set(args.exclude.split(","))
 
     for benchname, benchconfig in config.items():
-        if 'system' in benchconfig:
-            del benchconfig['system']
+        if "system" in benchconfig:
+            del benchconfig["system"]
 
         if not is_selected(benchconfig, args):
             disabled_benches.append(benchname)
@@ -63,16 +67,10 @@ def cli_matrix_run(args=None):
 
     arch = deduce_arch()
 
-    base_defaults = get_base_defaults(
-        base=args.base,
-        arch=arch,
-        run_name='matrix'
-    )
+    base_defaults = get_base_defaults(base=args.base, arch=arch, run_name="matrix")
 
     system_config = build_system_config(
-        args.system,
-        defaults={"system": base_defaults["_defaults"]["system"]},
-        gpu=True
+        args.system, defaults={"system": base_defaults["_defaults"]["system"]}, gpu=True
     )
 
     overrides = merge({"*": system_config}, overrides)
@@ -81,8 +79,7 @@
 
     clean_config(config, args)
 
-
     for k in config: print(k)
-    # yaml.dump(config, sys.stdout)
\ No newline at end of file
+    # yaml.dump(config, sys.stdout)
diff --git a/milabench/cli/pin.py b/milabench/cli/pin.py
index c38b2514f..de4264f38 100644
--- a/milabench/cli/pin.py
+++ b/milabench/cli/pin.py
@@ -5,7 +5,7 @@
 
 from coleo import Option, tooled
 
-from ..common import Option, get_multipack, run_with_loggers
+from ..common import get_multipack, run_with_loggers
 from ..log import TerminalFormatter, TextReporter
diff --git a/milabench/cli/pip.py b/milabench/cli/pip.py
index 923fb5763..c5ba0c1d9 100644
--- a/milabench/cli/pip.py
+++ b/milabench/cli/pip.py
@@ -2,7 +2,7 @@
 
 from coleo import Option, tooled
 
-from ..common import Option, get_multipack, run_sync
+from ..common import get_multipack, run_sync
 
 
 # fmt: off
diff --git a/milabench/cli/pr.py b/milabench/cli/pr.py
index 5856a1b80..4a8403dcc 100644
--- a/milabench/cli/pr.py
+++ b/milabench/cli/pr.py
@@ -3,7 +3,7 @@
 
 from coleo import Option, tooled
 
-from ..common import Option, _short_make_report
+from ..common import _short_make_report
 from ..schedule import post_comment_on_pr
diff --git a/milabench/cli/prepare.py b/milabench/cli/prepare.py
index 0557ea368..8f1ad4fc7 100644
--- a/milabench/cli/prepare.py
+++ b/milabench/cli/prepare.py
@@ -4,7 +4,7 @@
 
 from milabench.utils import validation_layers
 
-from ..common import Option, get_multipack, run_with_loggers
+from ..common import get_multipack, run_with_loggers
 from ..log import DataReporter, TerminalFormatter, TextReporter
diff --git a/milabench/cli/report.py b/milabench/cli/report.py
index cbad44223..cd1a1ca12 100644
--- a/milabench/cli/report.py
+++ b/milabench/cli/report.py
@@ -4,7 +4,7 @@
 
 from coleo import Option, config as configuration, tooled
 
-from ..common import Option, _error_report, _get_multipack, _read_reports
+from ..common import _error_report, _get_multipack, _read_reports
 from ..report import make_report
 from ..summary import make_summary
diff --git a/milabench/cli/run.py b/milabench/cli/run.py
index 6d42a11f7..96e81eeb8 100644
--- a/milabench/cli/run.py
+++ b/milabench/cli/run.py
@@ -6,7 +6,6 @@
 from milabench.utils import validation_layers
 
 from ..common import (
-    Option,
     _error_report,
     _read_reports,
     get_multipack,
diff --git a/milabench/cli/summary.py b/milabench/cli/summary.py
index 22823c1b2..3a5b9a83c 100644
--- a/milabench/cli/summary.py
+++ b/milabench/cli/summary.py
@@ -3,7 +3,7 @@
 
 from coleo import Option, tooled
 
-from ..common import Option, _read_reports
+from ..common import _read_reports
 from ..summary import make_summary
diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py
index 45b481a66..6d018b4c8 100644
--- a/milabench/commands/__init__.py
+++ b/milabench/commands/__init__.py
@@ -451,8 +451,10 @@ def __init__(self, executor: SingleCmdCommand, *torchrun_argv, **kwargs) -> None
 
         # Some vendors force us to have weird venv that can resolve weirdly
         # use absolute paths to avoid issues
-        binfolder = executor.pack.config['dirs']['venv']
-        super().__init__(executor, f"{binfolder}/bin/torchrun", *torchrun_argv, **kwargs)
+        binfolder = executor.pack.config["dirs"]["venv"]
+        super().__init__(
+            executor, f"{binfolder}/bin/torchrun", *torchrun_argv, **kwargs
+        )
 
     def _argv(self, **kwargs):
         devices = self.pack.config.get("devices", [])
@@ -480,8 +482,10 @@ class VoirCommand(WrapperCommand):
 
     def __init__(self, executor: SingleCmdCommand, *voir_argv, **kwargs) -> None:
         # Some vendors force us to have weird venv that can resolve weirdly
         # use absolute paths to avoid issues
-        binfolder = executor.pack.config['dirs']['venv']
-        super().__init__(executor, f"{binfolder}/bin/voir", **{"setsid": True, **kwargs})
+        binfolder = executor.pack.config["dirs"]["venv"]
+        super().__init__(
+            executor, f"{binfolder}/bin/voir", **{"setsid": True, **kwargs}
+        )
         self.voir_argv = voir_argv
 
     def _argv(self, **kwargs) -> List:
@@ -650,7 +654,6 @@ def _argv(self, **_) -> List:
                 "--use_deepspeed",
                 "--deepspeed_multinode_launcher=standard",
                 "--zero_stage=2",
-
             ]
             if self.pack.config["use_deepspeed"]
             else ["--multi_gpu"]
diff --git a/milabench/common.py b/milabench/common.py
index 177e2d2bd..4babb8bbc 100644
--- a/milabench/common.py
+++ b/milabench/common.py
@@ -347,11 +347,11 @@ def validation_names(layers):
         return all_layers
 
     results = set(["error", "ensure_rate", "version"])
"ensure_rate", "version"]) - for l in layers: - if l in all_layers: - results.add(l) - elif l != "": - print(f"Layer {l} does not exist") + for layer in layers: + if layer in all_layers: + results.add(layer) + elif layer != "": + print(f"Layer {layer} does not exist") return results diff --git a/milabench/config.py b/milabench/config.py index b0cfec784..e082ead77 100644 --- a/milabench/config.py +++ b/milabench/config.py @@ -1,7 +1,7 @@ import contextvars -from copy import deepcopy import os import socket +from copy import deepcopy import psutil import yaml @@ -74,7 +74,7 @@ def finalize_config(name, bench_config): def combine_args(args, kwargs): if len(args) == 0: - yield kwargs + yield kwargs else: key, values = args.popitem() for value in values: @@ -83,25 +83,25 @@ def combine_args(args, kwargs): def expand_matrix(name, bench_config): - if 'matrix' not in bench_config: + if "matrix" not in bench_config: return [(name, bench_config)] - arguments = deepcopy(bench_config['matrix']) - template = bench_config['job'] - + arguments = deepcopy(bench_config["matrix"]) + template = bench_config["job"] + newbenches = [] for matrix_args in combine_args(arguments, dict()): newbench = deepcopy(template) - name = newbench.pop('name').format(**matrix_args) + name = newbench.pop("name").format(**matrix_args) - for karg, varg in template['argv'].items(): + for karg, varg in template["argv"].items(): try: varg = varg.format(**matrix_args) except: pass - newbench['argv'][karg] = varg - + newbench["argv"][karg] = varg + newbenches.append((name, newbench)) return newbenches @@ -127,13 +127,13 @@ def build_config(*config_files): all_configs = merge(all_configs, layer) all_configs = build_matrix_bench(all_configs) - + for name, bench_config in all_configs.items(): all_configs[name] = resolve_inheritance(bench_config, all_configs) - + for name, bench_config in all_configs.items(): all_configs[name] = finalize_config(name, bench_config) - + config_global.set(all_configs) return all_configs @@ -241,6 +241,7 @@ def get_gpu_capacity(strict=False): raise return 0 + def is_autoscale_enabled(): return ( os.getenv("MILABENCH_SIZER_AUTO", False) diff --git a/milabench/multi.py b/milabench/multi.py index 4ea76c701..292605289 100644 --- a/milabench/multi.py +++ b/milabench/multi.py @@ -2,7 +2,6 @@ import traceback from collections import defaultdict from copy import deepcopy -import os from voir.instruments.gpu import get_gpu_info @@ -229,7 +228,9 @@ async def do_pin( pindir = here.parent / XPath(".pin") constraint_path = pindir / "tmp-constraints.txt" - constraint_files = make_constraints_file(constraint_path, constraints, str(here.parent)) + constraint_files = make_constraints_file( + constraint_path, constraints, str(here.parent) + ) ig_constraint_path = pindir / f"constraints-{ivar}-{ig}.txt" if ig_constraint_path.exists() and from_scratch: @@ -240,7 +241,7 @@ async def do_pin( requirements_file=ig_constraint_path.absolute(), input_files=(*constraint_files, *reqs), argv=pip_compile_args, - working_dir=here.parent + working_dir=here.parent, ) if not ig_constraint_path.exists(): @@ -252,5 +253,5 @@ async def do_pin( await pack.pin( pip_compile_args=pip_compile_args, constraints=new_constraints, - working_dir=here.parent + working_dir=here.parent, ) diff --git a/milabench/pack.py b/milabench/pack.py index df5971556..7678fc72c 100644 --- a/milabench/pack.py +++ b/milabench/pack.py @@ -28,8 +28,8 @@ class defines good default behavior. 
 
 
 def is_editable_install():
-    import subprocess
     import json
+    import subprocess
 
     try:
         output = subprocess.check_output(["pip", "list", "-e", "--format", "json"])
@@ -116,7 +116,7 @@ def __init__(self, config, core=None):
         self.config = config
         self.phase = None
         self.processes = []
-
+
     def copy(self, config):
         return type(self)(config=merge(self.config, config))
 
@@ -382,14 +382,14 @@ async def install(self):
 
         if is_editable_install():
             await install_benchmate(self)
-
+
     async def pin(
         self,
         clear_previous: bool = True,
         pip_compile_args: Sequence = tuple(),
         input_files: Sequence = tuple(),
         constraints: Sequence = tuple(),
-        working_dir=None
+        working_dir=None,
     ):
         """Pin versions to requirements file.
 
@@ -400,10 +400,10 @@ async def pin(
             constraint: The constraint file
         """
         ivar = self.config.get("install_variant", None)
-
+
         if ivar == "unpinned":
             raise Exception("Cannot pin the 'unpinned' variant.")
-
+
         # assert self.phase == "pin"
         for base_reqs, reqs in self.requirements_map().items():
             if not base_reqs.exists():
@@ -417,11 +417,16 @@ async def pin(
 
             grp = self.config["group"]
             constraint_path = XPath(".pin") / f"tmp-constraints-{ivar}-{grp}.txt"
-            constraint_files = make_constraints_file(constraint_path, constraints, working_dir)
+            constraint_files = make_constraints_file(
+                constraint_path, constraints, working_dir
+            )
             current_input_files = constraint_files + (base_reqs, *input_files)
 
             await self.exec_pip_compile(
-                reqs, current_input_files, argv=pip_compile_args, working_dir=working_dir
+                reqs,
+                current_input_files,
+                argv=pip_compile_args,
+                working_dir=working_dir,
             )
 
             # Add previous requirements as inputs
@@ -439,8 +444,10 @@ async def exec_pip_compile(
             "-m",
             "piptools",
             "compile",
-            "--resolver", "backtracking",
-            "--output-file", relativize(requirements_file, working_dir),
+            "--resolver",
+            "backtracking",
+            "--output-file",
+            relativize(requirements_file, working_dir),
             *argv,
             *input_files,
             cwd=working_dir,
diff --git a/milabench/report.py b/milabench/report.py
index 5eacdad27..8488482e4 100644
--- a/milabench/report.py
+++ b/milabench/report.py
@@ -47,7 +47,7 @@ def _make_row(summary, compare, weights):
             acc += metrics[metric]
     else:
         acc = row["perf"]
-
+
     success_ratio = 1 - row["fail"] / row["n"]
     score = (acc if acc > 0 else row["perf"]) * success_ratio
 
@@ -91,11 +91,11 @@ def __hrepr__(self, H, hrepr):
         return tb
 
     def __str__(self):
-        l = max(map(len, self.fields.keys())) + 2
+        length = max(map(len, self.fields.keys())) + 2
         lines = []
         for k, v in self.fields.items():
             v = f"{v:10.2f}" if isinstance(v, float) else v
-            lines.append(f"{k + ':':{l}} {v}")
+            lines.append(f"{k + ':':{length}} {v}")
         return "\n".join(lines)
 
@@ -199,7 +199,7 @@ def make_dataframe(summary, compare=None, weights=None):
             )
         )
 
-    return DataFrame(
+    df = DataFrame(
         {
             key: _make_row(
                 summary.get(key, {}),
@@ -209,30 +209,12 @@ def make_dataframe(summary, compare=None, weights=None):
             for key in all_keys
         }
     ).transpose()
-
-    return df
-
-
-@error_guard({})
-def make_report(
-    summary,
-    compare=None,
-    html=None,
-    compare_gpus=False,
-    price=None,
-    title=None,
-    sources=None,
-    errdata=None,
-    weights=None,
-):
-    if weights is None:
-        weights = dict()
-
-    df = make_dataframe(summary, compare, weights)
 
     # Reorder columns
     df = df[sorted(df.columns, key=lambda k: columns_order.get(k, 0))]
 
+    return df
+
 
 @error_guard({})
 def make_report(
@@ -252,9 +234,6 @@ def make_report(
 
     df = make_dataframe(summary, compare, weights)
 
-    # Reorder columns
-    df = df[sorted(df.columns, key=lambda k: columns_order.get(k, 0))]
-
     out = Outputter(stdout=stream, html=html)
 
     if sources:
@@ -327,6 +306,7 @@ def pandas_to_string(df, formatters):
     to_csv does not align the output.
     """
     from collections import defaultdict
+
     columns = df.columns.tolist()
 
     sep = " | "
@@ -334,7 +314,7 @@
     col_size = defaultdict(int)
 
     for index, row in df.iterrows():
-        line = [f'{index:<30}']
+        line = [f"{index:<30}"]
         for col, val in zip(columns, row):
             fmt = formatters.get(col)
             val = fmt(val)
diff --git a/milabench/sizer.py b/milabench/sizer.py
index a156c4200..7d2e56194 100644
--- a/milabench/sizer.py
+++ b/milabench/sizer.py
@@ -1,8 +1,8 @@
 import contextvars
+import multiprocessing
 import os
 from copy import deepcopy
 from dataclasses import dataclass
-import multiprocessing
 
 import numpy as np
 import yaml
@@ -108,11 +108,11 @@ def auto_size(self, benchmark, capacity):
         config = self.benchscaling(benchmark)
 
         model = config.get("model", None)
-
+
         if model is None:
            print(f"Missing batch-size model for {benchmark}")
            return 1
-
+
         data = list(sorted(config["model"].items(), key=lambda x: x[0]))
         mem = [to_octet(v[1]) for v in data]
         size = [float(v[0]) for v in data]
@@ -281,13 +281,7 @@ def resolve_argv(pack, argv):
 
     device_count = len(pack.config.get("devices", [0]))
 
-    ccl = {
-        "hpu": "hccl",
-        "cuda": "nccl",
-        "rocm": "rccl",
-        "xpu": "ccl",
-        "cpu": "gloo"
-    }
+    ccl = {"hpu": "hccl", "cuda": "nccl", "rocm": "rccl", "xpu": "ccl", "cpu": "gloo"}
 
     if device_count <= 0:
         device_count = 1
@@ -296,11 +290,11 @@ def resolve_argv(pack, argv):
     context["ccl"] = ccl.get(arch, "gloo")
     context["cpu_count"] = multiprocessing.cpu_count()
     context["cpu_per_gpu"] = multiprocessing.cpu_count() // device_count
-
+
     context["milabench_data"] = pack.config.get("dirs", {}).get("data", None)
     context["milabench_cache"] = pack.config.get("dirs", {}).get("cache", None)
     context["milabench_extra"] = pack.config.get("dirs", {}).get("extra", None)
-
+
     max_worker = 16
     context["n_worker"] = min(context["cpu_per_gpu"], max_worker)
 
@@ -308,4 +302,4 @@ def resolve_argv(pack, argv):
     for i, arg in enumerate(argv):
         argv[i] = str(arg).format(**context)
 
-    return argv
\ No newline at end of file
+    return argv
diff --git a/milabench/utils.py b/milabench/utils.py
index 59f294744..f659764c8 100644
--- a/milabench/utils.py
+++ b/milabench/utils.py
@@ -121,7 +121,9 @@ def make_constraints_file(pth, constraints, working_dir):
         with open(constraint_file, "w") as tfile:
             # We prefix the constraint with ../ because we are creating a constraint
            # file in ./.pin/, but containing constraints with paths relative to ./
-            tfile.write("\n".join([f"-c ../{relativize(c, working_dir)}" for c in constraints]))
+            tfile.write(
+                "\n".join([f"-c ../{relativize(c, working_dir)}" for c in constraints])
+            )
         return (constraint_file,)
     else:
         return ()
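For context on what the matrix machinery wired up above does (`cli_matrix_run` in milabench/cli/matrix.py drives `build_matrix_bench`, which calls `expand_matrix` in milabench/config.py), here is a minimal sketch, assuming `expand_matrix` is importable as laid out in this diff. The bench entry below is hypothetical; only the `matrix`/`job`/`name`/`argv` key layout comes from the code above:

    from milabench.config import expand_matrix

    # Hypothetical bench entry: a `matrix` block listing argument values and a
    # `job` template whose `name` and `argv` values are format strings over them.
    bench = {
        "matrix": {"batch": [16, 32]},
        "job": {
            "name": "bench-{batch}",
            "argv": {"--batch-size": "{batch}"},
        },
    }

    # One (name, config) pair per combination of matrix values:
    #   ("bench-16", {"argv": {"--batch-size": "16"}})
    #   ("bench-32", {"argv": {"--batch-size": "32"}})
    for name, config in expand_matrix("bench", bench):
        print(name, config["argv"])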