Update bench to use low overhead measure
Your Name committed May 16, 2024
1 parent be3681c commit 009a857
Showing 2 changed files with 73 additions and 135 deletions.
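The benchmarks previously timed each batch with `timeit` blocks and explicit `accelerator.synchronize()` calls, which stall the device once per batch. This commit moves that bookkeeping into the `voir.asynctimer` dataloader wrappers, which record accelerator events around each iteration and resolve them after the fact. A minimal sketch of the idea, using plain `torch.cuda` events and a hypothetical `EventTimedLoader` class rather than voir's actual implementation:

```python
import torch


class EventTimedLoader:
    """Sketch of a low-overhead, event-based dataloader timer.

    CUDA events are recorded asynchronously around each batch and only
    resolved once at the end, instead of calling synchronize() per batch.
    Hypothetical illustration; the real wrappers live in voir.asynctimer.
    """

    def __init__(self, loader, earlystop=None):
        self.loader = loader
        self.earlystop = earlystop
        self.events = []   # (start_event, end_event, batch_size) triples
        self.losses = []

    def add_loss(self, loss):
        # Keep the tensor; calling .item() here would force a device sync.
        self.losses.append(loss.detach())

    def __iter__(self):
        start = torch.cuda.Event(enable_timing=True)
        start.record()
        for i, (inp, target) in enumerate(self.loader):
            yield inp, target
            # Control returns here when the consumer asks for the next batch,
            # so each interval covers the fetch plus the work on that batch.
            end = torch.cuda.Event(enable_timing=True)
            end.record()
            self.events.append((start, end, len(inp)))
            start = end
            if self.earlystop is not None and i + 1 >= self.earlystop:
                break

    def rates(self):
        # One synchronization at the very end instead of one per batch.
        torch.cuda.synchronize()
        return [n / (s.elapsed_time(e) / 1000) for s, e, n in self.events]
```

The diffs below use the real wrappers the same way: wrap the loader, iterate normally, and report losses through `add_loss`.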
130 changes: 60 additions & 70 deletions benchmarks/torchvision/main.py
@@ -11,6 +11,7 @@
import torchcompat.core as accelerator

import voir
from voir.asynctimer import DataloaderWrapper, DataloaderWrapperGiver, DataloaderWrapperSmuggle, StopProgram
from giving import give, given
from cantilever.core.timer import timeit, timeiterator, show_timings

@@ -68,37 +69,28 @@ def scaling(enable, dtype):
def train_epoch(model, criterion, optimizer, loader, device, dtype, scaler=None):
    model.train()

    def toiterator(loader):
        with timeit("loader"):
            return iter(loader)

    iterator = timeiterator(voir.iterate("train", toiterator(loader), True))
    for inp, target in loader:
        inp = inp.to(device, dtype=dtype)
        target = target.to(device)
        optimizer.zero_grad()

    for inp, target in iterator:

        with timeit("batch"):
            inp = inp.to(device, dtype=dtype)
            target = target.to(device)
            optimizer.zero_grad()

            with scaling(scaler is not None, dtype):
                output = model(inp)
                loss = criterion(output, target)
                give(loss=loss.item())

            if scaler:
                scaler.scale(loss).backward()
                accelerator.mark_step()

                scaler.step(optimizer)
                scaler.update()
            else:
                loss.backward()
                accelerator.mark_step()
                optimizer.step()
        with scaling(scaler is not None, dtype):
            output = model(inp)
            loss = criterion(output, target)
            loader.add_loss(loss)

        if scaler:
            scaler.scale(loss).backward()
            accelerator.mark_step()
            accelerator.synchronize()

            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            accelerator.mark_step()
            optimizer.step()

        accelerator.mark_step()


class SyntheticData:
@@ -123,15 +115,19 @@ def __len__(self):

def dataloader(args):
    train = datasets.ImageFolder(os.path.join(args.data, "train"), data_transforms)
    train_loader = torch.utils.data.DataLoader(
        train,
        batch_size=args.batch_size,
        num_workers=args.num_workers,
        sampler=torch.utils.data.RandomSampler(
            train,
            replacement=True,
            num_samples=len(train) * args.epochs
        )
    train_loader = DataloaderWrapperGiver(
        torch.utils.data.DataLoader(
            train,
            batch_size=args.batch_size,
            num_workers=args.num_workers,
            sampler=torch.utils.data.RandomSampler(
                train,
                replacement=True,
                num_samples=len(train) * args.epochs
            )
        ),
        accelerator.Event,
        earlystop=60
    )
    return train_loader

@@ -145,22 +141,11 @@ def iobench(args):
    device = accelerator.fetch_device(0)
    dtype = float_dtype(args.precision)

    def toiterator(loader):
        with timeit("loader"):
            return iter(loader)

    with given() as gv:
        for epoch in voir.iterate("main", range(args.epochs)):
            with timeit("epoch"):

                iterator = timeiterator(voir.iterate("train", toiterator(loader), True))

                for inp, target in iterator:
                    with timeit("batch"):
                        inp = inp.to(device, dtype=dtype)
                        target = target.to(device)

                        accelerator.synchronize()
    for epoch in range(args.epochs):
        for inp, target in loader:
            inp = inp.to(device, dtype=dtype)
            target = target.to(device)


def main():
@@ -287,30 +272,35 @@ def trainbench(args):
    if args.data:
        train_loader = dataloader(args)
    else:
        train_loader = SyntheticData(
            model=model,
            device=device,
            batch_size=args.batch_size,
            n=1000,
            fixed_batch=args.fixed_batch,
        train_loader = DataloaderWrapperGiver(SyntheticData(
                model=model,
                device=device,
                batch_size=args.batch_size,
                n=1000,
                fixed_batch=args.fixed_batch,
            ),
            accelerator.Event,
            earlystop=60
        )

    scaler = NoScale()
    if torch.cuda.is_available():
        scaler = accelerator.amp.GradScaler(enabled=is_fp16_allowed(args))

    with given() as gv:
        if not args.no_stdout:
            gv.where("loss").display()

        for epoch in voir.iterate("main", range(args.epochs)):
            with timeit("epoch"):
                if not args.no_stdout:
                    print(f"Begin training epoch {epoch}/{args.epochs}")
                train_epoch(
                    model, criterion, optimizer, train_loader, device, scaler=scaler, dtype=float_dtype(args.precision)
                )
    for epoch in range(args.epochs):
        train_epoch(
            model,
            criterion,
            optimizer,
            train_loader,
            device,
            scaler=scaler,
            dtype=float_dtype(args.precision),
        )


if __name__ == "__main__":
    main()
    try:
        main()
    except StopProgram:
        pass
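Both wrapped loaders are constructed with `earlystop=60`, and `__main__` now catches `StopProgram`, so the wrapper presumably raises that exception once it has observed enough batches and the benchmark exits cleanly. A rough sketch of the pattern, with a hypothetical `EarlyStoppingLoader` standing in for the real wrapper:

```python
class StopProgram(Exception):
    """Stand-in for voir.asynctimer.StopProgram in this sketch."""


class EarlyStoppingLoader:
    # Hypothetical sketch of the earlystop behaviour, not voir's code.
    def __init__(self, loader, earlystop=60):
        self.loader = loader
        self.earlystop = earlystop
        self.seen = 0

    def __iter__(self):
        for batch in self.loader:
            yield batch
            self.seen += 1
            if self.seen >= self.earlystop:
                raise StopProgram()


# Caller side, mirroring the new __main__ block:
# try:
#     main()
# except StopProgram:
#     pass
```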
78 changes: 13 additions & 65 deletions benchmarks/torchvision_ddp/main.py
@@ -25,6 +25,7 @@
import torchvision.datasets as datasets

import voir
from voir.asynctimer import DataloaderWrapper, DataloaderWrapperSmuggle, StopProgram
from voir.smuggle import SmuggleWriter
from giving import give, given
from cantilever.core.timer import timeit, timeiterator, show_timings
@@ -56,20 +57,24 @@ def __init__(
        self.rank = gpu_id
        self.device = accelerator.fetch_device(gpu_id)
        self.model = model.to(self.device)
        self.train_data = train_data
        self.train_data = DataloaderWrapperSmuggle(
            train_data,
            accelerator.Event,
            rank=self.rank,
            device=self.device,
            earlystop=60
        )
        self.optimizer = optimizer
        # self.model = FSDP(model, device_id=self.device)
        self.model = DDP(model, device_ids=[self.device])
        self.world_size = world_size
        self.data_file = SmuggleWriter(sys.stdout)

    def print(self, *args, **kwargs):
        if self.rank == 0:
            print(*args, **kwargs)

    def _run_batch(self, source, targets):
        with accelerator.amp.autocast(dtype=torch.bfloat16):

            self.optimizer.zero_grad()
            output = self.model(source)
            loss = F.cross_entropy(output, targets)
@@ -83,75 +88,20 @@ def _run_batch(self, source, targets):
        return loss.detach()

    def _run_epoch(self, epoch):
        def toiterator(loader):
            with timeit("loader"):
                return iter(loader)

        sample_count = 0
        losses = []
        events = []

        self.train_data.sampler.set_epoch(epoch)
        loader = timeiterator(voir.iterate("train", toiterator(self.train_data), True))

        start_event = accelerator.Event(enable_timing=True)
        start_event.record()

        for source, targets in loader:
            end_event = accelerator.Event(enable_timing=True)

        for source, targets in self.train_data:
            with timeit("batch"):
                source = source.to(self.device)
                targets = targets.to(self.device)

                n = len(source)
                sample_count += n

                loss = self._run_batch(source, targets)
                losses.append(loss)

                end_event.record()
                events.append((start_event, end_event, n))
                start_event = end_event

        for start, end, n in events:
            end.synchronize()
            elapsed = start.elapsed_time(end) / 1000
            rate = (n * self.world_size) / elapsed
            self.log({
                "task": "train",
                "rate": rate,
                "units": "items/s",
            })

        total_count = torch.tensor([sample_count], dtype=torch.int64, device=self.device)
        dist.reduce(total_count, dst=0)

        loss = sum([l.item() for l in losses]) / len(losses)
        return total_count.item(), loss
                self.train_data.add_loss(loss)

    def train(self, max_epochs: int):
        with given() as gv:
            for epoch in range(max_epochs):
                with timeit("epoch") as timer:
                    total_count, loss = self._run_epoch(epoch)

                self.perf(loss, total_count, timer)

    def log(self, data):
        if self.rank == 0 and self.data_file is not None:
            msg = json.dumps(data)
            print(msg, file=self.data_file)
            print(msg)

    def perf(self, loss, total_count, timer):
        if self.rank == 0:
            self.log({"task": "train", "loss": loss})
            # self.log({
            #     "task": "train",
            #     "rate": total_count / (timer.end - timer.start),
            #     "units": "items/s",
            # })
                self._run_epoch(epoch)


def image_transforms():
@@ -168,7 +118,6 @@ def image_transforms():

def prepare_dataloader(dataset: Dataset, args):
    dsampler = DistributedSampler(dataset)
    # next(iter(dsampler))

    return DataLoader(
        dataset,
@@ -231,11 +180,10 @@ def worker_main(rank: int, world_size: int, args):
        destroy_process_group()

        print(f"<<< rank: {rank}")
    except StopProgram:
        print("Early stopping")
    except Exception as err:
        print(err)
    finally:
        if rank == 0:
            show_timings(True)


def main():

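In the DDP benchmark, the per-batch event bookkeeping and rate logging deleted from `_run_epoch` are presumably absorbed by `DataloaderWrapperSmuggle`. For reference, the arithmetic the deleted inline code performed, as a standalone sketch:

```python
def per_batch_rates(events, world_size):
    """Turn (start_event, end_event, batch_size) triples into items/s.

    Mirrors the deleted inline code: elapsed_time() is in milliseconds,
    and each rank's batch size is scaled by world_size to report a
    global training rate.
    """
    rates = []
    for start, end, n in events:
        end.synchronize()
        elapsed = start.elapsed_time(end) / 1000  # seconds
        rates.append((n * world_size) / elapsed)
    return rates
```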