From 677453835ac0a55e61137ce4437b44da3cf40b4b Mon Sep 17 00:00:00 2001 From: "pierre.delaunay" Date: Thu, 6 Jun 2024 11:52:59 -0400 Subject: [PATCH] refactor imagenet related code --- benchmarks/torchvision/main.py | 4 - benchmarks/torchvision_ddp/main.py | 61 +---- benchmarks/torchvision_ddp/prepare.py | 57 +---- benchmate/benchmate/datagen.py | 21 -- benchmate/benchmate/dataloader.py | 128 +++++++--- benchmate/benchmate/iterator.py | 349 -------------------------- benchmate/pyproject.toml | 1 - milabench/sizer.py | 3 + 8 files changed, 103 insertions(+), 521 deletions(-) delete mode 100644 benchmate/benchmate/iterator.py diff --git a/benchmarks/torchvision/main.py b/benchmarks/torchvision/main.py index fb048b501..3e6dfe8f7 100644 --- a/benchmarks/torchvision/main.py +++ b/benchmarks/torchvision/main.py @@ -157,10 +157,6 @@ def _main(): trainbench(args) def iobench(args): - data_directory = os.environ.get("MILABENCH_DIR_DATA", None) - if args.data is None and data_directory: - args.data = os.path.join(data_directory, "FakeImageNet") - device = accelerator.fetch_device(0) model = getattr(tvmodels, args.model)() model.to(device) diff --git a/benchmarks/torchvision_ddp/main.py b/benchmarks/torchvision_ddp/main.py index ad8d03dcb..1a12004b9 100755 --- a/benchmarks/torchvision_ddp/main.py +++ b/benchmarks/torchvision_ddp/main.py @@ -29,6 +29,7 @@ from voir.smuggle import SmuggleWriter from giving import give, given import torchcompat.core as accelerator +from benchmate.dataloader import imagenet_dataloader, dataloader_arguments def ddp_setup(rank, world_size): @@ -114,50 +115,16 @@ def image_transforms(): ) return data_transforms -def prepare_dataloader(dataset: Dataset, args): - dsampler = DistributedSampler(dataset) - - return DataLoader( - dataset, - batch_size=args.batch_size, - num_workers=args.num_workers if not args.noio else 0, - pin_memory=not args.noio, - shuffle=False, - sampler=dsampler - ) - -class FakeDataset: - def __init__(self, args): - self.data = [ - (torch.randn((3, 224, 224)), i % 1000) for i in range(60 * args.batch_size) - ] - - def __len__(self): - return len(self.data) - - def __getitem__(self, item): - return self.data[item] - - -def dataset(args): - if args.noio: - return FakeDataset(args) - else: - data_directory = os.environ.get("MILABENCH_DIR_DATA", None) - if args.data is None and data_directory: - args.data = os.path.join(data_directory, "FakeImageNet") - - return datasets.ImageFolder(os.path.join(args.data, "train"), image_transforms()) +def prepare_dataloader(args, model, rank, world_size): + return imagenet_dataloader(args, model, rank, world_size) def load_train_objs(args): - train = dataset(args) - model = getattr(torchvision_models, args.model)() optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) - return train, model, optimizer + return model, optimizer def worker_main(rank: int, world_size: int, args): @@ -166,8 +133,9 @@ def worker_main(rank: int, world_size: int, args): ddp_setup(rank, world_size) - dataset, model, optimizer = load_train_objs(args) - train_data = prepare_dataloader(dataset, args) + model, optimizer = load_train_objs(args) + + train_data = prepare_dataloader(args, model, rank, world_size) trainer = Trainer(model, train_data, optimizer, rank, world_size) @@ -184,7 +152,7 @@ def worker_main(rank: int, world_size: int, args): def main(): parser = argparse.ArgumentParser(description='simple distributed training job') - parser.add_argument('--batch-size', default=512, type=int, help='Input batch size on each device (default: 32)') + 
dataloader_arguments(parser) parser.add_argument( "--model", type=str, help="torchvision model name", default="resnet50" ) @@ -195,18 +163,6 @@ def main(): metavar="N", help="number of epochs to train (default: 10)", ) - parser.add_argument( - "--num-workers", - type=int, - default=8, - help="number of workers for data loading", - ) - parser.add_argument( - "--noio", - action='store_true', - default=False, - help="Disable IO by providing an in memory dataset", - ) parser.add_argument( "--precision", type=str, @@ -214,7 +170,6 @@ def main(): default="fp32", help="Precision configuration", ) - parser.add_argument("--data", type=str, help="data directory") args = parser.parse_args() world_size = accelerator.device_count() diff --git a/benchmarks/torchvision_ddp/prepare.py b/benchmarks/torchvision_ddp/prepare.py index 9158ae0e0..7c88e94a6 100755 --- a/benchmarks/torchvision_ddp/prepare.py +++ b/benchmarks/torchvision_ddp/prepare.py @@ -1,59 +1,6 @@ #!/usr/bin/env python -import multiprocessing -import os -from pathlib import Path - -from tqdm import tqdm - - -def write(args): - from torchvision.datasets import FakeData - - image_size, offset, count, outdir = args - dataset = FakeData( - size=count, image_size=image_size, num_classes=1000, random_offset=offset - ) - - image, y = next(iter(dataset)) - class_val = int(y) - image_name = f"{offset}.jpeg" - - path = os.path.join(outdir, str(class_val)) - os.makedirs(path, exist_ok=True) - - image_path = os.path.join(path, image_name) - image.save(image_path) - - -def generate(image_size, n, outdir): - p_count = min(multiprocessing.cpu_count(), 8) - pool = multiprocessing.Pool(p_count) - for _ in tqdm( - pool.imap_unordered(write, ((image_size, i, n, outdir) for i in range(n))), - total=n, - ): - pass - - -def generate_sets(root, sets, shape): - root = Path(root) - sentinel = root / "done" - if sentinel.exists(): - print(f"{root} was already generated") - return - if root.exists(): - print(f"{root} exists but is not marked complete; deleting") - root.rm() - for name, n in sets.items(): - print(f"Generating {name}") - generate(shape, n, os.path.join(root, name)) - sentinel.touch() - +from benchmate.datagen import generate_fakeimagenet if __name__ == "__main__": - data_directory = os.environ["MILABENCH_DIR_DATA"] - dest = os.path.join(data_directory, "FakeImageNet") - print(f"Generating fake data into {dest}...") - generate_sets(dest, {"train": 4096, "val": 16, "test": 16}, (3, 384, 384)) - print("Done!") + generate_fakeimagenet() diff --git a/benchmate/benchmate/datagen.py b/benchmate/benchmate/datagen.py index e8b463a84..7299a07f8 100644 --- a/benchmate/benchmate/datagen.py +++ b/benchmate/benchmate/datagen.py @@ -15,27 +15,6 @@ from tqdm import tqdm - -class FakeInMemoryDataset: - def __init__(self, producer, batch_size, batch_count): - self.data = [producer(i) for i in range(batch_size * batch_count)] - - def __len__(self): - return len(self.data) - - def __getitem__(self, item): - return self.data[item] - - -class FakeImageClassification(FakeInMemoryDataset): - def __init__(self, shape, batch_size, batch_count): - def producer(i): - return (torch.randn(shape), i % 1000) - - super().__init__(producer, batch_size, batch_count) - - - def write(args): import torch import torchvision.transforms as transforms diff --git a/benchmate/benchmate/dataloader.py b/benchmate/benchmate/dataloader.py index a1f80388a..28177f6d3 100644 --- a/benchmate/benchmate/dataloader.py +++ b/benchmate/benchmate/dataloader.py @@ -6,6 +6,7 @@ import torchvision.datasets as 
datasets import torchvision.transforms as transforms import torchcompat.core as accelerator +from torch.utils.data.distributed import DistributedSampler def generate_tensors(batch_size, shapes, device): @@ -40,6 +41,25 @@ def generate_tensor_classification(model, batch_size, in_shape, device): return inp, out +class FakeInMemoryDataset: + def __init__(self, producer, batch_size, batch_count): + self.data = [producer(i) for i in range(batch_size * batch_count)] + + def __len__(self): + return len(self.data) + + def __getitem__(self, item): + return self.data[item] + + +class FakeImageClassification(FakeInMemoryDataset): + def __init__(self, shape, batch_size, batch_count): + def producer(i): + return (torch.randn(shape), i % 1000) + + super().__init__(producer, batch_size, batch_count) + + class SyntheticData: def __init__(self, tensors, n, fixed_batch): self.n = n @@ -59,7 +79,7 @@ def __len__(self): return self.n -def dali(folder, batch_size, num_workers): +def dali(folder, batch_size, num_workers, rank=0, world_size=1): from nvidia.dali.pipeline import pipeline_def import nvidia.dali.types as types import nvidia.dali.fn as fn @@ -71,6 +91,8 @@ def get_dali_pipeline(): file_root=folder, random_shuffle=True, name="Reader", + shard_id=rank, + num_shards=world_size, ) # decode data on the GPU @@ -112,7 +134,19 @@ def __iter__(self): return Adapter(train_data) -def pytorch(folder, batch_size, num_workers): +def pytorch_fakedataset(folder, batch_size, num_workers): + train = FakeImageClassification((3, 224, 224), batch_size, 60) + + return torch.utils.data.DataLoader( + train, + batch_size=batch_size, + num_workers=num_workers, + pin_memory=True, + shuffle=True, + ) + + +def image_transforms(): normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) data_transforms = transforms.Compose( [ @@ -122,22 +156,36 @@ def pytorch(folder, batch_size, num_workers): normalize, ] ) - - train = datasets.ImageFolder(os.path.join(folder, "train"), data_transforms) + return data_transforms + +def pytorch(folder, batch_size, num_workers, distributed=False): + train = datasets.ImageFolder( + os.path.join(folder, "train"), + image_transforms() + ) + + kwargs = {"shuffle": True} + if distributed: + kwargs["sampler"] = DistributedSampler(train) + kwargs["shuffle"] = False + + # The dataloader needs a warmup sometimes + # by avoiding to go through too many epochs + # we reduce the standard deviation + if False: + kwargs["sampler"] = torch.utils.data.RandomSampler( + train, + replacement=True, + num_samples=len(train) * args.epochs + ) + kwargs["shuffle"] = False + return torch.utils.data.DataLoader( train, batch_size=batch_size, num_workers=num_workers, pin_memory=True, - shuffle=True, - # The dataloader needs a warmup sometimes - # by avoiding to go through too many epochs - # we reduce the standard deviation - # sampler=torch.utils.data.RandomSampler( - # train, - # replacement=True, - # num_samples=len(train) * args.epochs - # ) + **kwargs, ) @@ -167,7 +215,7 @@ def dataloader_arguments(parser: argparse.ArgumentParser): help="input batch size for training (default: 16)", ) parser.add_argument( - "--loader", type=str, help="Dataloader implementation", + "--loader", type=str, help="Dataloader implementation (dali, pytorch, synthetic_fixed, synthetic_random)", default="pytorch" ) parser.add_argument( @@ -178,37 +226,41 @@ def dataloader_arguments(parser: argparse.ArgumentParser): "--data", type=str, default=os.environ.get("MILABENCH_DIR_DATA", None), help="data directory" ) - 
parser.add_argument( - "--synthetic-data", action="store_true", - help="whether to use synthetic data" - ) - parser.add_argument( - "--fixed-batch", action="store_true", - help="use a fixed batch for training" - ) -def imagenet_dataloader(args, model): + +def data_folder(args): if not args.data: data_directory = os.environ.get("MILABENCH_DIR_DATA", None) if data_directory: args.data = os.path.join(data_directory, "FakeImageNet") + return args.data - if args.fixed_batch: - args.synthetic_data = True - if args.synthetic_data: - args.data = None +def imagenet_dataloader(args, model, rank=0, world_size=1): + if args.loader == "synthetic_random": + return synthetic( + model=model, + batch_size=args.batch_size, + fixed_batch=False + ) + + if args.loader == "synthetic_fixed": + return synthetic( + model=model, + batch_size=args.batch_size, + fixed_batch=True + ) + + if args.loader == "pytorch_fakedataset": + return pytorch_fakedataset( + None, + batch_size=args.batch_size, + num_workers=args.num_workers + ) - if args.data: - folder = os.path.join(args.data, "train") + folder = os.path.join(data_folder(args), "train") - if args.loader == "dali": - return dali(folder, args.batch_size, args.num_workers) - - return pytorch(folder, args.batch_size, args.num_workers) + if args.loader == "dali": + return dali(folder, args.batch_size, args.num_workers, rank, world_size) - return synthetic( - model=model, - batch_size=args.batch_size, - fixed_batch=args.fixed_batch - ) \ No newline at end of file + return pytorch(folder, args.batch_size, args.num_workers, world_size > 1) diff --git a/benchmate/benchmate/iterator.py b/benchmate/benchmate/iterator.py deleted file mode 100644 index 64686944f..000000000 --- a/benchmate/benchmate/iterator.py +++ /dev/null @@ -1,349 +0,0 @@ -import json -import os -import sys -import time - -from voir.helpers import current_overseer -from voir.phase import StopProgram -from voir.smuggle import SmuggleWriter - -TORCH_ERROR = None -try: - import torch - import torch.distributed as dist -except ImportError as err: - TORCH_ERROR = err - - - -def file_push(fp=sys.stdout): - def message(**kwargs): - kwargs.setdefault("task", "train") - msg = json.dumps(kwargs) - print(msg, file=fp) - - return message - - -def sumggle_push(): - fp = SmuggleWriter(sys.stdout) - return file_push(fp) - - -def give_push(): - ov = current_overseer.get() - return ov.give - - -def earlystop_count(): - return int(os.getenv("VOIR_EARLYSTOP_COUNT", 60)) + int( - os.getenv("VOIR_EARLYSTOP_SKIP", 10) - ) - - -class DataloaderWrapper: - """Time the body of a loop, ignoring the time it took to initialize the iterator.` - The timings are measured using `torch.cuda.Event` to avoid explicit sync. - - An explicit sync is done at the end of an epoch or if the max number of observation is reached. - - Because the timings are async, voir only gets triggered when the explicit sync happens which - is outside of the scope of performance measure, so no matter how long voir takes to process - the events it will not impact the measures. - - The wrapper also works in multi-gpu, multi-node setups and computes the real batch-size - by reducing the batch size on all processes. Only rank 0 logs data. - - Notes - ----- - The event progress is the only one that is feed synchronously so - this event should be handled quickly. - - Arguments - --------- - loader: Dataloader - original pytorch dataloader - - event_fn: - event constructor (torch.cuda.Evemt, toch.xpu.Event, etc...) 
- - rank: int - rank of the current process, only required if distributed - - device: - device used, only required if distributed - - earlystop: int - number of observation to produce before raising StopProgram - - push: function: - function used to message/push metrics - - Examples - -------- - - .. code-block:: - - loader = DataloaderWrapper(loader, torch.cuda.Event, earlystop=60) # < here - - for e in range(epochs): - for i in loader: - loss = criterion(model(x), y) - - loader.add_loss(loss) # < here - """ - - @classmethod - def with_sumggler(cls, *args, push=None, **kwargs): - return cls(*args, push=sumggle_push(), **kwargs) - - @classmethod - def with_stdout(cls, *args, push=None, **kwargs): - return cls(*args, push=file_push(), **kwargs) - - @classmethod - def with_give(cls, *args, push=None, **kwargs): - return cls(*args, push=give_push(), **kwargs) - - def __init__( - self, - loader, - event_fn, - rank=0, - push=file_push(), - device=None, - earlystop=earlystop_count(), - raise_stop_program=False, - batch_size_fn=None, - ): - self.loader = loader - self.events = [] - self.losses = [] - self.overhead = [] - self.loader_init_time = [] - - self.total_obs = 0 - self.event_fn = event_fn - self.world_size = 1 - self.early_stop = earlystop - self.rank = None - self.device = device - self.datafile = sys.stdout - self.n = len(loader) - self.unit = 1000 # timer is ms - self.profile_instrumentation = False - self.message_push = push - self.raise_stop_program = raise_stop_program - self.break_count = 0 - self.batch_size_fn = batch_size_fn - - if not TORCH_ERROR and dist.is_initialized(): - self.rank = rank - assert ( - self.device is not None - ), "device is required to compute the final batch size" - - def __getattr__(self, item): - return getattr(self.loader, item) - - def __len__(self): - return len(self.loader) - - def __iter__(self): - self.log_progress() - - # This takes much more time than expected good thing to keep track of it - start = -time.time() - iterator = iter(self.loader) - end = time.time() - - self.loader_init_time.append(start + end) - return self.wrapped(iterator) - - def wrapped(self, iterator): - # Time IO wait + batch compute - start = self.event_fn(enable_timing=True) - start.record() - - for data in iterator: - yield data - - overhead_start = -time.time() - - end = self.event_fn(enable_timing=True) - end.record() - bs = self.deduce_batch_size(data) - self.events.append((start, end, bs)) - - # check for early stopping to avoid doing the full epoch - self.log_progress() - if self.is_done() and self.break_count == 0: - self.break_count += 1 - break - - start = end - overhead_end = time.time() - self.overhead.append(overhead_start + overhead_end) - - self._push() - self.earlystop() - - def deduce_batch_size(self, elem): - if self.batch_size_fn: - return self.batch_size_fn(elem) - - try: - if len(elem) == 2: - return len(elem[0]) - return len(elem) - except ValueError: - return 0 - - def progress(self): - return len(self.events) + self.total_obs - - def is_done(self): - return self.early_stop is not None and self.progress() >= self.early_stop - - def earlystop(self, exception=StopProgram): - if self.is_done(): - self._push() - - if self.raise_stop_program: - raise exception() - - def extra_work(self): - pass - - def batch_size(self, bs): - # multi GPU, batch size count - if not TORCH_ERROR and dist.is_initialized(): - bs = torch.tensor([bs], dtype=torch.int64, device=self.device) - dist.reduce(bs, dst=0) - return bs.item() - return bs - - def _push(self): - """Push all the 
accumulated metrics""" - event = self.event_fn() - event.record() - event.synchronize() - - s = -time.time() - self.extra_work() - - # Push synchronize to have the final compute times - for start, end, bs in self.events: - end.synchronize() - - elapsed = start.elapsed_time(end) / self.unit - rate = self.batch_size(bs) / elapsed - - self.log_rate(rate) - - for loss in self.losses: - self.log_loss(loss.item()) - - if self.profile_instrumentation: - for ov in self.overhead: - self.message(overhead=ov, units="s", task="train") - - for iterinit in self.loader_init_time: - self.message(__iter__=iterinit, units="s", task="train") - - self.total_obs += len(self.events) - self.events = [] - self.losses = [] - self.overhead = [] - self.loader_init_time = [] - e = time.time() - - self.message(process_time=(e + s), units="s", task="train") - - def add_loss(self, loss): - # avoid .item() that cause sync - self.losses.append(loss.detach()) - return loss - - def log_rate(self, rate): - self.message(rate=rate, units="items/s", task="train") - - def log_loss(self, loss): - self.message(loss=loss, task="train") - - def log_progress(self): - if self.early_stop is not None: - progress = self.progress() - self.message(progress=(progress, self.early_stop), task="early_stop") - - def message(self, **kwargs): - if self.rank is None or self.rank == 0: - self.message_push(**kwargs) - - -class Wrapper: - """Helper class to create override function for ptera - - Examples - -------- - - .. code-block:: - - probe = ov.probe("//dataloader() as loader", overridable=True) - probe['loader'].override(wrapper.loader) - - probe = ov.probe("//train_epoch > criterion", overridable=True) - probe['criterion'].override(wrapper.criterion) - - """ - - def __init__( - self, *args, backward_callback=None, step_callback=None, stdout=False, **kwargs - ): - self.wrapped = None - self.args = args - self.kwargs = kwargs - self.backward_callback = backward_callback - self.optimizer_step_callback = step_callback - self.stdout = stdout - - def loader(self, loader): - """Wrap a dataloader or an iterable which enable accurate measuring of time spent in the loop's body""" - ctor = DataloaderWrapper.with_give - if self.stdout: - ctor = DataloaderWrapper.with_sumggler - - self.wrapped = ctor(loader, *self.args, **self.kwargs) - return self.wrapped - - def criterion(self, criterion): - """Wrap a loss value to log and enable a .backward callback""" - - def wrapped(*args): - loss = criterion(*args) - - if self.backward_callback: - original = loss.backward - - def new_backward(*args, **kwargs): - original(*args, **kwargs) - self.backward_callback() - - loss.backward = new_backward - - self.wrapped.add_loss(loss) - return loss - - return wrapped - - def optimizer(self, optimizer): - """Wrap an optimizer to enable a .step callback""" - if self.optimizer_step_callback: - original = optimizer.step - - def new_step(*args, **kwargs): - original(*args, **kwargs) - self.optimizer_step_callback() - - optimizer.step = new_step - return optimizer diff --git a/benchmate/pyproject.toml b/benchmate/pyproject.toml index a3437dc2e..e9af8bfcc 100644 --- a/benchmate/pyproject.toml +++ b/benchmate/pyproject.toml @@ -8,7 +8,6 @@ authors = [ license = "MIT" [tool.poetry.dependencies] -voir = {git = "https://github.com/breuleux/voir", branch = "master"} python = ">=3.8,<4.0" torchcompat = "^1.0.0" diff --git a/milabench/sizer.py b/milabench/sizer.py index b8225261f..5e4c42f07 100644 --- a/milabench/sizer.py +++ b/milabench/sizer.py @@ -289,6 +289,9 @@ def 
resolve_argv(pack, argv): context["cpu_count"] = multiprocessing.cpu_count() context["cpu_per_gpu"] = multiprocessing.cpu_count() // device_count + context["milabench_data"] = pack.config.get("dirs", {}).get("data", None) + context["milabench_cache"] = pack.config.get("dirs", {}).get("cache", None) + max_worker = 16 context["n_worker"] = min(context["cpu_per_gpu"], max_worker)
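
Usage sketch (not part of the patch): the snippet below shows how a benchmark entry point is expected to consume the refactored helpers, mirroring what benchmarks/torchvision_ddp/main.py does after this change. Only dataloader_arguments() and imagenet_dataloader(args, model, rank, world_size) come from the new benchmate/benchmate/dataloader.py API above; the model choice, optimizer, loss and loop body are illustrative assumptions, not code from this patch.

    # Minimal sketch of a consumer of the refactored helpers.
    # Assumptions (not in the patch): resnet50 default, SGD + CrossEntropyLoss,
    # single-process usage (rank=0, world_size=1), a plain train loop.
    import argparse

    import torch
    import torchvision.models as tvmodels
    import torchcompat.core as accelerator

    from benchmate.dataloader import dataloader_arguments, imagenet_dataloader


    def main():
        parser = argparse.ArgumentParser(description="example consumer of benchmate.dataloader")

        # Registers the shared flags (--batch-size, --loader, --data, ...) that
        # this patch removes from the individual benchmark scripts.
        dataloader_arguments(parser)
        parser.add_argument("--model", type=str, default="resnet50")
        args = parser.parse_args()

        device = accelerator.fetch_device(0)
        model = getattr(tvmodels, args.model)().to(device)

        # rank/world_size default to 0/1; the DDP benchmark passes the real
        # values so the PyTorch loader gets a DistributedSampler and the DALI
        # reader is sharded via shard_id/num_shards.
        loader = imagenet_dataloader(args, model, rank=0, world_size=1)

        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

        for inp, target in loader:
            inp, target = inp.to(device), target.to(device)
            optimizer.zero_grad()
            loss = criterion(model(inp), target)
            loss.backward()
            optimizer.step()


    if __name__ == "__main__":
        main()

Selecting --loader from {pytorch, dali, pytorch_fakedataset, synthetic_fixed, synthetic_random} picks the corresponding branch of imagenet_dataloader(); the filesystem-backed loaders read from <data>/train, where --data defaults to $MILABENCH_DIR_DATA/FakeImageNet as generated by benchmate.datagen.generate_fakeimagenet().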