From 009a85745a1900171461ee13c6e1565c0e028b6f Mon Sep 17 00:00:00 2001
From: Your Name
Date: Thu, 16 May 2024 18:18:23 +0000
Subject: [PATCH] Update bench to use low overhead measure

---
 benchmarks/torchvision/main.py     | 130 +++++++++++++----------------
 benchmarks/torchvision_ddp/main.py |  78 +++--------------
 2 files changed, 73 insertions(+), 135 deletions(-)

diff --git a/benchmarks/torchvision/main.py b/benchmarks/torchvision/main.py
index 5bbbea806..5206666dc 100644
--- a/benchmarks/torchvision/main.py
+++ b/benchmarks/torchvision/main.py
@@ -11,6 +11,7 @@
 import torchcompat.core as accelerator
 
 import voir
+from voir.asynctimer import DataloaderWrapper, DataloaderWrapperGiver, DataloaderWrapperSmuggle, StopProgram
 from giving import give, given
 from cantilever.core.timer import timeit, timeiterator, show_timings
 
@@ -68,37 +69,28 @@ def scaling(enable, dtype):
 def train_epoch(model, criterion, optimizer, loader, device, dtype, scaler=None):
     model.train()
 
-    def toiterator(loader):
-        with timeit("loader"):
-            return iter(loader)
-
-    iterator = timeiterator(voir.iterate("train", toiterator(loader), True))
+    for inp, target in loader:
+        inp = inp.to(device, dtype=dtype)
+        target = target.to(device)
+        optimizer.zero_grad()
 
-    for inp, target in iterator:
-
-        with timeit("batch"):
-            inp = inp.to(device, dtype=dtype)
-            target = target.to(device)
-            optimizer.zero_grad()
-
-            with scaling(scaler is not None, dtype):
-                output = model(inp)
-                loss = criterion(output, target)
-                give(loss=loss.item())
-
-            if scaler:
-                scaler.scale(loss).backward()
-                accelerator.mark_step()
-
-                scaler.step(optimizer)
-                scaler.update()
-            else:
-                loss.backward()
-                accelerator.mark_step()
-                optimizer.step()
+        with scaling(scaler is not None, dtype):
+            output = model(inp)
+            loss = criterion(output, target)
+            loader.add_loss(loss)
 
+        if scaler:
+            scaler.scale(loss).backward()
             accelerator.mark_step()
-    accelerator.synchronize()
+
+            scaler.step(optimizer)
+            scaler.update()
+        else:
+            loss.backward()
+            accelerator.mark_step()
+            optimizer.step()
+
+        accelerator.mark_step()
 
 
 class SyntheticData:
@@ -123,15 +115,19 @@ def __len__(self):
 
 def dataloader(args):
     train = datasets.ImageFolder(os.path.join(args.data, "train"), data_transforms)
-    train_loader = torch.utils.data.DataLoader(
-        train,
-        batch_size=args.batch_size,
-        num_workers=args.num_workers,
-        sampler=torch.utils.data.RandomSampler(
-            train,
-            replacement=True,
-            num_samples=len(train) * args.epochs
-        )
+    train_loader = DataloaderWrapperGiver(
+        torch.utils.data.DataLoader(
+            train,
+            batch_size=args.batch_size,
+            num_workers=args.num_workers,
+            sampler=torch.utils.data.RandomSampler(
+                train,
+                replacement=True,
+                num_samples=len(train) * args.epochs
+            )
+        ),
+        accelerator.Event,
+        earlystop=60
     )
     return train_loader
 
@@ -145,22 +141,11 @@ def iobench(args):
     device = accelerator.fetch_device(0)
     dtype = float_dtype(args.precision)
 
-    def toiterator(loader):
-        with timeit("loader"):
-            return iter(loader)
-
     with given() as gv:
-        for epoch in voir.iterate("main", range(args.epochs)):
-            with timeit("epoch"):
-
-                iterator = timeiterator(voir.iterate("train", toiterator(loader), True))
-
-                for inp, target in iterator:
-                    with timeit("batch"):
-                        inp = inp.to(device, dtype=dtype)
-                        target = target.to(device)
-
-                accelerator.synchronize()
+        for epoch in range(args.epochs):
+            for inp, target in loader:
+                inp = inp.to(device, dtype=dtype)
+                target = target.to(device)
 
 
 def main():
@@ -287,30 +272,35 @@ def trainbench(args):
     if args.data:
         train_loader = dataloader(args)
     else:
-        train_loader = SyntheticData(
-            model=model,
-            device=device,
-            batch_size=args.batch_size,
-            n=1000,
-            fixed_batch=args.fixed_batch,
+        train_loader = DataloaderWrapperGiver(SyntheticData(
+                model=model,
+                device=device,
+                batch_size=args.batch_size,
+                n=1000,
+                fixed_batch=args.fixed_batch,
+            ),
+            accelerator.Event,
+            earlystop=60
         )
 
     scaler = NoScale()
     if torch.cuda.is_available():
         scaler = accelerator.amp.GradScaler(enabled=is_fp16_allowed(args))
 
-    with given() as gv:
-        if not args.no_stdout:
-            gv.where("loss").display()
-
-        for epoch in voir.iterate("main", range(args.epochs)):
-            with timeit("epoch"):
-                if not args.no_stdout:
-                    print(f"Begin training epoch {epoch}/{args.epochs}")
-                train_epoch(
-                    model, criterion, optimizer, train_loader, device, scaler=scaler, dtype=float_dtype(args.precision)
-                )
+    for epoch in range(args.epochs):
+        train_epoch(
+            model,
+            criterion,
+            optimizer,
+            train_loader,
+            device,
+            scaler=scaler,
+            dtype=float_dtype(args.precision),
+        )
 
 
 if __name__ == "__main__":
-    main()
+    try:
+        main()
+    except StopProgram:
+        pass
\ No newline at end of file
diff --git a/benchmarks/torchvision_ddp/main.py b/benchmarks/torchvision_ddp/main.py
index 117cfe083..cbb905ae4 100755
--- a/benchmarks/torchvision_ddp/main.py
+++ b/benchmarks/torchvision_ddp/main.py
@@ -25,6 +25,7 @@
 import torchvision.datasets as datasets
 
 import voir
+from voir.asynctimer import DataloaderWrapper, DataloaderWrapperSmuggle, StopProgram
 from voir.smuggle import SmuggleWriter
 from giving import give, given
 from cantilever.core.timer import timeit, timeiterator, show_timings
@@ -56,12 +57,17 @@ def __init__(
         self.rank = gpu_id
         self.device = accelerator.fetch_device(gpu_id)
         self.model = model.to(self.device)
-        self.train_data = train_data
+        self.train_data = DataloaderWrapperSmuggle(
+            train_data,
+            accelerator.Event,
+            rank=self.rank,
+            device=self.device,
+            earlystop=60
+        )
         self.optimizer = optimizer
         # self.model = FSDP(model, device_id=self.device)
         self.model = DDP(model, device_ids=[self.device])
         self.world_size = world_size
-        self.data_file = SmuggleWriter(sys.stdout)
 
     def print(self, *args, **kwargs):
         if self.rank == 0:
@@ -69,7 +75,6 @@ def print(self, *args, **kwargs):
 
     def _run_batch(self, source, targets):
         with accelerator.amp.autocast(dtype=torch.bfloat16):
-            self.optimizer.zero_grad()
             output = self.model(source)
             loss = F.cross_entropy(output, targets)
 
@@ -83,75 +88,20 @@ def _run_batch(self, source, targets):
         return loss.detach()
 
     def _run_epoch(self, epoch):
-        def toiterator(loader):
-            with timeit("loader"):
-                return iter(loader)
-
-        sample_count = 0
-        losses = []
-        events = []
-
         self.train_data.sampler.set_epoch(epoch)
-        loader = timeiterator(voir.iterate("train", toiterator(self.train_data), True))
-
-        start_event = accelerator.Event(enable_timing=True)
-        start_event.record()
-
-        for source, targets in loader:
-            end_event = accelerator.Event(enable_timing=True)
-
+        for source, targets in self.train_data:
             with timeit("batch"):
                 source = source.to(self.device)
                 targets = targets.to(self.device)
 
-            n = len(source)
-            sample_count += n
-
             loss = self._run_batch(source, targets)
-            losses.append(loss)
-
-            end_event.record()
-            events.append((start_event, end_event, n))
-            start_event = end_event
-
-        for start, end, n in events:
-            end.synchronize()
-            elapsed = start.elapsed_time(end) / 1000
-            rate = (n * self.world_size) / elapsed
-            self.log({
-                "task": "train",
-                "rate": rate,
-                "units": "items/s",
-            })
-
-        total_count = torch.tensor([sample_count], dtype=torch.int64, device=self.device)
-        dist.reduce(total_count, dst=0)
-
-        loss = sum([l.item() for l in losses]) / len(losses)
-        return total_count.item(), loss
+            self.train_data.add_loss(loss)
 
     def train(self, max_epochs: int):
         with given() as gv:
             for epoch in range(max_epochs):
                 with timeit("epoch") as timer:
-                    total_count, loss = self._run_epoch(epoch)
-
-                    self.perf(loss, total_count, timer)
-
-    def log(self, data):
-        if self.rank == 0 and self.data_file is not None:
-            msg = json.dumps(data)
-            print(msg, file=self.data_file)
-            print(msg)
-
-    def perf(self, loss, total_count, timer):
-        if self.rank == 0:
-            self.log({"task": "train", "loss": loss})
-            # self.log({
-            #     "task": "train",
-            #     "rate": total_count / (timer.end - timer.start),
-            #     "units": "items/s",
-            # })
+                    self._run_epoch(epoch)
 
 
 def image_transforms():
@@ -168,7 +118,6 @@ def image_transforms():
 
 def prepare_dataloader(dataset: Dataset, args):
     dsampler = DistributedSampler(dataset)
-    # next(iter(dsampler))
 
     return DataLoader(
         dataset,
@@ -231,11 +180,10 @@ def worker_main(rank: int, world_size: int, args):
         destroy_process_group()
 
         print(f"<<< rank: {rank}")
+    except StopProgram:
+        print("Early stopping")
     except Exception as err:
         print(err)
-    finally:
-        if rank == 0:
-            show_timings(True)
 
 
 def main():
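Reviewer note (not part of the patch): the low-overhead measurement comes from the voir.asynctimer wrappers used above (DataloaderWrapperGiver / DataloaderWrapperSmuggle with accelerator.Event and earlystop=60), which record device events around each batch and collect losses without forcing a host synchronization every step. The sketch below is only a minimal illustration of that idea; the class name EventTimedLoader, its methods, and the use of torch.cuda.Event as the default event type are assumptions made for the example, not voir's actual API or implementation.

    # Minimal sketch (assumed names, not voir's implementation): record one device
    # event per batch and synchronize a single time when reporting, so the timing
    # machinery adds almost nothing to the training loop itself.
    import torch


    class EventTimedLoader:
        def __init__(self, loader, event_fn=torch.cuda.Event, earlystop=None):
            self.loader = loader
            self.event_fn = event_fn
            self.earlystop = earlystop   # optional batch budget, like earlystop=60 above
            self.events = []             # (start, end, batch_size) per observed batch
            self.losses = []             # detached loss tensors, read lazily

        def add_loss(self, loss):
            # Keep the tensor; calling .item() here would force a GPU sync per step.
            self.losses.append(loss.detach())

        def __iter__(self):
            start = self.event_fn(enable_timing=True)
            start.record()
            for i, (inp, target) in enumerate(self.loader):
                yield inp, target
                end = self.event_fn(enable_timing=True)
                end.record()             # runs after the step's kernels were enqueued
                self.events.append((start, end, len(inp)))
                start = end
                if self.earlystop is not None and i + 1 >= self.earlystop:
                    break

        def rates(self):
            # One synchronization at the end instead of one per batch.
            torch.cuda.synchronize()
            return [n / (s.elapsed_time(e) / 1000) for s, e, n in self.events]

Used as `for inp, target in EventTimedLoader(dataloader): ...`, the loop only pays for two event records per batch, which is the same pattern the patch applies to both the single-GPU and the DDP benchmarks.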