Skip to content

Commit

Permalink
Use benchmate TimedIterator and BenchObserver instead
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre.delaunay committed Jun 11, 2024
1 parent c8a6fe5 commit 69baffe
Show file tree
Hide file tree
Showing 20 changed files with 646 additions and 205 deletions.
47 changes: 9 additions & 38 deletions benchmarks/accelerate_opt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ def arguments():
default_data_collator,
get_scheduler,
)
from voir.smuggle import SmuggleWriter
from voir.instruments.gpu import get_gpu_info
from voir.instruments.utils import Monitor
from benchmate.observer import BenchObserver

logger = get_logger(__name__)

Expand Down Expand Up @@ -145,35 +143,11 @@ class CustomInitProcessGroupKwargs(InitProcessGroupKwargs):
else:
accelerator = Accelerator()

# Set up logging for milabench (only in the run phase, for the main process)
monitor = None
if not is_prepare_phase and accelerator.is_main_process:
# Set up logging for milabench (only in the run phase, for the main process)

data_file = SmuggleWriter(sys.stdout)
def mblog(data):
if data_file is not None:
print(json.dumps(data), file=data_file)

def monitor_fn():
data = {
gpu["device"]: {
"memory": [gpu["memory"]["used"], gpu["memory"]["total"]],
"load": gpu["utilization"]["compute"],
"temperature": gpu["temperature"],
}
for gpu in get_gpu_info()["gpus"].values()
}
mblog({"task": "main", "gpudata": data})

monitor_fn()
monitor = Monitor(3, monitor_fn)
monitor.start()

else:

def mblog(data):
pass

monitor = None
from benchmate.common import opt_voir
monitor = opt_voir()

logging.basicConfig(
level=logging.INFO,
Expand Down Expand Up @@ -374,7 +348,6 @@ def group_texts(examples):
total_batch_size = (
per_gpu_batch_size * accelerator.num_processes * gradient_accumulation_steps
)
print("HERE", per_gpu_batch_size, total_batch_size)

logger.info("***** Running training *****")
logger.info(f" Num examples = {len(train_dataset)}")
Expand All @@ -388,28 +361,26 @@ def group_texts(examples):

completed_steps = 0
starting_epoch = 0
last_log_time = time.time()

from voir.wrapper import Wrapper
wrapper = Wrapper(
observer = BenchObserver(
event_fn=acc.Event,
earlystop=30,
rank=int(os.environ["RANK"]),
device=acc.fetch_device(int(os.environ["RANK"])),
stdout=True,
batch_size_fn=lambda batch: batch["labels"].shape[0]
)
loader = wrapper.loader(train_dataloader)
loader = observer.loader(train_dataloader)

for epoch in range(starting_epoch, num_train_epochs):
model.train()
for step, batch in enumerate(loader):
outputs = model(**batch)
loss = outputs.loss
loss = loss / gradient_accumulation_steps

if accelerator.is_main_process:
loader.add_loss(loss)
# mblog({"task": "train", "loss": loss.detach().item()})
observer.record_loss(loss)

accelerator.backward(loss)

Expand Down
60 changes: 1 addition & 59 deletions benchmarks/flops/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
#!/usr/bin/env python

from argparse import ArgumentParser
import json
import time
import sys
import multiprocessing

import torch
import torchcompat.core as accelerator

from voir.smuggle import SmuggleWriter
from voir.instruments.gpu import get_gpu_info
from voir.instruments.utils import Monitor
from benchmate.common import setupvoir

KILO = 1e3
MEGA = 1e6
Expand All @@ -31,31 +26,6 @@ def synchronize():
accelerator.synchronize()


def _worker(state, queue, func, delay):
    """Child-process loop: sample ``func`` every ``delay`` seconds.

    Each result is pushed onto ``queue``; the loop exits once the parent
    clears the shared ``state["running"]`` flag.
    """
    from time import sleep

    while state["running"]:
        queue.put(func())
        sleep(delay)


class Monitor:
    """Run ``func`` periodically in a separate process.

    Samples are retrieved through ``self.results`` (a multiprocessing
    queue); ``self.state`` is a manager-backed dict shared with the
    child so the parent can signal shutdown via ``stop()``.
    """

    def __init__(self, delay, func):
        manager = multiprocessing.Manager()
        self.manager = manager
        self.state = manager.dict()
        self.state["running"] = True
        self.results = multiprocessing.Queue()
        self.process = multiprocessing.Process(
            target=_worker,
            args=(self.state, self.results, func, delay),
        )

    def start(self):
        """Launch the sampling process."""
        self.process.start()

    def stop(self):
        """Signal the worker to exit and wait for the process to terminate."""
        self.state["running"] = False
        self.process.join()


def modelflops(
model: torch.nn.Module, shape, repeat=10, dtype=torch.float32, unit=TERA
Expand Down Expand Up @@ -123,34 +93,6 @@ def f(N, R=30, m=5000000, n=256, unit=TERA, dtype=torch.float32, log=None):
empty_cache()


def setupvoir():
    """Set up milabench telemetry; return a ``(log, monitor)`` pair.

    ``log`` writes timestamped JSON records to the smuggled stdout
    channel and drains any pending GPU samples collected by the
    background ``monitor``, which polls GPU stats every 0.5 s.
    """
    # SmuggleWriter multiplexes the metrics stream over stdout so the
    # milabench harness can read it alongside regular program output.
    data_file = SmuggleWriter(sys.stdout)
    # data_file = sys.stdout

    def log(data):
        # Best-effort: records are silently dropped when no channel is open.
        if data_file is not None:
            data["t"] = time.time()
            print(json.dumps(data), file=data_file)

            # Flush GPU samples accumulated by the monitor process.
            while not monitor.results.empty():
                print(json.dumps(monitor.results.get()), file=data_file)

    def monitor_fn():
        # Snapshot per-GPU memory/load/temperature/power from voir's
        # get_gpu_info(); keys are device identifiers.
        data = {
            gpu["device"]: {
                "memory": [gpu["memory"]["used"], gpu["memory"]["total"],],
                "load": gpu["utilization"]["compute"],
                "temperature": gpu["temperature"],
                "power": gpu["power"],
            }
            for gpu in get_gpu_info()["gpus"].values()
        }
        return {"task": "main", "gpudata": data, "t": time.time()}

    # NOTE: `log` closes over `monitor`, which is bound here — before any
    # call to `log` can happen.
    monitor = Monitor(0.5, monitor_fn)
    monitor.start()
    return log, monitor


def main():
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/huggingface/bench/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import torchcompat.core as accelerator

import transformers
from voir.wrapper import Wrapper
from benchmate.observer import BenchObserver

from .models import models
from .synth import SyntheticData, generators
Expand Down Expand Up @@ -102,17 +102,17 @@ def batch_size(bs):
print(list(bs.keys()))
raise RuntimeError("Batch size unknown")

wrapper = Wrapper(
observer = BenchObserver(
event_fn=accelerator.Event,
batch_size_fn=batch_size
)
loader = wrapper.loader(self.loader)
loader = observer.loader(self.loader)

for data in loader:
data = {k: v.to(self.device) for k, v in data.items()}
loss = self.step(data)

loader.add_loss(loss)
observer.record_loss(loss)


def parser():
Expand Down
16 changes: 16 additions & 0 deletions benchmarks/lightning/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

# Lightning

Benchmark torchvision models on fake ImageNet data.

## prepare

Generates 1000 training samples in `$MILABENCH_BASE/data/FakeImageNet`, to be read during training.

## run

Any of the following models can be used with `--model`:

* resnet18
* resnet50
* ...
10 changes: 10 additions & 0 deletions benchmarks/lightning/benchfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from milabench.pack import Package


class LightningBenchmark(Package):
    """milabench package definition for the lightning benchmark.

    Declares the files milabench uses for each phase of the benchmark
    (install, prepare, run).
    """

    # Dependency list used to build this benchmark's environment.
    base_requirements = "requirements.in"
    # Script executed during the prepare phase (dataset generation).
    prepare_script = "prepare.py"
    # Entry point executed during the run phase.
    main_script = "main.py"


# Exported hook: milabench discovers the benchmark through this name.
__pack__ = LightningBenchmark
Empty file added benchmarks/lightning/main.py
Empty file.
2 changes: 2 additions & 0 deletions benchmarks/lightning/prepare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/usr/bin/env python

5 changes: 5 additions & 0 deletions benchmarks/lightning/requirements.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
torch
torchvision
torchcompat
lightning
voir
Empty file.
66 changes: 1 addition & 65 deletions benchmarks/llama/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import argparse
import time
import sys
import multiprocessing

import torch

from voir.smuggle import SmuggleWriter
from voir.instruments.gpu import get_gpu_info
from benchmate.common import setupvoir
import torchcompat.core as accelerator

root = os.path.dirname(__file__)
Expand All @@ -28,66 +26,6 @@ def available_models():
return models


def _worker(state, queue, func, delay):
    """Sampling loop executed in the child process.

    Evaluates ``func`` every ``delay`` seconds and forwards each result
    onto ``queue`` until the parent clears ``state["running"]``.
    """
    import time

    while state["running"]:
        sample = func()
        queue.put(sample)
        time.sleep(delay)


class Monitor:
    """Periodically evaluate ``func`` in a dedicated process.

    The parent reads samples from ``self.results`` and requests
    shutdown through the manager-backed ``self.state`` dict.
    """

    def __init__(self, delay, func):
        self.manager = multiprocessing.Manager()
        self.state = self.manager.dict()
        self.state["running"] = True
        self.results = multiprocessing.Queue()
        worker_args = (self.state, self.results, func, delay)
        self.process = multiprocessing.Process(target=_worker, args=worker_args)

    def start(self):
        """Spawn the sampling process."""
        self.process.start()

    def stop(self):
        """Ask the worker to exit, then wait for the process to finish."""
        self.state["running"] = False
        self.process.join()


def setupvoir():
    """Set up milabench telemetry; return a ``(log, monitor)`` pair.

    ``log`` writes timestamped JSON records to the smuggled stdout
    channel and drains any pending GPU samples collected by the
    background ``monitor``, which polls GPU stats every 0.5 s.
    """
    # SmuggleWriter multiplexes the metrics stream over stdout so the
    # milabench harness can read it alongside regular program output.
    data_file = SmuggleWriter(sys.stdout)
    # data_file = sys.stdout

    def log(data):
        # Best-effort: records are silently dropped when no channel is open.
        if data_file is not None:
            data["t"] = time.time()
            print(json.dumps(data), file=data_file)

            # Flush GPU samples accumulated by the monitor process.
            while not monitor.results.empty():
                print(json.dumps(monitor.results.get()), file=data_file)

    def monitor_fn():
        # Snapshot per-GPU memory/load/temperature/power from voir's
        # get_gpu_info(); keys are device identifiers.
        data = {
            gpu["device"]: {
                "memory": [
                    gpu["memory"]["used"],
                    gpu["memory"]["total"],
                ],
                "load": gpu["utilization"]["compute"],
                "temperature": gpu["temperature"],
                "power": gpu["power"],
            }
            for gpu in get_gpu_info()["gpus"].values()
        }
        return {"task": "main", "gpudata": data, "t": time.time()}

    # NOTE: `log` closes over `monitor`, which is bound here — before any
    # call to `log` can happen.
    monitor = Monitor(0.5, monitor_fn)
    monitor.start()
    return log, monitor


class WrappedTokenizer:
def __init__(self, tokenizer):
self.tokenizer = tokenizer
Expand Down Expand Up @@ -121,9 +59,7 @@ def huggingface_main(args, model, config):
import transformers
from transformers import LlamaForCausalLM, LlamaTokenizerFast
from transformers.models.llama.configuration_llama import LlamaConfig
from voir.wrapper import DataloaderWrapper, Wrapper
from datasets import load_dataset
import optimum.habana

# Dataset here
println("Dataset")
Expand Down
9 changes: 4 additions & 5 deletions benchmarks/stargan/stargan/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import os
import time
import datetime
from giving import give
import voir.wrapper
from benchmate.observer import BenchObserver


class Solver(object):
Expand Down Expand Up @@ -227,11 +226,11 @@ def train(self):
data_loader = self.synth_loader

# Fetch fixed inputs for debugging.
wrapper = voir.wrapper.Wrapper(
observer = BenchObserver(
event_fn=accelerator.Event,
batch_size_fn=lambda x: len(x[0])
)
loader = wrapper.loader(data_loader)
loader = observer.loader(data_loader)

data_iter = iter(loader)
x_fixed, c_org = next(data_iter)
Expand Down Expand Up @@ -316,7 +315,7 @@ def train(self):
+ self.lambda_gp * d_loss_gp
)
# give(task="train", loss=d_loss.item())
loader.add_loss(d_loss)
observer.record_loss(d_loss.detach())
self.reset_grad()

d_loss.backward()
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/stargan/voirfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class Config:

@configurable
def instrument_main(ov, options: Config):
import torchcompat.core as accelerator
from voir.wrapper import earlystop_count
# import torchcompat.core as accelerator
# from benchmate.observer import BenchObserver

yield ov.phases.init

Expand Down
Loading

0 comments on commit 69baffe

Please sign in to comment.