diff --git a/benchmarks/accelerate_opt/benchfile.py b/benchmarks/accelerate_opt/benchfile.py index b0ad099cd..20e940585 100644 --- a/benchmarks/accelerate_opt/benchfile.py +++ b/benchmarks/accelerate_opt/benchfile.py @@ -7,20 +7,13 @@ ) from milabench.pack import Package from milabench.utils import select_nodes -from milabench.sizer import resolve_argv - - -def resolve_placeholder(pack, name): - placeholder = pack.config["argv"][name] - return resolve_argv(pack, [placeholder]) - class AccelerateBenchmark(Package): base_requirements = "requirements.in" def make_env(self): env = super().make_env() - value = resolve_placeholder(pack, "--cpus_per_gpu") + value = self.resolve_argument("--cpus_per_gpu") env["OMP_NUM_THREADS"] = str(value) return env diff --git a/benchmate/benchmate/dataloader.py b/benchmate/benchmate/dataloader.py index ada211ffb..f29eae79a 100644 --- a/benchmate/benchmate/dataloader.py +++ b/benchmate/benchmate/dataloader.py @@ -195,10 +195,7 @@ def pytorch(folder, batch_size, num_workers, distributed=False, epochs=60): def synthetic(model, batch_size, fixed_batch): return SyntheticData( tensors=generate_tensor_classification( - model, - batch_size, - (3, 244, 244), - device=accelerator.fetch_device(0) + model, batch_size, (3, 244, 244), device=accelerator.fetch_device(0) ), n=1000, fixed_batch=fixed_batch, diff --git a/benchmate/benchmate/monitor.py b/benchmate/benchmate/monitor.py index d277c29d8..e21931458 100644 --- a/benchmate/benchmate/monitor.py +++ b/benchmate/benchmate/monitor.py @@ -24,7 +24,7 @@ def monitor_fn(): } for gpu in get_gpu_info()["gpus"].values() } - mblog({"task": "main", "gpudata": data}) + mblog({"task": "train", "gpudata": data}) monitor_fn() monitor = Monitor(3, monitor_fn) @@ -74,7 +74,7 @@ def monitor_fn(): } for gpu in get_gpu_info()["gpus"].values() } - return {"task": "main", "gpudata": data, "time": time.time(), "units": "s"} + return {"task": "train", "gpudata": data, "time": time.time(), "units": "s"} monitor = 
CustomMonitor(0.5, monitor_fn) diff --git a/benchmate/benchmate/observer.py b/benchmate/benchmate/observer.py index 62aa84a58..3155e4e3f 100644 --- a/benchmate/benchmate/observer.py +++ b/benchmate/benchmate/observer.py @@ -23,7 +23,13 @@ class BenchObserver: """ def __init__( - self, *args, backward_callback=None, step_callback=None, stdout=False, rank=None, **kwargs + self, + *args, + backward_callback=None, + step_callback=None, + stdout=False, + rank=None, + **kwargs, ): self.wrapped = None self.args = args diff --git a/benchmate/benchmate/warden.py b/benchmate/benchmate/warden.py index ac8650d43..d96775fcb 100644 --- a/benchmate/benchmate/warden.py +++ b/benchmate/benchmate/warden.py @@ -1,53 +1,89 @@ -from dataclasses import dataclass -import re +import logging import os +import re +import signal import subprocess +import time import traceback -import signal +import warnings +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from dataclasses import dataclass -from voir.instruments.gpu import get_gpu_info from milabench.syslog import syslog +from voir.instruments.gpu import get_gpu_info + +log = logging.getLogger(__name__) + @dataclass class ProcessInfo: - gpu: int pid: int - type: str - process_name: str - memory: int - unit: str + gpu_name: int = None + type: str = None + process_name: str = None + memory: int = None + used_memory: str = None + gpu_bus_id: str = None + gpu_serial: str = None + gpu_uuid: str = None def _hpu_parse_processes(): output = subprocess.check_output(["hl-smi"], text=True) line_format = re.compile( - r"\|(\s+)(?P<gpu>\d+)(\s+)(?P<pid>\d+)(\s+)(?P<type>\w+)(\s+)(?P<process_name>\w+)(\s+)(?P<memory>\d+)((?P<unit>\w+))(\s+)" + r"\|(\s+)(?P<gpu_name>\d+)(\s+)(?P<pid>\d+)(\s+)(?P<type>\w+)(\s+)(?P<process_name>\w+)(\s+)(?P<memory>\d+)((?P<used_memory>\w+))(\s+)" ) - + info = [] for line in output.split("\n"): if match := line_format.match(line): info.append(ProcessInfo(**match.groupdict())) - + return info +def _cuda_parse_processes(): + metrics = [ + "pid", + "gpu_name", + "gpu_bus_id", + 
"gpu_serial", + "gpu_uuid", + "process_name", + "used_memory", + ] + query = ",".join(metrics) + cmd = ["nvidia-smi", f"--query-compute-apps={query}", "--format=csv,noheader"] + output = subprocess.check_output(cmd, text=True) + + info = [] + for line in output.split("\n"): + frags = line.split(",") + info.append(ProcessInfo(**dict(zip(metrics, frags)))) + return info + def _default(): return [] + backends = { "hpu": _hpu_parse_processes, - "cpu": _default + "cuda": _cuda_parse_processes, + # ROCM + # XPU + "cpu": _default, } class GPUProcessWarden: """Ensure all the process using the GPU are killed before & after the bench""" + def __init__(self, kill_on_start=True, kill_on_end=True): self.gpus = get_gpu_info() - self.arch = self.gpus['arch'] + self.arch = self.gpus["arch"] self.fetch_fun = backends.get(self.arch, _default) self.kill_on_start = kill_on_start self.kill_on_end = kill_on_end @@ -56,19 +92,19 @@ def __init__(self, kill_on_start=True, kill_on_end=True): def __enter__(self): if self.kill_on_start: self.ensure_free() - + return self def __exit__(self, *args): if self.kill_on_end: self.ensure_free() - + return None def fetch_processes(self): try: return self.fetch_fun() - except : + except: traceback.print_exc() return [] @@ -77,7 +113,7 @@ def kill(self, pid, signal): return try: - os.kill(pid, signal): + os.kill(pid, signal) except ProcessLookupError: self.dead_processes.append(pid) @@ -85,18 +121,140 @@ def ensure_free(self): processes = self.fetch_processes() if len(processes) == 0: return - - syslog("Found {0} still using devices after bench ended", len(processes)) - # Keyboard interrupt - for process in processes: - self.kill(process.pid, signal.SIGINT) + syslog("Found {0} still using devices after bench ended", len(processes)) - # Sig Term, please close now for process in processes: self.kill(process.pid, signal.SIGTERM) - - # Sig Kill, just die + for process in processes: self.kill(process.pid, signal.SIGKILL) + +class Protected: + """Prevent a 
signal to be raised during the execution of some code""" + + def __init__(self): + self.signal_received = False + self.handlers = {} + self.start = 0 + self.delayed = 0 + self.signal_installed = False + + def __enter__(self): + """Override the signal handlers with our delayed handler""" + self.signal_received = False + + try: + self.handlers[signal.SIGINT] = signal.signal(signal.SIGINT, self.handler) + self.handlers[signal.SIGTERM] = signal.signal(signal.SIGTERM, self.handler) + self.signal_installed = True + + except ValueError: # ValueError: signal only works in main thread + warnings.warn( + "SIGINT/SIGTERM protection hooks could not be installed because " + "Runner is executing inside a thread/subprocess, results could get lost " + "on interruptions" + ) + + return self + + def stop(self): + pass + + def handler(self, sig, frame): + """Register the received signal for later""" + + log.warning("Delaying signal %d to finish operations", sig) + log.warning( + "Press CTRL-C again to terminate the program now (You may lose results)" + ) + + self.start = time.time() + self.signal_received = (sig, frame) + + # if CTRL-C is pressed again the original handlers will handle it + # and make the program stop + self.restore_handlers() + + self.stop() + + def restore_handlers(self): + """Restore old signal handlers""" + if not self.signal_installed: + return + + signal.signal(signal.SIGINT, self.handlers[signal.SIGINT]) + signal.signal(signal.SIGTERM, self.handlers[signal.SIGTERM]) + + self.signal_installed = False + + def maybe_stop(self): + """Raise the delayed signal if any or restore the old signal handlers""" + + if not self.signal_received: + self.restore_handlers() + + else: + self.delayed = time.time() - self.start + + log.warning("Termination was delayed by %.4f s", self.delayed) + handler = self.handlers[self.signal_received[0]] + + if callable(handler): + handler(*self.signal_received) + + def __exit__(self, *args): + self.maybe_stop() + + +def destroy(*processes, 
step=1, timeout=30): + def kill(proc, sig): + try: + if getattr(proc, "did_setsid", False): + os.killpg(os.getpgid(proc.pid), sig) + else: + os.kill(proc.pid, sig) + except ProcessLookupError: + pass + + for proc in processes: + kill(proc, signal.SIGTERM) + + # Wait a total amount of time, not per process + elapsed = 0 + + def wait(proc): + nonlocal elapsed + + while (ret := proc.poll()) is None and elapsed < timeout: + time.sleep(step) + elapsed += step + + return ret is None + + for proc in processes: + if wait(proc): + kill(proc, signal.SIGKILL) + + +class SignalProtected(Protected): + """Delay event handling until all the processes are killed""" + + def __init__(self): + super().__init__() + self.processes = [] + + def add_process(self, *processes): + self.processes.extend(processes) + + def stop(self): + destroy(*self.processes) + + +@contextmanager +def process_cleaner(): + """Delay signal handling until all the processes have been killed""" + + with SignalProtected() as warden: + yield warden diff --git a/milabench/_version.py b/milabench/_version.py index bdd9c4326..f09618832 100644 --- a/milabench/_version.py +++ b/milabench/_version.py @@ -1,5 +1,5 @@ """This file is generated, do not modify""" -__tag__ = "v0.1.0-20-g7246295a" -__commit__ = "7246295a356186b55fa4b2b75480e3700c279b15" -__date__ = "2024-06-20 09:18:17 -0400" +__tag__ = "v0.1.0-30-g94b27a71" +__commit__ = "94b27a71145d3ba754a2713aeca60e5a28be4bc5" +__date__ = "2024-06-25 13:49:52 -0400" diff --git a/milabench/alt_async.py b/milabench/alt_async.py index 6c48ece87..cd5faef00 100644 --- a/milabench/alt_async.py +++ b/milabench/alt_async.py @@ -10,8 +10,8 @@ from collections import deque from functools import wraps +from benchmate.warden import process_cleaner, destroy from voir.proc import run as voir_run -from .syslog import syslog class FeedbackEventLoop(type(asyncio.get_event_loop())): @@ -166,134 +166,30 @@ async def wrapped(*args, **kwargs): return wrapped -def 
_kill_pid_with_delay(pid, sig, method, step, delay): - acc = 0 - try: - while acc < delay: - method(pid, sig) - time.sleep(step) - acc += step - return pid, method, acc, sig - except ProcessLookupError: - # success - return None, method, acc, sig - except PermissionError: - syslog("Not allowed to kill pid {0}", pid) - return None, method, acc, sig - except OSError: - syslog("Unhandled os error for pid {0}", pid) - return None, method, acc, sig - - -def _kill_proc_with_delay(proc, sig, delay): - start = - time.time() - def elasped(): - return start + time.time() - - proc.send_signal(sig) - try: - proc.wait(timeout=delay) - - # success - return None, None, elasped(), sig - except subprocess.TimeoutExpired: - return pid, None, elasped(), sig - - -def _filter_process_groups(processes): - group_pids = [] - proc_pids = [] - already_dead = [] - - for proc in processes: - if proc.returncode is not None: - already_dead.append((proc, proc.id)) - continue - - if getattr(proc, "did_setsid", False): - group_pids.append((proc, os.getpgid(proc.pid))) - else: - proc_pids.append((proc, proc.pid)) +@feedback_runner +def run(argv, setsid=None, process_accumulator=None, info={}, **kwargs): + with process_cleaner() as processes: + if setsid: + kwargs["preexec_fn"] = os.setsid - return proc_pids, group_pids, already_dead - - -def destroy_all(processes, delay=30): - from concurrent.futures import ThreadPoolExecutor - - futures = [] - def submit(pool, pid, signal, method): - args = ( - pid, - signal, - method, - 1, - delay - ) - futures.append(pool.submit(_kill_with_delay, *args)) - - signal_flow = { - None : signal.SIGTERM, - signal.SIGTERM: signal.SIGKILL - } - - def nextsignal(previous=None): - return signal_flow.get(previous, None) - - proc_pids, group_pids, already_dead = _filter_process_groups(processes) - stats = [] - with ThreadPoolExecutor(max_workers=8) as pool: - for proc, pid in group_pids: - submit(pool, pid, nextsignal(), os.killpg) - - for proc, pid in group_pids: - 
submit(pool, pid, nextsignal(), os.kill) + mx = voir_run(argv, info=info, **kwargs, timeout=0) + processes.add_process(*mx.processes) - futures = list(reversed(futures)) + if process_accumulator is not None: + process_accumulator.extend(mx.processes) - while futures: - future = futures.pop() - pid, method, elapsed, sig = future.result(timeout=delay + 1) + if setsid: + for proc in mx.processes: + proc.did_setsid = True - # on failure submit a SIGKILL - if pid is not None: - if (sig := nextsignal(sig)) is not None: - futures.append(submit(pool, pid, sig, method)) - else: - syslog("{0} failed on pid {1}", sig, pid) - - stats.append([pid, method, elapsed, sig]) - - # - # We want data on this: `SIGKILL` should never be necesary - # - to_be_killed = len(processes) - len(already_dead) - syslog("{0} kill event for {1} processes ({2} already dead)", len(starts), to_be_killed, len(already_dead)) - for pid, method, elapsed, sig in stats: - syslog(" - {0} was killed with {1} after {2} sec ({3})", pid, sig, elapsed, method) - - -def destroy(*processes): - destroy_all(processes, delay=30) - - -@feedback_runner -def run(argv, setsid=None, info={}, process_accumulator=None, **kwargs): - if setsid: - kwargs["preexec_fn"] = os.setsid - mx = voir_run(argv, info=info, **kwargs, timeout=0) - if process_accumulator is not None: - process_accumulator.extend(mx.processes) - if setsid: - for proc in mx.processes: - proc.did_setsid = True - loop = asyncio.get_running_loop() - loop._multiplexers.append(mx) - for entry in mx: - if entry and entry.event == "stop": - destroy(*mx.processes) - yield entry + loop = asyncio.get_running_loop() + loop._multiplexers.append(mx) + for entry in mx: + if entry and entry.event == "stop": + destroy(*mx.processes) + yield entry + def proceed(coro): loop = FeedbackEventLoop() diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py index 9fad6a920..58630681f 100644 --- a/milabench/commands/__init__.py +++ b/milabench/commands/__init__.py 
@@ -659,14 +659,7 @@ def _argv(self, **_) -> List: else ["--multi_gpu"] ) - # - # Can this logic be removed? - # - from ..sizer import new_argument_resolver - - resolver = new_argument_resolver(self.pack) - - cpu_per_process = resolver(str(self.pack.config["argv"]["--cpus_per_gpu"])) + cpu_per_process = self.pack.resolve_argument('--cpus_per_gpu') return [ # -- Run the command in the right venv # This could be inside the SSH Command diff --git a/milabench/log.py b/milabench/log.py index 80fec44a4..602fd02d9 100644 --- a/milabench/log.py +++ b/milabench/log.py @@ -148,7 +148,7 @@ def __call__(self, entry): elif event == "config": def _show(k, entry): - if k.startswith("config.system") or k: + if k.startswith("config.system"): return if isinstance(entry, dict): diff --git a/milabench/pack.py b/milabench/pack.py index 7678fc72c..8b484c48c 100644 --- a/milabench/pack.py +++ b/milabench/pack.py @@ -505,3 +505,21 @@ def build_run_plan(self) -> "cmd.Command": main = self.dirs.code / self.main_script pack = cmd.PackCommand(self, *self.argv, lazy=True) return cmd.VoirCommand(pack, cwd=main.parent) + + def resolve_argument(self, name): + """Resolve a single named argument from the pack's argv config""" + placeholder = str(self.config["argv"][name]) + return self.resolve_placeholder(placeholder) + + def resolve_placeholder(self, placeholder): + """Resolve as single placeholder argument + + Examples + -------- + >>> resolve_placeholder("auto({n_worker}, 8)") + 16 + + """ + from milabench.sizer import resolve_argv + + return str(resolve_argv(self, [str(placeholder)])[0]) diff --git a/milabench/scripts/milabench_run.bash b/milabench/scripts/milabench_run.bash index 7ec73480e..409d13797 100755 --- a/milabench/scripts/milabench_run.bash +++ b/milabench/scripts/milabench_run.bash @@ -29,6 +29,7 @@ BASE="$LOC/base" ENV="./env" REMAINING_ARGS="" FUN="run" +OUTPUT="$HOME/RUN_$SLURM_JOB_ID" while getopts ":hm:p:e:b:o:c:f:" opt; do case $opt in @@ -117,6 +118,9 @@ function setup() { export PYTHONUNBUFFERED=1 export 
MILABENCH_BASE=$BASE export MILABENCH_CONFIG=$CONFIG + export MILABENCH_VENV=$ENV + export BENCHMARK_VENV="$BASE/venv/torch" + # # Fetch the repo # @@ -161,7 +165,7 @@ function run() { cat $SYSTEM module load gcc/9.3.0 - module load cuda/11.8 + module load cuda/12.3.2 echo "" echo "Install" @@ -178,11 +182,13 @@ function run() { echo "---" milabench run --config $CONFIG --system $SYSTEM --base $BASE $REMAINING_ARGS - echo "" - echo "Report" - echo "------" + mkdir -p $OUTPUT + mv $BASE/runs $OUTPUT - milabench write_report_to_pr --remote $ORIGIN --branch $BRANCH --config $CONFIG + # echo "" + # echo "Report" + # echo "------" + # milabench write_report_to_pr --remote $ORIGIN --branch $BRANCH --config $CONFIG echo "----" echo "Done after $SECONDS" @@ -196,4 +202,4 @@ case "$FUN" in pin) pin ;; -esac \ No newline at end of file +esac diff --git a/tests/benchmate/test_protected.py b/tests/benchmate/test_protected.py new file mode 100644 index 000000000..ce0ca377f --- /dev/null +++ b/tests/benchmate/test_protected.py @@ -0,0 +1,144 @@ + + +import time +import multiprocessing +import signal +import os +import subprocess + +import pytest + +from benchmate.warden import process_cleaner + + +def _worker(delay): + time.sleep(delay) + print('done') + + + +def spawn(delay, warden): + procs = [] + for _ in range(10): + proc = multiprocessing.Process(target=_worker, args=(delay,)) + proc.start() + procs.append(proc) + warden.add_process(proc) + + return procs + +def wait(procs): + for proc in procs: + proc.join() + + +def _protected_process(delay): + with process_cleaner() as warden: + procs = spawn(delay, warden) + + wait(procs) + + +def test_process_cleaner_process_ended_already(): + start = time.time() + + proc = multiprocessing.Process(target=_protected_process, args=(1,)) + proc.start() + + time.sleep(2) + os.kill(proc.pid, signal.SIGINT) + + elapsed = time.time() - start + assert elapsed < 30 + + +def test_process_cleaner_process(): + start = time.time() + + proc = 
multiprocessing.Process(target=_protected_process, args=(60,)) + proc.start() + + time.sleep(1) + os.kill(proc.pid, signal.SIGINT) + + elapsed = time.time() - start + assert elapsed < 30 + + +def test_keyboard_cleaner_process(): + start = time.time() + + with pytest.raises(KeyboardInterrupt): + with process_cleaner() as warden: + procs = spawn(60, warden) + + time.sleep(1) + os.kill(os.getpid(), signal.SIGINT) + + wait(procs) + + elapsed = time.time() - start + assert elapsed < 30 + + +def test_keyboard_cleaner_process_ended(): + start = time.time() + + with pytest.raises(KeyboardInterrupt): + with process_cleaner() as warden: + procs = spawn(1, warden) + + time.sleep(2) + os.kill(os.getpid(), signal.SIGINT) + + wait(procs) + + elapsed = time.time() - start + assert elapsed < 30 + + +def test_protected_multiplexer(): + from voir.proc import Multiplexer + + start = time.time() + + def ctor(*args, **kwargs): + return kwargs + + with pytest.raises(KeyboardInterrupt): + with process_cleaner() as warden: + mx = Multiplexer(timeout=0, constructor=ctor) + proc = mx.start(["sleep", "60"], info={}, env={}, **{}) + warden.add_process(proc) + + time.sleep(2) + os.kill(os.getpid(), signal.SIGINT) + + for entry in mx: + if entry: + print(entry) + + elapsed = time.time() - start + assert elapsed < 30 + + +def test_protected_multiplexer_ended(): + from voir.proc import Multiplexer + + start = time.time() + + with pytest.raises(KeyboardInterrupt): + with process_cleaner() as warden: + mx = Multiplexer(timeout=0, constructor=lambda **kwargs: kwargs) + proc = mx.start(["sleep", "1"], info={}, env={}, **{}) + warden.add_process(proc) + + time.sleep(2) + os.kill(os.getpid(), signal.SIGINT) + + for entry in mx: + if entry: + print(entry) + + elapsed = time.time() - start + assert elapsed < 30 \ No newline at end of file