diff --git a/.gitignore b/.gitignore index 8fedda861..5c661d462 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,6 @@ output/ workspace/ .pin/tmp-* dry/ + +stderr.txt +stdout.txt diff --git a/benchmarks/accelerate_opt/benchfile.py b/benchmarks/accelerate_opt/benchfile.py index 8dfe62c30..b0ad099cd 100644 --- a/benchmarks/accelerate_opt/benchfile.py +++ b/benchmarks/accelerate_opt/benchfile.py @@ -7,6 +7,12 @@ ) from milabench.pack import Package from milabench.utils import select_nodes +from milabench.sizer import resolve_argv + + +def resolve_placeholder(pack, name): + placeholder = pack.config["argv"][name] + return resolve_argv(pack, [placeholder]) class AccelerateBenchmark(Package): @@ -14,7 +20,8 @@ class AccelerateBenchmark(Package): def make_env(self): env = super().make_env() - env["OMP_NUM_THREADS"] = str(self.config["argv"]["--cpus_per_gpu"]) + value = resolve_placeholder(self, "--cpus_per_gpu") + env["OMP_NUM_THREADS"] = str(value) return env def build_prepare_plan(self): @@ -22,7 +29,7 @@ def build_prepare_plan(self): self, "accelerate", "launch", - "--mixed_precision=fp16", + "--mixed_precision=bf16", "--num_machines=1", "--dynamo_backend=no", "--num_processes=1", diff --git a/benchmarks/brax/voirfile.py b/benchmarks/brax/voirfile.py index 771927e50..a26eb5d6d 100644 --- a/benchmarks/brax/voirfile.py +++ b/benchmarks/brax/voirfile.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from voir import configurable +from voir.phase import StopProgram from voir.instruments import dash, early_stop, gpu_monitor, log, rate @@ -40,3 +41,8 @@ def instrument_main(ov, options: Config): early_stop(n=options.stop, key="rate", task="train"), gpu_monitor(poll_interval=3), ) + + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmarks/dlrm/voirfile.py b/benchmarks/dlrm/voirfile.py index d7b267c2e..2c08da3ed 100644 --- a/benchmarks/dlrm/voirfile.py +++ b/benchmarks/dlrm/voirfile.py @@ -1,6 
+1,7 @@ from dataclasses import dataclass from voir import configurable +from voir.phase import StopProgram from voir.instruments import dash, early_stop, gpu_monitor, log, rate @@ -53,3 +54,8 @@ def instrument_main(ov, options: Config): ov.probe( "//run(inputBatch as batch, !#loop_inputBatch as step, !!#endloop_inputBatch as step_end)" ).augment(task=lambda: "train").give() + + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmarks/huggingface/voirfile.py b/benchmarks/huggingface/voirfile.py index 885560db6..6937bc2c0 100644 --- a/benchmarks/huggingface/voirfile.py +++ b/benchmarks/huggingface/voirfile.py @@ -3,6 +3,7 @@ from voir import configurable +from voir.phase import StopProgram from voir.instruments import dash, early_stop, gpu_monitor, log, rate @@ -43,3 +44,8 @@ def instrument_main(ov, options: Config): os.environ["VOIR_EARLYSTOP_COUNT"] = str(options.stop) os.environ["VOIR_EARLYSTOP_SKIP"] = str(options.skip) + + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmarks/llama/main.py b/benchmarks/llama/main.py index 499bd7263..696d16288 100755 --- a/benchmarks/llama/main.py +++ b/benchmarks/llama/main.py @@ -8,6 +8,7 @@ import torch +from voir.phase import StopProgram from benchmate.monitor import setupvoir import torchcompat.core as accelerator diff --git a/benchmarks/llama/voirfile.py b/benchmarks/llama/voirfile.py new file mode 100644 index 000000000..04ab07b99 --- /dev/null +++ b/benchmarks/llama/voirfile.py @@ -0,0 +1,10 @@ +from voir import configurable +from voir.phase import StopProgram + + +@configurable +def instrument_main(ov): + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmarks/stargan/voirfile.py b/benchmarks/stargan/voirfile.py index 9e35648db..94c826568 100644 --- a/benchmarks/stargan/voirfile.py +++ b/benchmarks/stargan/voirfile.py @@ 
-2,6 +2,7 @@ import os from voir import configurable +from voir.phase import StopProgram from voir.instruments import dash, early_stop, gpu_monitor, log, rate @@ -43,3 +44,8 @@ def instrument_main(ov, options: Config): os.environ["VOIR_EARLYSTOP_COUNT"] = str(options.stop) os.environ["VOIR_EARLYSTOP_SKIP"] = str(options.skip) + + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmarks/super-slomo/voirfile.py b/benchmarks/super-slomo/voirfile.py index 77f00ad6c..45776ddd3 100644 --- a/benchmarks/super-slomo/voirfile.py +++ b/benchmarks/super-slomo/voirfile.py @@ -2,6 +2,7 @@ import os from voir import configurable +from voir.phase import StopProgram from voir.instruments import dash, early_stop, gpu_monitor, log, rate @@ -42,3 +43,8 @@ def instrument_main(ov, options: Config): os.environ["VOIR_EARLYSTOP_COUNT"] = str(options.stop) os.environ["VOIR_EARLYSTOP_SKIP"] = str(options.skip) + + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmarks/timm/voirfile.py b/benchmarks/timm/voirfile.py index 5dbe351be..c85406aaf 100644 --- a/benchmarks/timm/voirfile.py +++ b/benchmarks/timm/voirfile.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from voir import configurable +from voir.phase import StopProgram from voir.instruments import dash, early_stop, gpu_monitor, log, rate import torchcompat.core as accelerator @@ -70,3 +71,8 @@ def instrument_main(ov, options: Config): instruments.append(early_stop(n=options.stop, key="rate", task="train", signal="stop")) ov.require(*instruments) + + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmarks/torchvision/voirfile.py b/benchmarks/torchvision/voirfile.py index 90c5e00a5..d9395ab07 100644 --- a/benchmarks/torchvision/voirfile.py +++ b/benchmarks/torchvision/voirfile.py @@ -1,6 +1,7 @@ from 
dataclasses import dataclass from voir import configurable +from voir.phase import StopProgram from voir.instruments import dash, early_stop, gpu_monitor, log from benchmate.observer import BenchObserver @@ -58,5 +59,7 @@ def instrument_main(ov, options: Config): probe = ov.probe("//train_epoch > criterion", overridable=True) probe['criterion'].override(observer.criterion) - - + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") diff --git a/benchmarks/torchvision_ddp/voirfile.py b/benchmarks/torchvision_ddp/voirfile.py new file mode 100644 index 000000000..04ab07b99 --- /dev/null +++ b/benchmarks/torchvision_ddp/voirfile.py @@ -0,0 +1,10 @@ +from voir import configurable +from voir.phase import StopProgram + + +@configurable +def instrument_main(ov): + try: + yield ov.phases.run_script + except StopProgram: + print("early stopped") \ No newline at end of file diff --git a/benchmate/benchmate/dataloader.py b/benchmate/benchmate/dataloader.py index 0e3caf84c..ada211ffb 100644 --- a/benchmate/benchmate/dataloader.py +++ b/benchmate/benchmate/dataloader.py @@ -34,8 +34,8 @@ def generate_tensors(batch_size, shapes, device): return tensors -def generate_tensor_classification(model, batch_size, in_shape, device): - model = model.to(device) +def generate_tensor_classification(original_model, batch_size, in_shape, device): + model = original_model.to(device=device) inp = torch.randn((batch_size, *in_shape), device=device) out = torch.rand_like(model(inp)) return inp, out @@ -195,7 +195,10 @@ def pytorch(folder, batch_size, num_workers, distributed=False, epochs=60): def synthetic(model, batch_size, fixed_batch): return SyntheticData( tensors=generate_tensor_classification( - model, batch_size, (3, 244, 244), device=accelerator.fetch_device(0) + model, + batch_size, + (3, 244, 244), + device=accelerator.fetch_device(0) ), n=1000, fixed_batch=fixed_batch, diff --git a/benchmate/benchmate/metrics.py b/benchmate/benchmate/metrics.py index b6ca483c7..29d3aa511 100644 --- 
a/benchmate/benchmate/metrics.py +++ b/benchmate/benchmate/metrics.py @@ -79,12 +79,19 @@ def push(self, pusher): class LazyLossPusher(LazyMetricPusher): def record(self, loss): + value = loss # no .item() we do not want to sync - self.append(loss.detach()) + if hasattr(loss, "detach"): + value = loss.detach() + self.append(value) def materialize(self, loss): + value = loss # synch here is fine - return {"loss": loss.item(), "task": self.task} + if hasattr(loss, "item"): + value = loss.item() + + return {"loss": value, "task": self.task} class CPUTimer: diff --git a/benchmate/benchmate/observer.py b/benchmate/benchmate/observer.py index 5ead66a5b..62aa84a58 100644 --- a/benchmate/benchmate/observer.py +++ b/benchmate/benchmate/observer.py @@ -23,7 +23,7 @@ class BenchObserver: """ def __init__( - self, *args, backward_callback=None, step_callback=None, stdout=False, **kwargs + self, *args, backward_callback=None, step_callback=None, stdout=False, rank=None, **kwargs ): self.wrapped = None self.args = args @@ -32,6 +32,7 @@ def __init__( self.optimizer_step_callback = step_callback self.stdout = stdout self.task = "train" + self.rank = rank self.losses = LazyLossPusher(self.task) self.pusher = give_push() @@ -43,7 +44,8 @@ def on_iterator_stop_iterator(self): self.losses.push(self.pusher) def record_loss(self, loss): - self.losses.record(loss) + if self.rank is None or self.rank == 1: + self.losses.record(loss) return loss def override_return_value(self, function, override): @@ -62,7 +64,7 @@ def override_return_value(self, function, override): def loader(self, loader): """Wrap a dataloader or an iterable which enable accurate measuring of time spent in the loop's body""" self.wrapped = TimedIterator( - loader, *self.args, push=self.pusher, **self.kwargs + loader, *self.args, rank=self.rank, push=self.pusher, **self.kwargs ) self.wrapped.task = self.task self.wrapped.on_iterator_stop_iterator = self.on_iterator_stop_iterator diff --git 
a/benchmate/benchmate/warden.py b/benchmate/benchmate/warden.py new file mode 100644 index 000000000..ac8650d43 --- /dev/null +++ b/benchmate/benchmate/warden.py @@ -0,0 +1,102 @@ +from dataclasses import dataclass +import re +import os +import subprocess +import traceback +import signal + +from voir.instruments.gpu import get_gpu_info +from milabench.syslog import syslog + +@dataclass +class ProcessInfo: + gpu: int + pid: int + type: str + process_name: str + memory: int + unit: str + + +def _hpu_parse_processes(): + output = subprocess.check_output(["hl-smi"], text=True) + + line_format = re.compile( + r"\|(\s+)(?P<gpu>\d+)(\s+)(?P<pid>\d+)(\s+)(?P<type>\w+)(\s+)(?P<process_name>\w+)(\s+)(?P<memory>\d+)((?P<unit>\w+))(\s+)" + ) + + info = [] + for line in output.split("\n"): + if match := line_format.match(line): + info.append(ProcessInfo(**match.groupdict())) + + return info + + + +def _default(): + return [] + +backends = { + "hpu": _hpu_parse_processes, + "cpu": _default +} + + +class GPUProcessWarden: + """Ensure all the process using the GPU are killed before & after the bench""" + def __init__(self, kill_on_start=True, kill_on_end=True): + self.gpus = get_gpu_info() + self.arch = self.gpus['arch'] + self.fetch_fun = backends.get(self.arch, _default) + self.kill_on_start = kill_on_start + self.kill_on_end = kill_on_end + self.dead_processes = [] + + def __enter__(self): + if self.kill_on_start: + self.ensure_free() + + return self + + def __exit__(self, *args): + if self.kill_on_end: + self.ensure_free() + + return None + + def fetch_processes(self): + try: + return self.fetch_fun() + except : + traceback.print_exc() + return [] + + def kill(self, pid, signal): + if pid in self.dead_processes: + return + + try: + os.kill(pid, signal) + except ProcessLookupError: + self.dead_processes.append(pid) + + def ensure_free(self): + processes = self.fetch_processes() + if len(processes) == 0: + return + + syslog("Found {0} still using devices after bench ended", len(processes)) + + # Keyboard interrupt + for 
process in processes: + self.kill(process.pid, signal.SIGINT) + + # Sig Term, please close now + for process in processes: + self.kill(process.pid, signal.SIGTERM) + + # Sig Kill, just die + for process in processes: + self.kill(process.pid, signal.SIGKILL) + diff --git a/config/base.yaml b/config/base.yaml index 0c0d45f76..29c9163a5 100644 --- a/config/base.yaml +++ b/config/base.yaml @@ -232,7 +232,7 @@ resnet152-ddp: --model: resnet152 --batch-size: 256 --num-workers: "auto({n_worker}, 8)" - --loader: dali + --loader: torch efficientnet_b4: inherits: _torchvision diff --git a/milabench/alt_async.py b/milabench/alt_async.py index 457be5716..6c48ece87 100644 --- a/milabench/alt_async.py +++ b/milabench/alt_async.py @@ -11,6 +11,7 @@ from functools import wraps from voir.proc import run as voir_run +from .syslog import syslog class FeedbackEventLoop(type(asyncio.get_event_loop())): @@ -165,13 +166,115 @@ async def wrapped(*args, **kwargs): return wrapped -def destroy(*processes): +def _kill_pid_with_delay(pid, sig, method, step, delay): + acc = 0 + try: + while acc < delay: + method(pid, sig) + time.sleep(step) + acc += step + return pid, method, acc, sig + except ProcessLookupError: + # success + return None, method, acc, sig + except PermissionError: + syslog("Not allowed to kill pid {0}", pid) + return None, method, acc, sig + except OSError: + syslog("Unhandled os error for pid {0}", pid) + return None, method, acc, sig + + +def _kill_proc_with_delay(proc, sig, delay): + start = - time.time() + def elasped(): + return start + time.time() + + proc.send_signal(sig) + try: + proc.wait(timeout=delay) + + # success + return None, None, elasped(), sig + except subprocess.TimeoutExpired: + return proc.pid, None, elasped(), sig + + +def _filter_process_groups(processes): + group_pids = [] + proc_pids = [] + already_dead = [] + for proc in processes: + if proc.returncode is not None: + already_dead.append((proc, proc.pid)) + continue + if getattr(proc, "did_setsid", False): - 
os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + group_pids.append((proc, os.getpgid(proc.pid))) else: - proc.kill() - # TODO: send SIGKILL after a certain time if the process is not dead + proc_pids.append((proc, proc.pid)) + + return proc_pids, group_pids, already_dead + + +def destroy_all(processes, delay=30): + from concurrent.futures import ThreadPoolExecutor + + futures = [] + def submit(pool, pid, signal, method): + args = ( + pid, + signal, + method, + 1, + delay + ) + futures.append(pool.submit(_kill_pid_with_delay, *args)) + + signal_flow = { + None : signal.SIGTERM, + signal.SIGTERM: signal.SIGKILL + } + + def nextsignal(previous=None): + return signal_flow.get(previous, None) + + proc_pids, group_pids, already_dead = _filter_process_groups(processes) + stats = [] + with ThreadPoolExecutor(max_workers=8) as pool: + for proc, pid in group_pids: + submit(pool, pid, nextsignal(), os.killpg) + + for proc, pid in proc_pids: + submit(pool, pid, nextsignal(), os.kill) + + futures = list(reversed(futures)) + + while futures: + future = futures.pop() + pid, method, elapsed, sig = future.result(timeout=delay + 1) + + # on failure submit a SIGKILL + if pid is not None: + if (sig := nextsignal(sig)) is not None: + submit(pool, pid, sig, method) + else: + syslog("{0} failed on pid {1}", sig, pid) + + stats.append([pid, method, elapsed, sig]) + + # + # We want data on this: `SIGKILL` should never be necessary + # + to_be_killed = len(processes) - len(already_dead) + syslog("{0} kill event for {1} processes ({2} already dead)", len(stats), to_be_killed, len(already_dead)) + for pid, method, elapsed, sig in stats: + syslog(" - {0} was killed with {1} after {2} sec ({3})", pid, sig, elapsed, method) + + +def destroy(*processes): + destroy_all(processes, delay=30) @feedback_runner diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py index 9e2ca1d77..9fad6a920 100644 --- a/milabench/commands/__init__.py +++ 
b/milabench/commands/__init__.py @@ -678,7 +678,7 @@ def _argv(self, **_) -> List: # -- "accelerate", "launch", - "--mixed_precision=fp16", + "--mixed_precision=bf16", "--dynamo_backend=no", f"--machine_rank={self.rank}", f"--num_machines={num_machines}", diff --git a/milabench/log.py b/milabench/log.py index 3224b412f..80fec44a4 100644 --- a/milabench/log.py +++ b/milabench/log.py @@ -148,7 +148,7 @@ def __call__(self, entry): elif event == "config": def _show(k, entry): - if k in ("meta", "system"): + if k.startswith("config.system") or k: return if isinstance(entry, dict): @@ -159,6 +159,9 @@ def _show(k, entry): _show("config", data) + elif event == "meta": + pass + elif event == "message": console.pretty(T.bold(f"[{event}]"), data["message"]) diff --git a/milabench/syslog.py b/milabench/syslog.py new file mode 100644 index 000000000..6a8f51f37 --- /dev/null +++ b/milabench/syslog.py @@ -0,0 +1,6 @@ +import sys + +stderr = sys.stderr + +def syslog(fmt, *args, **kwargs): + print(fmt.format(*args, **kwargs), file=stderr) diff --git a/scripts/article/run_batch_x_worker.sh b/scripts/article/run_batch_x_worker.sh index b79e4044d..27e023e6b 100644 --- a/scripts/article/run_batch_x_worker.sh +++ b/scripts/article/run_batch_x_worker.sh @@ -10,18 +10,20 @@ FINAL_OUTPUT="$HOME/batch_x_worker" export MILABENCH_SIZER_SAVE="$FINAL_OUTPUT/scaling.yaml" mkdir -p $FINAL_OUTPUT -module load cuda/12.3.2 +# module load cuda/12.3.2 # # Install # if [ "$DRY" -eq 0 ]; then export MILABENCH_PREPARE=1 - source $SCRIPT_DIR/run_cuda.sh + source $SCRIPT_DIR/run_hpu.sh fi source $MILABENCH_WORDIR/env/bin/activate +pip install -e $MILABENCH_WORDIR/milabench + maybe_run() { local name=$1 local first_part=$(echo "$name" | cut -d'.' 
-f1) @@ -36,7 +38,7 @@ maybe_run() { else echo "running $name" milabench prepare - milabench run --run-name $name + milabench run --run-name $name --exclude llama mv $MILABENCH_BASE/runs/* $FINAL_OUTPUT/ fi fi diff --git a/scripts/article/run_cuda.sh b/scripts/article/run_cuda.sh index eefee0491..6cfdc96fb 100644 --- a/scripts/article/run_cuda.sh +++ b/scripts/article/run_cuda.sh @@ -22,7 +22,7 @@ install_prepare() { fi if [ ! -d "$MILABENCH_WORDIR/milabench" ]; then - git clone https://github.com/mila-iqia/milabench.git -b worker_x_batch + git clone https://github.com/mila-iqia/milabench.git fi . $MILABENCH_WORDIR/env/bin/activate diff --git a/scripts/article/run_hpu.sh b/scripts/article/run_hpu.sh index ba90f5868..c9c7541ca 100644 --- a/scripts/article/run_hpu.sh +++ b/scripts/article/run_hpu.sh @@ -9,6 +9,10 @@ export MILABENCH_CONFIG="$MILABENCH_WORDIR/milabench/config/standard.yaml" export MILABENCH_VENV="$MILABENCH_WORDIR/env" export BENCHMARK_VENV="$MILABENCH_WORDIR/results/venv/torch" +if [ -z "${MILABENCH_PREPARE}" ]; then + export MILABENCH_PREPARE=0 +fi + install_prepare() { mkdir -p $MILABENCH_WORDIR cd $MILABENCH_WORDIR @@ -71,13 +75,15 @@ else . $MILABENCH_WORDIR/env/bin/activate fi -cd $MILABENCH_WORDIR +if [ "$MILABENCH_PREPARE" -eq 0 ]; then + cd $MILABENCH_WORDIR -# -# Run the benchmakrs -milabench run "$@" + # + # Run the benchmakrs + milabench run "$@" -# -# Display report -milabench report --runs $MILABENCH_WORDIR/results/runs + # + # Display report + milabench report --runs $MILABENCH_WORDIR/results/runs +fi \ No newline at end of file