From 69baffed05bc047a99de2fd2e604a11c56d9dc93 Mon Sep 17 00:00:00 2001 From: "pierre.delaunay" Date: Tue, 11 Jun 2024 12:25:51 -0400 Subject: [PATCH] Use benchmate TimedIterator and BenchObserver instead --- benchmarks/accelerate_opt/main.py | 47 +-- benchmarks/flops/main.py | 60 +--- benchmarks/huggingface/bench/__main__.py | 8 +- benchmarks/lightning/README.md | 16 + benchmarks/lightning/benchfile.py | 10 + benchmarks/lightning/main.py | 0 benchmarks/lightning/prepare.py | 2 + benchmarks/lightning/requirements.in | 5 + benchmarks/lightning/voirfile.py | 0 benchmarks/llama/main.py | 66 +--- benchmarks/stargan/stargan/solver.py | 9 +- benchmarks/stargan/voirfile.py | 4 +- benchmarks/super-slomo/slomo/train.py | 8 +- benchmarks/timm/voirfile.py | 10 +- benchmarks/torchvision/main.py | 7 +- benchmarks/torchvision/voirfile.py | 20 +- benchmarks/torchvision_ddp/main.py | 14 +- benchmate/benchmate/common.py | 92 ++++++ benchmate/benchmate/metrics.py | 375 +++++++++++++++++++++++ benchmate/benchmate/observer.py | 98 ++++++ 20 files changed, 646 insertions(+), 205 deletions(-) create mode 100644 benchmarks/lightning/README.md create mode 100644 benchmarks/lightning/benchfile.py create mode 100644 benchmarks/lightning/main.py create mode 100755 benchmarks/lightning/prepare.py create mode 100644 benchmarks/lightning/requirements.in create mode 100644 benchmarks/lightning/voirfile.py create mode 100644 benchmate/benchmate/common.py create mode 100644 benchmate/benchmate/metrics.py create mode 100644 benchmate/benchmate/observer.py diff --git a/benchmarks/accelerate_opt/main.py b/benchmarks/accelerate_opt/main.py index 41c30b321..dc38a147c 100644 --- a/benchmarks/accelerate_opt/main.py +++ b/benchmarks/accelerate_opt/main.py @@ -88,9 +88,7 @@ def arguments(): default_data_collator, get_scheduler, ) -from voir.smuggle import SmuggleWriter -from voir.instruments.gpu import get_gpu_info -from voir.instruments.utils import Monitor +from benchmate.observer import BenchObserver logger = get_logger(__name__) @@ -145,35 +143,11 @@ class CustomInitProcessGroupKwargs(InitProcessGroupKwargs): else: accelerator = Accelerator() + # Set up logging for milabench (only in the run phase, for the main process) + monitor = None if not is_prepare_phase and accelerator.is_main_process: - # Set up logging for milabench (only in the run phase, for the main process) - - data_file = SmuggleWriter(sys.stdout) - def mblog(data): - if data_file is not None: - print(json.dumps(data), file=data_file) - - def monitor_fn(): - data = { - gpu["device"]: { - "memory": [gpu["memory"]["used"], gpu["memory"]["total"]], - "load": gpu["utilization"]["compute"], - "temperature": gpu["temperature"], - } - for gpu in get_gpu_info()["gpus"].values() - } - mblog({"task": "main", "gpudata": data}) - - monitor_fn() - monitor = Monitor(3, monitor_fn) - monitor.start() - - else: - - def mblog(data): - pass - - monitor = None + from benchmate.common import opt_voir + monitor = opt_voir() logging.basicConfig( level=logging.INFO, @@ -374,7 +348,6 @@ def group_texts(examples): total_batch_size = ( per_gpu_batch_size * accelerator.num_processes * gradient_accumulation_steps ) - print("HERE", per_gpu_batch_size, total_batch_size) logger.info("***** Running training *****") logger.info(f" Num examples = {len(train_dataset)}") @@ -388,10 +361,8 @@ def group_texts(examples): completed_steps = 0 starting_epoch = 0 - last_log_time = time.time() - from voir.wrapper import Wrapper - wrapper = Wrapper( + observer = BenchObserver( event_fn=acc.Event, 
earlystop=30, rank=int(os.environ["RANK"]), @@ -399,7 +370,7 @@ def group_texts(examples): stdout=True, batch_size_fn=lambda batch: batch["labels"].shape[0] ) - loader = wrapper.loader(train_dataloader) + loader = observer.loader(train_dataloader) for epoch in range(starting_epoch, num_train_epochs): model.train() @@ -407,9 +378,9 @@ def group_texts(examples): outputs = model(**batch) loss = outputs.loss loss = loss / gradient_accumulation_steps + if accelerator.is_main_process: - loader.add_loss(loss) - # mblog({"task": "train", "loss": loss.detach().item()}) + observer.record_loss(loss) accelerator.backward(loss) diff --git a/benchmarks/flops/main.py b/benchmarks/flops/main.py index 04db361aa..9520b5c7e 100755 --- a/benchmarks/flops/main.py +++ b/benchmarks/flops/main.py @@ -1,17 +1,12 @@ #!/usr/bin/env python from argparse import ArgumentParser -import json import time -import sys -import multiprocessing import torch import torchcompat.core as accelerator -from voir.smuggle import SmuggleWriter -from voir.instruments.gpu import get_gpu_info -from voir.instruments.utils import Monitor +from benchmate.common import setupvoir KILO = 1e3 MEGA = 1e6 @@ -31,31 +26,6 @@ def synchronize(): accelerator.synchronize() -def _worker(state, queue, func, delay): - import time - - while state["running"]: - queue.put(func()) - time.sleep(delay) - - -class Monitor: - def __init__(self, delay, func): - self.manager = multiprocessing.Manager() - self.state = self.manager.dict() - self.state["running"] = True - self.results = multiprocessing.Queue() - self.process = multiprocessing.Process( - target=_worker, args=(self.state, self.results, func, delay), - ) - - def start(self): - self.process.start() - - def stop(self): - self.state["running"] = False - self.process.join() - def modelflops( model: torch.nn.Module, shape, repeat=10, dtype=torch.float32, unit=TERA @@ -123,34 +93,6 @@ def f(N, R=30, m=5000000, n=256, unit=TERA, dtype=torch.float32, log=None): empty_cache() -def setupvoir(): - # wtf this do - data_file = SmuggleWriter(sys.stdout) - # data_file = sys.stdout - - def log(data): - if data_file is not None: - data["t"] = time.time() - print(json.dumps(data), file=data_file) - - while not monitor.results.empty(): - print(json.dumps(monitor.results.get()), file=data_file) - - def monitor_fn(): - data = { - gpu["device"]: { - "memory": [gpu["memory"]["used"], gpu["memory"]["total"],], - "load": gpu["utilization"]["compute"], - "temperature": gpu["temperature"], - "power": gpu["power"], - } - for gpu in get_gpu_info()["gpus"].values() - } - return {"task": "main", "gpudata": data, "t": time.time()} - - monitor = Monitor(0.5, monitor_fn) - monitor.start() - return log, monitor def main(): diff --git a/benchmarks/huggingface/bench/__main__.py b/benchmarks/huggingface/bench/__main__.py index 2cb13ff43..e19d3f9a2 100644 --- a/benchmarks/huggingface/bench/__main__.py +++ b/benchmarks/huggingface/bench/__main__.py @@ -7,7 +7,7 @@ import torchcompat.core as accelerator import transformers -from voir.wrapper import Wrapper +from benchmate.observer import BenchObserver from .models import models from .synth import SyntheticData, generators @@ -102,17 +102,17 @@ def batch_size(bs): print(list(bs.keys())) raise RuntimeError("Batch size unknown") - wrapper = Wrapper( + observer = BenchObserver( event_fn=accelerator.Event, batch_size_fn=batch_size ) - loader = wrapper.loader(self.loader) + loader = observer.loader(self.loader) for data in loader: data = {k: v.to(self.device) for k, v in data.items()} loss = 
self.step(data) - loader.add_loss(loss) + observer.record_loss(loss) def parser(): diff --git a/benchmarks/lightning/README.md b/benchmarks/lightning/README.md new file mode 100644 index 000000000..38acf5c1f --- /dev/null +++ b/benchmarks/lightning/README.md @@ -0,0 +1,16 @@ + +# Torchvision + +Benchmark torchvision models on fake ImageNet data. + +## prepare + +Generates 1000 training samples in `$MILABENCH_BASE/data/FakeImageNet`, to be read during training. + +## run + +Any of the following models can be used with `--model`: + +* resnet18 +* resnet50 +* ... diff --git a/benchmarks/lightning/benchfile.py b/benchmarks/lightning/benchfile.py new file mode 100644 index 000000000..13d6aa0dd --- /dev/null +++ b/benchmarks/lightning/benchfile.py @@ -0,0 +1,10 @@ +from milabench.pack import Package + + +class LightningBenchmark(Package): + base_requirements = "requirements.in" + prepare_script = "prepare.py" + main_script = "main.py" + + +__pack__ = LightningBenchmark diff --git a/benchmarks/lightning/main.py b/benchmarks/lightning/main.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarks/lightning/prepare.py b/benchmarks/lightning/prepare.py new file mode 100755 index 000000000..cf529d79a --- /dev/null +++ b/benchmarks/lightning/prepare.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python + diff --git a/benchmarks/lightning/requirements.in b/benchmarks/lightning/requirements.in new file mode 100644 index 000000000..62e7cd92c --- /dev/null +++ b/benchmarks/lightning/requirements.in @@ -0,0 +1,5 @@ +torch +torchvision +torchcompat +lightning +voir \ No newline at end of file diff --git a/benchmarks/lightning/voirfile.py b/benchmarks/lightning/voirfile.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarks/llama/main.py b/benchmarks/llama/main.py index 40d5b6bc2..bc85c056e 100755 --- a/benchmarks/llama/main.py +++ b/benchmarks/llama/main.py @@ -5,12 +5,10 @@ import argparse import time import sys -import multiprocessing import torch -from voir.smuggle import SmuggleWriter -from voir.instruments.gpu import get_gpu_info +from benchmate.common import setupvoir import torchcompat.core as accelerator root = os.path.dirname(__file__) @@ -28,66 +26,6 @@ def available_models(): return models -def _worker(state, queue, func, delay): - import time - - while state["running"]: - queue.put(func()) - time.sleep(delay) - - -class Monitor: - def __init__(self, delay, func): - self.manager = multiprocessing.Manager() - self.state = self.manager.dict() - self.state["running"] = True - self.results = multiprocessing.Queue() - self.process = multiprocessing.Process( - target=_worker, - args=(self.state, self.results, func, delay), - ) - - def start(self): - self.process.start() - - def stop(self): - self.state["running"] = False - self.process.join() - - -def setupvoir(): - # wtf this do - data_file = SmuggleWriter(sys.stdout) - # data_file = sys.stdout - - def log(data): - if data_file is not None: - data["t"] = time.time() - print(json.dumps(data), file=data_file) - - while not monitor.results.empty(): - print(json.dumps(monitor.results.get()), file=data_file) - - def monitor_fn(): - data = { - gpu["device"]: { - "memory": [ - gpu["memory"]["used"], - gpu["memory"]["total"], - ], - "load": gpu["utilization"]["compute"], - "temperature": gpu["temperature"], - "power": gpu["power"], - } - for gpu in get_gpu_info()["gpus"].values() - } - return {"task": "main", "gpudata": data, "t": time.time()} - - monitor = Monitor(0.5, monitor_fn) - monitor.start() - return log, monitor - - class 
WrappedTokenizer: def __init__(self, tokenizer): self.tokenizer = tokenizer @@ -121,9 +59,7 @@ def huggingface_main(args, model, config): import transformers from transformers import LlamaForCausalLM, LlamaTokenizerFast from transformers.models.llama.configuration_llama import LlamaConfig - from voir.wrapper import DataloaderWrapper, Wrapper from datasets import load_dataset - import optimum.habana # Dataset here println("Dataset") diff --git a/benchmarks/stargan/stargan/solver.py b/benchmarks/stargan/stargan/solver.py index c4fa364b7..08ccf6bae 100644 --- a/benchmarks/stargan/stargan/solver.py +++ b/benchmarks/stargan/stargan/solver.py @@ -10,8 +10,7 @@ import os import time import datetime -from giving import give -import voir.wrapper +from benchmate.observer import BenchObserver class Solver(object): @@ -227,11 +226,11 @@ def train(self): data_loader = self.synth_loader # Fetch fixed inputs for debugging. - wrapper = voir.wrapper.Wrapper( + observer = BenchObserver( event_fn=accelerator.Event, batch_size_fn=lambda x: len(x[0]) ) - loader = wrapper.loader(data_loader) + loader = observer.loader(data_loader) data_iter = iter(loader) x_fixed, c_org = next(data_iter) @@ -316,7 +315,7 @@ def train(self): + self.lambda_gp * d_loss_gp ) # give(task="train", loss=d_loss.item()) - loader.add_loss(d_loss) + observer.record_loss(d_loss.detach()) self.reset_grad() d_loss.backward() diff --git a/benchmarks/stargan/voirfile.py b/benchmarks/stargan/voirfile.py index 715625335..9e35648db 100644 --- a/benchmarks/stargan/voirfile.py +++ b/benchmarks/stargan/voirfile.py @@ -27,8 +27,8 @@ class Config: @configurable def instrument_main(ov, options: Config): - import torchcompat.core as accelerator - from voir.wrapper import earlystop_count + # import torchcompat.core as accelerator + # from benchmate.observer import BenchObserver yield ov.phases.init diff --git a/benchmarks/super-slomo/slomo/train.py b/benchmarks/super-slomo/slomo/train.py index c4a4493fa..8ec3bddac 100644 --- a/benchmarks/super-slomo/slomo/train.py +++ b/benchmarks/super-slomo/slomo/train.py @@ -11,7 +11,7 @@ import model from giving import give -import voir.wrapper +from benchmate.observer import BenchObserver from synth import SyntheticData @@ -179,12 +179,12 @@ def get_lr(optimizer): print(device) - wrapper = voir.wrapper.Wrapper( + observer = BenchObserver( event_fn=accelerator.Event, earlystop=60, batch_size_fn=lambda batch: batch[1].shape[0] ) - loader = wrapper.loader(trainloader) + loader = observer.loader(trainloader) ### Main training loop for epoch in range(dict1["epoch"] + 1, args.epochs): @@ -275,7 +275,7 @@ def get_lr(optimizer): # since the loss in paper is calculated for input pixels in range 0-255 # and the input to our network is in range 0-1 loss = 204 * recnLoss + 102 * warpLoss + 0.005 * prcpLoss + loss_smooth - loader.add_loss(loss) + observer.record_loss(loss.detach()) # Backpropagate loss.backward() diff --git a/benchmarks/timm/voirfile.py b/benchmarks/timm/voirfile.py index 975446ba7..1f25e1b48 100644 --- a/benchmarks/timm/voirfile.py +++ b/benchmarks/timm/voirfile.py @@ -32,12 +32,12 @@ def instrument_main(ov, options: Config): import os import torchcompat.core as accelerator - from voir.wrapper import DataloaderWrapper, Wrapper + from benchmate.observer import BenchObserver from timm.utils.distributed import is_global_primary from timm.data import create_loader - wrapper = Wrapper( + observer = BenchObserver( accelerator.Event, earlystop=options.stop + options.skip, rank=int(os.getenv("RANK", 0)), @@ -48,13 
+48,13 @@ def instrument_main(ov, options: Config): ) probe = ov.probe("/timm.data.loader/create_loader() as loader", overridable=True) - probe['loader'].override(wrapper.loader) + probe['loader'].override(observer.loader) probe = ov.probe("//train_one_epoch > loss_fn", overridable=True) - probe['loss_fn'].override(wrapper.criterion) + probe['loss_fn'].override(observer.criterion) probe = ov.probe("//train_one_epoch > optimizer", overridable=True) - probe['optimizer'].override(wrapper.optimizer) + probe['optimizer'].override(observer.optimizer) # Do not save checkpoints probe = ov.probe("//main > saver", overridable=True) diff --git a/benchmarks/torchvision/main.py b/benchmarks/torchvision/main.py index 3e6dfe8f7..4d35a6cfe 100644 --- a/benchmarks/torchvision/main.py +++ b/benchmarks/torchvision/main.py @@ -1,6 +1,5 @@ import argparse import contextlib -import os import torch import torch.cuda.amp @@ -9,8 +8,7 @@ import torchcompat.core as accelerator from benchmate.dataloader import imagenet_dataloader, dataloader_arguments - -from voir.wrapper import StopProgram +from benchmate.metrics import StopProgram def is_tf32_allowed(args): @@ -186,8 +184,7 @@ def train_epoch(args, model, criterion, optimizer, loader, device, dtype, scaler with scaling(scaler is not None, dtype): output = model(inp) loss = criterion(output, target) - loader.add_loss(loss) - + scaler.scale(loss).backward() accelerator.mark_step() diff --git a/benchmarks/torchvision/voirfile.py b/benchmarks/torchvision/voirfile.py index 72bc1b9ca..90c5e00a5 100644 --- a/benchmarks/torchvision/voirfile.py +++ b/benchmarks/torchvision/voirfile.py @@ -1,9 +1,8 @@ from dataclasses import dataclass from voir import configurable -from voir.instruments import dash, early_stop, gpu_monitor, log, rate -from voir.wrapper import DataloaderWrapper, Wrapper - +from voir.instruments import dash, early_stop, gpu_monitor, log +from benchmate.observer import BenchObserver @dataclass class Config: @@ -27,16 +26,11 @@ class Config: @configurable def instrument_main(ov, options: Config): - import torchcompat.core as accelerator - from ptera import refstring - from benchmate.dataloader import imagenet_dataloader - yield ov.phases.init if options.dash: ov.require(dash) - overhead_metrics = [] # "__iter__", "overhead", "process_time" ov.require( @@ -47,18 +41,22 @@ def instrument_main(ov, options: Config): yield ov.phases.load_script + from benchmate.dataloader import imagenet_dataloader + import torchcompat.core as accelerator + from ptera import refstring + # Note: the wrapper can also do early stopping, if raise_stop_program=True - wrapper = Wrapper( + observer = BenchObserver( accelerator.Event, earlystop=options.stop + options.skip, batch_size_fn=lambda x: len(x[0]) ) probe = ov.probe(f"{refstring(imagenet_dataloader)}() as loader", overridable=True) - probe['loader'].override(wrapper.loader) + probe['loader'].override(observer.loader) probe = ov.probe("//train_epoch > criterion", overridable=True) - probe['criterion'].override(wrapper.criterion) + probe['criterion'].override(observer.criterion) diff --git a/benchmarks/torchvision_ddp/main.py b/benchmarks/torchvision_ddp/main.py index 0cd9c4ad3..40de9169c 100755 --- a/benchmarks/torchvision_ddp/main.py +++ b/benchmarks/torchvision_ddp/main.py @@ -17,7 +17,7 @@ import torchvision.transforms as transforms import torchvision.models as torchvision_models -from voir.wrapper import DataloaderWrapper, StopProgram +from benchmate.metrics import BenchObserver, StopProgram import torchcompat.core as 
accelerator from benchmate.dataloader import imagenet_dataloader, dataloader_arguments @@ -47,15 +47,16 @@ def __init__( self.rank = gpu_id self.device = accelerator.fetch_device(gpu_id) self.model = model.to(self.device) - self.train_data = DataloaderWrapper.with_sumggler( - train_data, + self.observer = BenchObserver( accelerator.Event, rank=self.rank, device=self.device, earlystop=60, raise_stop_program=True, - batch_size_fn=lambda x: len(x[0]) + batch_size_fn=lambda x: len(x[0]), + stdout=True ) + self.train_data = self.observer.loader(train_data) self.optimizer = optimizer # self.model = FSDP(model, device_id=self.device) self.model = DDP(model, device_ids=[self.device]) @@ -77,7 +78,7 @@ def _run_batch(self, source, targets): self.optimizer.step() accelerator.mark_step() - return loss.detach() + self.observer.record_loss(loss.detach()) def _run_epoch(self, epoch): self.train_data.sampler.set_epoch(epoch) @@ -85,8 +86,7 @@ def _run_epoch(self, epoch): source = source.to(self.device) targets = targets.to(self.device) - loss = self._run_batch(source, targets) - self.train_data.add_loss(loss) + self._run_batch(source, targets) def train(self, max_epochs: int): for epoch in range(max_epochs): diff --git a/benchmate/benchmate/common.py b/benchmate/benchmate/common.py new file mode 100644 index 000000000..199945012 --- /dev/null +++ b/benchmate/benchmate/common.py @@ -0,0 +1,92 @@ +import json +import time +import sys +import multiprocessing + +from voir.smuggle import SmuggleWriter +from voir.instruments.gpu import get_gpu_info +from voir.instruments.utils import Monitor + + +# +# TODO: this use deprecated things +# + +def _worker(state, queue, func, delay): + import time + + while state["running"]: + queue.put(func()) + time.sleep(delay) + + +class _Monitor: + def __init__(self, delay, func): + self.manager = multiprocessing.Manager() + self.state = self.manager.dict() + self.state["running"] = True + self.results = multiprocessing.Queue() + self.process = multiprocessing.Process( + target=_worker, args=(self.state, self.results, func, delay), + ) + + def start(self): + self.process.start() + + def stop(self): + self.state["running"] = False + self.process.join() + + + +def setupvoir(): + # wtf this do + data_file = SmuggleWriter(sys.stdout) + # data_file = sys.stdout + + def log(data): + if data_file is not None: + data["t"] = time.time() + print(json.dumps(data), file=data_file) + + while not monitor.results.empty(): + print(json.dumps(monitor.results.get()), file=data_file) + + def monitor_fn(): + data = { + gpu["device"]: { + "memory": [gpu["memory"]["used"], gpu["memory"]["total"],], + "load": gpu["utilization"]["compute"], + "temperature": gpu["temperature"], + "power": gpu["power"], + } + for gpu in get_gpu_info()["gpus"].values() + } + return {"task": "main", "gpudata": data, "t": time.time()} + + monitor = _Monitor(0.5, monitor_fn) + monitor.start() + return log, monitor + + +def opt_voir(): + data_file = SmuggleWriter(sys.stdout) + def mblog(data): + if data_file is not None: + print(json.dumps(data), file=data_file) + + def monitor_fn(): + data = { + gpu["device"]: { + "memory": [gpu["memory"]["used"], gpu["memory"]["total"]], + "load": gpu["utilization"]["compute"], + "temperature": gpu["temperature"], + } + for gpu in get_gpu_info()["gpus"].values() + } + mblog({"task": "main", "gpudata": data}) + + monitor_fn() + monitor = Monitor(3, monitor_fn) + monitor.start() + return monitor \ No newline at end of file diff --git a/benchmate/benchmate/metrics.py 
b/benchmate/benchmate/metrics.py
new file mode 100644
index 000000000..53bef1ad1
--- /dev/null
+++ b/benchmate/benchmate/metrics.py
@@ -0,0 +1,375 @@
+import json
+import os
+import sys
+import time
+
+from voir.helpers import current_overseer
+from voir.phase import StopProgram
+from voir.smuggle import SmuggleWriter
+
+TORCH_ERROR = None
+try:
+    import torch
+    import torch.distributed as dist
+except ImportError as err:
+    TORCH_ERROR = err
+
+
+def file_push(fp=sys.stdout):
+    def message(**kwargs):
+        kwargs.setdefault("task", "train")
+        msg = json.dumps(kwargs)
+        print(msg, file=fp)
+
+    return message
+
+
+def sumggle_push():
+    fp = SmuggleWriter(sys.stdout)
+    return file_push(fp)
+
+
+def is_running_with_voir():
+    return current_overseer.get() is not None
+
+
+def give_push():
+    ov = current_overseer.get()
+
+    if ov is not None:
+        return ov.give
+
+    return file_push()
+
+
+def earlystop_count():
+    return int(os.getenv("VOIR_EARLYSTOP_COUNT", 60)) + int(
+        os.getenv("VOIR_EARLYSTOP_SKIP", 10)
+    )
+
+
+class LazyMetricPusher:
+    def __init__(self, task):
+        self.task = task
+        self.delayed = []
+
+    def append(self, *args, **kwargs):
+        self.delayed.append((args, kwargs))
+
+    def record(self, *args, **kwargs):
+        """Record data for a future metric.
+
+        No synchronization here.
+        """
+        self.append(*args, **kwargs)
+
+    def materialize(self, *args, **kwargs):
+        """Transform raw data into a metric.
+
+        Synchronization happens here.
+        """
+        return *args, kwargs
+
+    def push(self, pusher):
+        """Iterate through the recorded data and push the metrics."""
+        for args, kwargs in self.delayed:
+            pusher(self.materialize(*args, **kwargs))
+        self.delayed = []
+
+
+class LazyLossPusher(LazyMetricPusher):
+    def record(self, loss):
+        # no .item(): we do not want to sync here
+        self.append(loss.detach())
+
+    def materialize(self, loss):
+        # sync here is fine
+        return {"loss": loss.item(), "task": self.task}
+
+
+class CPUTimer:
+    def __init__(self):
+        self._start = None
+        self._end = None
+
+    def start(self):
+        self._start = time.time()
+
+    def end(self):
+        self._end = time.time()
+
+    def __enter__(self):
+        self.start()
+        return self
+
+    def __exit__(self, *args):
+        self.end()
+
+    def elapsed(self):
+        return self._end - self._start
+
+
+class DeviceTimer:
+    def __init__(self, event_fn):
+        self._start = event_fn(enable_timing=True)
+        self._end = event_fn(enable_timing=True)
+
+    def start(self):
+        self._start.record()
+
+    def end(self):
+        self._end.record()
+
+    def __enter__(self):
+        self.start()
+        return self
+
+    def __exit__(self, *args):
+        self.end()
+
+    def elapsed(self):
+        self._end.synchronize()
+        return self._start.elapsed_time(self._end)
+
+
+class TimedIterator:
+    """Time the body of a loop, ignoring the time it took to initialize the iterator.
+    The timings are measured using `torch.cuda.Event` to avoid explicit sync.
+
+    An explicit sync is done at the end of an epoch or when the max number of observations is reached.
+
+    Because the timings are async, voir only gets triggered when the explicit sync happens, which
+    is outside the scope of the performance measure, so no matter how long voir takes to process
+    the events it will not impact the measurements.
+
+    The wrapper also works in multi-gpu, multi-node setups and computes the real batch size
+    by reducing the per-process batch size across all processes. Only rank 0 logs data.
+
+    Notes
+    -----
+    The progress event is the only one that is fed synchronously, so
+    it should be handled quickly.
+
+    Arguments
+    ---------
+    loader: Dataloader
+        original pytorch dataloader
+
+    event_fn:
+        event constructor (torch.cuda.Event, torch.xpu.Event, etc.)
+
+    rank: int
+        rank of the current process, only required if distributed
+
+    device:
+        device used, only required if distributed
+
+    earlystop: int
+        number of observations to produce before raising StopProgram
+
+    push: function
+        function used to message/push metrics
+
+    Examples
+    --------
+
+    .. code-block::
+
+       loader = TimedIterator(loader, torch.cuda.Event, earlystop=60)  # < here
+
+       for e in range(epochs):
+           for i in loader:
+               loss = criterion(model(x), y)
+    """
+
+    @classmethod
+    def with_sumggler(cls, *args, push=None, **kwargs):
+        return cls(*args, push=sumggle_push(), **kwargs)
+
+    @classmethod
+    def with_stdout(cls, *args, push=None, **kwargs):
+        return cls(*args, push=file_push(), **kwargs)
+
+    @classmethod
+    def with_give(cls, *args, push=None, **kwargs):
+        return cls(*args, push=give_push(), **kwargs)
+
+    def __init__(
+        self,
+        loader,
+        event_fn,
+        rank=0,
+        push=file_push(),
+        device=None,
+        earlystop=earlystop_count(),
+        raise_stop_program=False,
+        batch_size_fn=None,
+    ):
+        self.loader = loader          # original iterator
+        self.events = []              # accumulated events to be pushed
+
+        self.task = "train"           # voir task, usually train but could be validation/test
+        self.total_obs = 0            # number of "pushed" observations
+        self.event_fn = event_fn      # function to create a device event
+        self.early_stop = earlystop   # number of observations to target
+        self.unit = 1000              # device timer is in ms
+
+        self.message_push = push      # how to push the metrics, usually voir or stdout
+
+        # Number of times we broke out of the iterator for early stopping;
+        # we should really only do this once
+        self.break_count = 0
+        self.batch_size_fn = batch_size_fn
+
+        # Multi-GPU setup
+        self.rank = None
+        self.device = device
+        self.world_size = 1
+
+        # Options
+        self.raise_stop_program = raise_stop_program  # whether TimedIterator raises StopProgram
+        self.profile_instrumentation = False
+        self.overhead = []
+        self.loader_init_time = []
+        self.sub_overhead = 0
+
+        if not TORCH_ERROR and dist.is_initialized():
+            self.rank = rank
+            assert (
+                self.device is not None
+            ), "device is required to compute the final batch size"
+
+    def __getattr__(self, item):
+        return getattr(self.loader, item)
+
+    def __len__(self):
+        return len(self.loader)
+
+    def __iter__(self):
+        self.log_progress()
+
+        # This takes much more time than expected; good thing to keep track of it
+        with CPUTimer() as ct:
+            iterator = iter(self.loader)
+
+        self.loader_init_time.append(ct.elapsed())
+        return self.wrapped(iterator)
+
+    def wrapped(self, iterator):
+        # Time IO wait + batch compute
+        start = self.event_fn(enable_timing=True)
+        start.record()
+
+        for data in iterator:
+            yield data
+
+            with CPUTimer() as ct:
+                end = self.event_fn(enable_timing=True)
+                end.record()
+
+                bs = self.deduce_batch_size(data)
+                self.events.append((start, end, bs, self.overhead[-1] if self.overhead else 0))
+
+                # Log progress so it looks somewhat responsive
+                self.log_progress()
+
+                # check for early stopping to avoid doing the full epoch
+                if self.is_done() and self.break_count == 0:
+                    self.break_count += 1
+                    break
+
+                start = end
+            self.overhead.append(ct.elapsed())
+
+        self._push()
+        self.earlystop()
+
+    def deduce_batch_size(self, elem):
+        if self.batch_size_fn:
+            return self.batch_size_fn(elem)
+
+        try:
+            if len(elem) == 2:
+                return len(elem[0])
+            return len(elem)
+        except (TypeError, ValueError):
+            return 0
+
+    def progress(self):
+        return len(self.events) +
self.total_obs + + def is_done(self): + return self.early_stop is not None and self.progress() >= self.early_stop + + def earlystop(self, exception=StopProgram): + if self.is_done(): + self._push() + + if self.raise_stop_program: + raise exception() + + def on_iterator_stop_iterator(self): + """Extension point called when timers are off""" + pass + + def batch_size(self, bs): + # multi GPU, batch size count + if not TORCH_ERROR and dist.is_initialized(): + bs = torch.tensor([bs], dtype=torch.int64, device=self.device) + dist.reduce(bs, dst=0) + return bs.item() + return bs + + def _push_time_steps(self): + for start, end, bs, overhead in self.events: + end.synchronize() + elapsed = (start.elapsed_time(end) - self.sub_overhead * overhead) / self.unit + rate = self.batch_size(bs) / elapsed + self.log_rate(rate) + + self.total_obs += len(self.events) + self.events = [] + + def _push_profile_metrics(self): + if self.profile_instrumentation: + for ov in self.overhead: + self.message(overhead=ov, units="s", task=self.task) + + for iterinit in self.loader_init_time: + self.message(__iter__=iterinit, units="s", task=self.task) + self.overhead = [] + self.loader_init_time = [] + + def _push(self): + """Push all the accumulated metrics""" + + with CPUTimer() as sync_time: + event = self.event_fn() + event.record() + event.synchronize() + + with CPUTimer() as process_time: + self.on_iterator_stop_iterator() + + # Push synchronize to have the final compute times + self._push_time_steps() + + # Optional + self._push_profile_metrics() + + self.message(sync_time=sync_time.elapsed(), units="s", task=self.task) + self.message(process_time=process_time.elapsed(), units="s", task=self.task) + + def log_rate(self, rate): + self.message(rate=rate, units="items/s", task=self.task) + + def log_progress(self): + if self.early_stop is not None: + progress = self.progress() + self.message(progress=(progress, self.early_stop), task="early_stop") + + def message(self, **kwargs): + if self.rank is None or self.rank == 0: + self.message_push(**kwargs) + + diff --git a/benchmate/benchmate/observer.py b/benchmate/benchmate/observer.py new file mode 100644 index 000000000..e00b1fec4 --- /dev/null +++ b/benchmate/benchmate/observer.py @@ -0,0 +1,98 @@ +from voir.helpers import current_overseer + +from .metrics import LazyLossPusher, give_push, sumggle_push, TimedIterator + + +class BenchObserver: + """Helper class to create override function for ptera + + Examples + -------- + + .. 
code-block::
+
+
+        observer = BenchObserver()
+
+        probe = ov.probe("//dataloader() as loader", overridable=True)
+        probe['loader'].override(observer.loader)
+
+        probe = ov.probe("//train_epoch > criterion", overridable=True)
+        probe['criterion'].override(observer.criterion)
+
+    """
+
+    def __init__(
+        self, *args, backward_callback=None, step_callback=None, stdout=False, **kwargs
+    ):
+        self.wrapped = None
+        self.args = args
+        self.kwargs = kwargs
+        self.backward_callback = backward_callback
+        self.optimizer_step_callback = step_callback
+        self.stdout = stdout
+        self.task = "train"
+        self.losses = LazyLossPusher(self.task)
+
+        self.pusher = give_push()
+        if self.stdout:
+            self.pusher = sumggle_push()
+
+    def on_iterator_stop_iterator(self):
+        """Called when the timed iterator stops; used to do extra work while the timers are off"""
+        self.losses.push(self.pusher)
+
+    def record_loss(self, loss):
+        self.losses.record(loss)
+        return loss
+
+    def override_return_value(self, function, override):
+        import ptera
+        refstring = ptera.refstring(function)
+
+        ov = current_overseer.get()
+
+        if ov is not None:
+            probe = ov.probe(f"{refstring}() as retval", overridable=True)
+            probe['retval'].override(override)
+        else:
+            raise RuntimeError("Not running through voir")
+
+    def loader(self, loader):
+        """Wrap a dataloader or an iterable, which enables accurate measurement of the time spent in the loop's body"""
+        self.wrapped = TimedIterator(loader, *self.args, push=self.pusher, **self.kwargs)
+        self.wrapped.task = self.task
+        self.wrapped.on_iterator_stop_iterator = self.on_iterator_stop_iterator
+        return self.wrapped
+
+    def criterion(self, criterion):
+        """Wrap a criterion (loss function) to log the loss and enable a .backward callback"""
+
+        def wrapped(*args):
+            loss = criterion(*args)
+
+            if self.backward_callback:
+                original = loss.backward
+
+                def new_backward(*args, **kwargs):
+                    original(*args, **kwargs)
+                    self.backward_callback()
+
+                loss.backward = new_backward
+
+            self.record_loss(loss.detach())
+            return loss
+
+        return wrapped
+
+    def optimizer(self, optimizer):
+        """Wrap an optimizer to enable a .step callback"""
+        if self.optimizer_step_callback:
+            original = optimizer.step
+
+            def new_step(*args, **kwargs):
+                original(*args, **kwargs)
+                self.optimizer_step_callback()
+
+            optimizer.step = new_step
+        return optimizer
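
Usage sketch: a minimal training loop wired through the new benchmate API, assuming torch and torchcompat.core are available as in the existing benchmarks. The model, criterion, optimizer and dataloader names are hypothetical placeholders; BenchObserver, loader() and record_loss() are the entry points introduced by this patch.

    import torchcompat.core as accelerator
    from benchmate.observer import BenchObserver

    def train(model, criterion, optimizer, dataloader, epochs=1):
        # Wrap the dataloader so each loop body is timed with device events;
        # rates are pushed as {"rate": ..., "units": "items/s"} messages.
        observer = BenchObserver(
            event_fn=accelerator.Event,
            earlystop=60,                               # forwarded to TimedIterator
            batch_size_fn=lambda batch: len(batch[0]),
            stdout=True,                                # push through SmuggleWriter instead of voir
        )
        loader = observer.loader(dataloader)

        for _ in range(epochs):
            for inp, target in loader:
                loss = criterion(model(inp), target)
                observer.record_loss(loss)              # detached and logged lazily, no device sync
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

When the script runs under voir instead, leaving stdout=False makes the observer push through the overseer's give channel, which is what most of the converted benchmarks rely on.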