diff --git a/benchmarks/accelerate_opt/benchfile.py b/benchmarks/accelerate_opt/benchfile.py index b0ad099cd..962caa501 100644 --- a/benchmarks/accelerate_opt/benchfile.py +++ b/benchmarks/accelerate_opt/benchfile.py @@ -7,20 +7,13 @@ ) from milabench.pack import Package from milabench.utils import select_nodes -from milabench.sizer import resolve_argv - - -def resolve_placeholder(pack, name): - placeholder = pack.config["argv"][name] - return resolve_argv(pack, [placeholder]) - class AccelerateBenchmark(Package): base_requirements = "requirements.in" def make_env(self): env = super().make_env() - value = resolve_placeholder(pack, "--cpus_per_gpu") + value = self.resolve_argument("--cpus_per_gpu") env["OMP_NUM_THREADS"] = str(value) return env diff --git a/benchmate/benchmate/dataloader.py b/benchmate/benchmate/dataloader.py index ada211ffb..f29eae79a 100644 --- a/benchmate/benchmate/dataloader.py +++ b/benchmate/benchmate/dataloader.py @@ -195,10 +195,7 @@ def pytorch(folder, batch_size, num_workers, distributed=False, epochs=60): def synthetic(model, batch_size, fixed_batch): return SyntheticData( tensors=generate_tensor_classification( - model, - batch_size, - (3, 244, 244), - device=accelerator.fetch_device(0) + model, batch_size, (3, 244, 244), device=accelerator.fetch_device(0) ), n=1000, fixed_batch=fixed_batch, diff --git a/benchmate/benchmate/monitor.py b/benchmate/benchmate/monitor.py index d277c29d8..e21931458 100644 --- a/benchmate/benchmate/monitor.py +++ b/benchmate/benchmate/monitor.py @@ -24,7 +24,7 @@ def monitor_fn(): } for gpu in get_gpu_info()["gpus"].values() } - mblog({"task": "main", "gpudata": data}) + mblog({"task": "train", "gpudata": data}) monitor_fn() monitor = Monitor(3, monitor_fn) @@ -74,7 +74,7 @@ def monitor_fn(): } for gpu in get_gpu_info()["gpus"].values() } - return {"task": "main", "gpudata": data, "time": time.time(), "units": "s"} + return {"task": "train", "gpudata": data, "time": time.time(), "units": "s"} monitor = 
CustomMonitor(0.5, monitor_fn) diff --git a/benchmate/benchmate/observer.py b/benchmate/benchmate/observer.py index 62aa84a58..3155e4e3f 100644 --- a/benchmate/benchmate/observer.py +++ b/benchmate/benchmate/observer.py @@ -23,7 +23,13 @@ class BenchObserver: """ def __init__( - self, *args, backward_callback=None, step_callback=None, stdout=False, rank=None, **kwargs + self, + *args, + backward_callback=None, + step_callback=None, + stdout=False, + rank=None, + **kwargs, ): self.wrapped = None self.args = args diff --git a/benchmate/benchmate/warden.py b/benchmate/benchmate/warden.py index ac8650d43..21d9ab738 100644 --- a/benchmate/benchmate/warden.py +++ b/benchmate/benchmate/warden.py @@ -1,53 +1,89 @@ -from dataclasses import dataclass -import re +import logging import os +import re +import signal import subprocess +import time import traceback -import signal +import warnings +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from dataclasses import dataclass -from voir.instruments.gpu import get_gpu_info from milabench.syslog import syslog +from voir.instruments.gpu import get_gpu_info + +log = logging.getLogger(__name__) + @dataclass class ProcessInfo: - gpu: int pid: int - type: str - process_name: str - memory: int - unit: str + gpu_name: int = None + type: str = None + process_name: str = None + memory: int = None + used_memory: str = None + gpu_bus_id: str = None + gpu_serial: str = None + gpu_uuid: str = None def _hpu_parse_processes(): output = subprocess.check_output(["hl-smi"], text=True) line_format = re.compile( - r"\|(\s+)(?P<gpu>\d+)(\s+)(?P<pid>\d+)(\s+)(?P<type>\w+)(\s+)(?P<process_name>\w+)(\s+)(?P<memory>\d+)((?P<unit>\w+))(\s+)" + r"\|(\s+)(?P<gpu_name>\d+)(\s+)(?P<pid>\d+)(\s+)(?P<type>\w+)(\s+)(?P<process_name>\w+)(\s+)(?P<memory>\d+)((?P<used_memory>\w+))(\s+)" ) - + info = [] for line in output.split("\n"): if match := line_format.match(line): info.append(ProcessInfo(**match.groupdict())) - + return info +def _cuda_parse_processes(): + metrics = [ + "pid", + "gpu_name", + "gpu_bus_id", + 
"gpu_serial", + "gpu_uuid", + "process_name", + "used_memory", + ] + query = ",".join(metrics) + cmd = ["nvidia-smi", f"--query-compute-apps={query}", "--format=csv,noheader"] + output = subprocess.check_output(cmd, text=True) + + info = [] + for line in output.split("\n"): + frags = line.split(",") + info.append(ProcessInfo(**dict(zip(metrics, frags)))) + return info + def _default(): return [] + backends = { "hpu": _hpu_parse_processes, - "cpu": _default + "cuda": _cuda_parse_processes, + # ROCM + # XPU + "cpu": _default, } class GPUProcessWarden: """Ensure all the process using the GPU are killed before & after the bench""" + def __init__(self, kill_on_start=True, kill_on_end=True): self.gpus = get_gpu_info() - self.arch = self.gpus['arch'] + self.arch = self.gpus["arch"] self.fetch_fun = backends.get(self.arch, _default) self.kill_on_start = kill_on_start self.kill_on_end = kill_on_end @@ -56,19 +92,19 @@ def __init__(self, kill_on_start=True, kill_on_end=True): def __enter__(self): if self.kill_on_start: self.ensure_free() - + return self def __exit__(self, *args): if self.kill_on_end: self.ensure_free() - + return None def fetch_processes(self): try: return self.fetch_fun() - except : + except: traceback.print_exc() return [] @@ -77,7 +113,7 @@ def kill(self, pid, signal): return try: - os.kill(pid, signal): + os.kill(pid, signal) except ProcessLookupError: self.dead_processes.append(pid) @@ -85,7 +121,7 @@ def ensure_free(self): processes = self.fetch_processes() if len(processes) == 0: return - + syslog("Found {0} still using devices after bench ended", len(processes)) # Keyboard interrupt @@ -100,3 +136,250 @@ def ensure_free(self): for process in processes: self.kill(process.pid, signal.SIGKILL) + +class Protected: + """Prevent a signal to be raised during the execution of some code""" + + def __init__(self): + self.signal_received = [] + self.handlers = {} + self.start = 0 + self.delayed = 0 + self.signal_installed = False + + def __enter__(self): + 
"""Override the signal handlers with our delayed handler""" + self.signal_received = [] + + try: + self.handlers[signal.SIGINT] = signal.signal(signal.SIGINT, self.handler) + self.handlers[signal.SIGTERM] = signal.signal(signal.SIGTERM, self.handler) + self.signal_installed = True + + except ValueError: # ValueError: signal only works in main thread + warnings.warn( + "SIGINT/SIGTERM protection hooks could not be installed because " + "Runner is executing inside a thread/subprocess, results could get lost " + "on interruptions" + ) + + return self + + def stop(self): + pass + + def handler(self, sig, frame): + """Register the received signal for later""" + + log.warning("Delaying signal %d to finish operations", sig) + log.warning( + "Press CTRL-C again to terminate the program now (You may lose results)" + ) + + self.start = time.time() + self.signal_received.append((sig, frame)) + + # if CTRL-C is pressed again the original handlers will handle it + # and make the program stop + self.restore_handlers() + + self.stop() + + def restore_handlers(self): + """Restore old signal handlers""" + if not self.signal_installed: + return + + signal.signal(signal.SIGINT, self.handlers[signal.SIGINT]) + signal.signal(signal.SIGTERM, self.handlers[signal.SIGTERM]) + + self.signal_installed = False + + def maybe_stop(self): + """Raise the delayed signal if any or restore the old signal handlers""" + + if len(self.signal_received) == 0: + self.restore_handlers() + + else: + self.delayed = time.time() - self.start + + log.warning("Termination was delayed by %.4f s", self.delayed) + handler = self.handlers[self.signal_received[0][0]] + + if callable(handler): + for received in self.signal_received: + handler(*received) + + def __exit__(self, *args): + self.maybe_stop() + + +def destroy_processes(processes): + killer = ProcessKiller(30) + killer.kill(processes) + + +class ProcessKiller: + """Use a thread pool to kill n processes. 
+ + * Send SIGTERM first + * Wait for 30 seconds + * Send SIGKILL if still alive + + """ + + def __init__(self, delay=30): + self.futures = [] + self.delay = delay + self.stats = [] + self.proc_pids = [] + self.group_pids = [] + self.already_dead = [] + # State machine Previous -> Next + self.signal_flow = {None: signal.SIGTERM, signal.SIGTERM: signal.SIGKILL} + + def kill(self, processes): + with ThreadPoolExecutor(max_workers=8) as pool: + self.queue_kill(pool, processes) + + self.wait(pool) + + self.report(processes) + + def report(self, processes): + # + # We want data on this: `SIGKILL` should never be necesary + # + to_be_killed = len(processes) - len(self.already_dead) + syslog( + "{0} kill event for {1} processes ({2} already dead)", + len(self.stats), + to_be_killed, + len(self.already_dead), + ) + for pid, method, elapsed, sig in self.stats: + syslog( + " - {0} was killed with {1} after {2} sec ({3})", + pid, + sig, + elapsed, + method, + ) + + def nextsignal(self, previous=None): + return self.signal_flow.get(previous, None) + + def submit(self, pool, pid, signal, method): + self.futures.append( + pool.submit( + ProcessKiller._kill_pid_with_delay, pid, signal, method, 1, self.delay + ) + ) + + def queue_kill(self, pool, processes): + ( + self.proc_pids, + self.group_pids, + self.already_dead, + ) = self._filter_process_groups(processes) + + for _, pid in self.group_pids: + self.submit(pool, pid, self.nextsignal(), os.killpg) + + for _, pid in self.proc_pids: + self.submit(pool, pid, self.nextsignal(), os.kill) + + def requeue(self, pool, pid, sig, method): + if (sig := self.nextsignal(sig)) is not None: + self.submit(pool, pid, sig, method) + else: + syslog("{0} failed on pid {1}", sig, pid) + + def wait(self, pool): + self.futures = list(reversed(self.futures)) + + while self.futures: + future = self.futures.pop() + pid, method, elapsed, sig = future.result(timeout=self.delay + 1) + + # on failure submit a SIGKILL + if pid is not 
None: + self.requeue(pool, pid, sig, method) + + self.stats.append([pid, method, elapsed, sig]) + + def _filter_process_groups(self, processes): + group_pids = [] + proc_pids = [] + already_dead = [] + + for proc in processes: + if proc.returncode is not None: + already_dead.append((proc, proc.pid)) + continue + + if getattr(proc, "did_setsid", False): + group_pids.append((proc, os.getpgid(proc.pid))) + else: + proc_pids.append((proc, proc.pid)) + + return proc_pids, group_pids, already_dead + + @staticmethod + def _kill_pid_with_delay(pid, sig, method, step, delay): + acc = 0 + try: + while acc < delay: + method(pid, sig) + time.sleep(step) + acc += step + return pid, method, acc, sig + except ProcessLookupError: + # success + return None, method, acc, sig + except PermissionError: + syslog("Not allowed to kill pid {0}", pid) + return None, method, acc, sig + except OSError: + syslog("Unhandled os error for pid {0}", pid) + return None, method, acc, sig + + @staticmethod + def _kill_proc_with_delay(proc, sig, delay): + pid = proc.pid + start = -time.time() + + def elasped(): + return start + time.time() + + proc.send_signal(sig) + try: + proc.wait(timeout=delay) + + # success + return None, None, elasped(), sig + except subprocess.TimeoutExpired: + return pid, None, elasped(), sig + + +class SignalReceived(Exception): + pass + + +class SignalProtected(Protected): + def stop(self): + raise SignalReceived() + + +@contextmanager +def process_cleaner(): + """Delay signal handling until all the processes have been killed""" + processes = [] + + with SignalProtected(): + try: + yield processes + + except SignalReceived: + destroy_processes(processes) diff --git a/milabench/_version.py b/milabench/_version.py index bdd9c4326..f09618832 100644 --- a/milabench/_version.py +++ b/milabench/_version.py @@ -1,5 +1,5 @@ """This file is generated, do not modify""" -__tag__ = "v0.1.0-20-g7246295a" -__commit__ = "7246295a356186b55fa4b2b75480e3700c279b15" -__date__ = "2024-06-20 09:18:17 
-0400" +__tag__ = "v0.1.0-30-g94b27a71" +__commit__ = "94b27a71145d3ba754a2713aeca60e5a28be4bc5" +__date__ = "2024-06-25 13:49:52 -0400" diff --git a/milabench/alt_async.py b/milabench/alt_async.py index 6c48ece87..f9fef364a 100644 --- a/milabench/alt_async.py +++ b/milabench/alt_async.py @@ -10,8 +10,8 @@ from collections import deque from functools import wraps +from benchmate.warden import process_cleaner, destroy_processes from voir.proc import run as voir_run -from .syslog import syslog class FeedbackEventLoop(type(asyncio.get_event_loop())): @@ -149,6 +149,13 @@ def proceed(self, coro): raise +def destroy(*processes): + for proc in processes: + if getattr(proc, "did_setsid", False): + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + else: + proc.kill() + def feedback_runner(gen): @wraps(gen) async def wrapped(*args, **kwargs): @@ -166,134 +173,30 @@ async def wrapped(*args, **kwargs): return wrapped -def _kill_pid_with_delay(pid, sig, method, step, delay): - acc = 0 - try: - while acc < delay: - method(pid, sig) - time.sleep(step) - acc += step - return pid, method, acc, sig - except ProcessLookupError: - # success - return None, method, acc, sig - except PermissionError: - syslog("Not allowed to kill pid {0}", pid) - return None, method, acc, sig - except OSError: - syslog("Unhandled os error for pid {0}", pid) - return None, method, acc, sig - - -def _kill_proc_with_delay(proc, sig, delay): - start = - time.time() - def elasped(): - return start + time.time() - - proc.send_signal(sig) - try: - proc.wait(timeout=delay) - - # success - return None, None, elasped(), sig - except subprocess.TimeoutExpired: - return pid, None, elasped(), sig - - -def _filter_process_groups(processes): - group_pids = [] - proc_pids = [] - already_dead = [] - - for proc in processes: - if proc.returncode is not None: - already_dead.append((proc, proc.id)) - continue - - if getattr(proc, "did_setsid", False): - group_pids.append((proc, os.getpgid(proc.pid))) - else: - 
proc_pids.append((proc, proc.pid)) +@feedback_runner +def run(argv, setsid=None, process_accumulator=None, info={}, **kwargs): + with process_cleaner() as processes: + if setsid: + kwargs["preexec_fn"] = os.setsid - return proc_pids, group_pids, already_dead - - -def destroy_all(processes, delay=30): - from concurrent.futures import ThreadPoolExecutor - - futures = [] - def submit(pool, pid, signal, method): - args = ( - pid, - signal, - method, - 1, - delay - ) - futures.append(pool.submit(_kill_with_delay, *args)) + mx = voir_run(argv, info=info, **kwargs, timeout=0) + processes.extend(mx.processes) - signal_flow = { - None : signal.SIGTERM, - signal.SIGTERM: signal.SIGKILL - } + if process_accumulator is not None: + process_accumulator.extend(mx.processes) - def nextsignal(previous=None): - return signal_flow.get(previous, None) + if setsid: + for proc in mx.processes: + proc.did_setsid = True - proc_pids, group_pids, already_dead = _filter_process_groups(processes) - stats = [] - with ThreadPoolExecutor(max_workers=8) as pool: - for proc, pid in group_pids: - submit(pool, pid, nextsignal(), os.killpg) - - for proc, pid in group_pids: - submit(pool, pid, nextsignal(), os.kill) - - futures = list(reversed(futures)) - - while futures: - future = futures.pop() - pid, method, elapsed, sig = future.result(timeout=delay + 1) - - # on failure submit a SIGKILL - if pid is not None: - if (sig := nextsignal(sig)) is not None: - futures.append(submit(pool, pid, sig, method)) - else: - syslog("{0} failed on pid {1}", sig, pid) - - stats.append([pid, method, elapsed, sig]) - - # - # We want data on this: `SIGKILL` should never be necesary - # - to_be_killed = len(processes) - len(already_dead) - syslog("{0} kill event for {1} processes ({2} already dead)", len(starts), to_be_killed, len(already_dead)) - for pid, method, elapsed, sig in stats: - syslog(" - {0} was killed with {1} after {2} sec ({3})", pid, sig, elapsed, method) - - -def destroy(*processes): - 
destroy_all(processes, delay=30) - - -@feedback_runner -def run(argv, setsid=None, info={}, process_accumulator=None, **kwargs): - if setsid: - kwargs["preexec_fn"] = os.setsid - mx = voir_run(argv, info=info, **kwargs, timeout=0) - if process_accumulator is not None: - process_accumulator.extend(mx.processes) - if setsid: - for proc in mx.processes: - proc.did_setsid = True - loop = asyncio.get_running_loop() - loop._multiplexers.append(mx) - for entry in mx: - if entry and entry.event == "stop": - destroy(*mx.processes) - yield entry + loop = asyncio.get_running_loop() + loop._multiplexers.append(mx) + for entry in mx: + if entry and entry.event == "stop": + destroy_processes(mx.processes) + yield entry + def proceed(coro): loop = FeedbackEventLoop() diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py index 9fad6a920..566da182d 100644 --- a/milabench/commands/__init__.py +++ b/milabench/commands/__init__.py @@ -659,14 +659,7 @@ def _argv(self, **_) -> List: else ["--multi_gpu"] ) - # - # Can this logic be removed? 
- # - from ..sizer import new_argument_resolver - - resolver = new_argument_resolver(self.pack) - - cpu_per_process = resolver(str(self.pack.config["argv"]["--cpus_per_gpu"])) + cpu_per_process = self.pack.resolve_argument('--cpus_per_gpu') return [ # -- Run the command in the right venv # This could be inside the SSH Command diff --git a/milabench/log.py b/milabench/log.py index 80fec44a4..602fd02d9 100644 --- a/milabench/log.py +++ b/milabench/log.py @@ -148,7 +148,7 @@ def __call__(self, entry): elif event == "config": def _show(k, entry): - if k.startswith("config.system") or k: + if k.startswith("config.system"): return if isinstance(entry, dict): diff --git a/milabench/pack.py b/milabench/pack.py index 7678fc72c..8b484c48c 100644 --- a/milabench/pack.py +++ b/milabench/pack.py @@ -505,3 +505,21 @@ def build_run_plan(self) -> "cmd.Command": main = self.dirs.code / self.main_script pack = cmd.PackCommand(self, *self.argv, lazy=True) return cmd.VoirCommand(pack, cwd=main.parent) + + def resolve_argument(self, name): + """Resolve as single placeholder argument""" + placeholder = str(self.config["argv"][name]) + return self.resolve_placeholder(placeholder) + + def resolve_placeholder(self, placeholder): + """Resolve as single placeholder argument + + Examples + -------- + >>> resolve_placeholder("auto({n_worker}, 8)") + 16 + + """ + from milabench.sizer import resolve_argv + + return str(resolve_argv(self, [str(placeholder)])[0]) diff --git a/milabench/scripts/milabench_run.bash b/milabench/scripts/milabench_run.bash index 7ec73480e..409d13797 100755 --- a/milabench/scripts/milabench_run.bash +++ b/milabench/scripts/milabench_run.bash @@ -29,6 +29,7 @@ BASE="$LOC/base" ENV="./env" REMAINING_ARGS="" FUN="run" +OUTPUT="$HOME/RUN_$SLURM_JOB_ID" while getopts ":hm:p:e:b:o:c:f:" opt; do case $opt in @@ -117,6 +118,9 @@ function setup() { export PYTHONUNBUFFERED=1 export MILABENCH_BASE=$BASE export MILABENCH_CONFIG=$CONFIG + export MILABENCH_VENV=$ENV + export 
BENCHMARK_VENV="$BASE/venv/torch" + # # Fetch the repo # @@ -161,7 +165,7 @@ function run() { cat $SYSTEM module load gcc/9.3.0 - module load cuda/11.8 + module load cuda/12.3.2 echo "" echo "Install" @@ -178,11 +182,13 @@ function run() { echo "---" milabench run --config $CONFIG --system $SYSTEM --base $BASE $REMAINING_ARGS - echo "" - echo "Report" - echo "------" + mkdir -p $OUTPUT + mv $BASE/runs $OUTPUT - milabench write_report_to_pr --remote $ORIGIN --branch $BRANCH --config $CONFIG + # echo "" + # echo "Report" + # echo "------" + # milabench write_report_to_pr --remote $ORIGIN --branch $BRANCH --config $CONFIG echo "----" echo "Done after $SECONDS" @@ -196,4 +202,4 @@ case "$FUN" in pin) pin ;; -esac \ No newline at end of file +esac