diff --git a/README.md b/README.md index ada33299a..9baeac939 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ Maintainer: [Philipp Wendler](https://www.philippwendler.de) Contributors: - [Aditya Arora](https://github.com/alohamora) +- [Levente Bajczi](https://github.com/leventeBajczi) - [Dirk Beyer](https://www.sosy-lab.org/people/beyer/) - [Laura Bschor](https://github.com/laurabschor) - [Thomas Bunk](https://github.com/TBunk) diff --git a/contrib/README.md b/contrib/README.md index a035bfab8..1be1c7741 100644 --- a/contrib/README.md +++ b/contrib/README.md @@ -16,6 +16,7 @@ Content: - `create_yaml_files.py`: Script for creating task-definition files from old input files that have expected verdicts encoded in the file name - [`p4-benchmark.py`](p4): BenchExec extension for [P4](https://p4.org/) programs for programmable switches - [`plots`](plots): Scripts and examples for generating plots from BenchExec results using Gnuplot or PGFPlots for LaTeX +- [`slurm-benchmark.py`](slurm): BenchExec extension for executing benchmark runs via [SLURM](https://slurm.schedmd.com/documentation.html) - `serveFileFromZIP.php`: Script for letting a web server serve files from a ZIP archive as if the archive would have been expanded. This is useful for hosting HTML tables with results and links to log files in ZIP archives. 
- `vcloud-benchmark.py`: BenchExec extension for executing benchmark runs on the VerifierCloud service diff --git a/contrib/slurm-benchmark.py b/contrib/slurm-benchmark.py new file mode 100755 index 000000000..7094b995d --- /dev/null +++ b/contrib/slurm-benchmark.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 + +# This file is part of BenchExec, a framework for reliable benchmarking: +# https://github.com/sosy-lab/benchexec +# +# SPDX-FileCopyrightText: 2007-2020 Dirk Beyer +# SPDX-FileCopyrightText: 2024 Levente Bajczi +# SPDX-FileCopyrightText: Critical Systems Research Group +# SPDX-FileCopyrightText: Budapest University of Technology and Economics +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import sys + +import benchexec.benchexec +import benchexec.tools +import benchexec.util + +sys.dont_write_bytecode = True # prevent creation of .pyc files + +# Add ./benchmark/tools to __path__ of benchexec.tools package +# such that additional tool-wrapper modules can be placed in this directory. +benchexec.tools.__path__ = [ + os.path.join(os.path.dirname(__file__), "benchmark", "tools") +] + benchexec.tools.__path__ + + +class Benchmark(benchexec.benchexec.BenchExec): + """ + An extension of BenchExec to execute benchmarks using SLURM, + optionally via Singularity. + """ + + def create_argument_parser(self): + parser = super(Benchmark, self).create_argument_parser() + + slurm_args = parser.add_argument_group("Options for using SLURM") + slurm_args.add_argument( + "--slurm", + dest="slurm", + action="store_true", + help="Use SLURM to execute benchmarks.", + ) + slurm_args.add_argument( + "--singularity", + dest="singularity", + type=str, + help="The path to the singularity .sif file to use. 
Will bind $PWD to $HOME when run.", + ) + slurm_args.add_argument( + "--scratchdir", + dest="scratchdir", + type=str, + default="./", + help="The directory where temporary directories can be created for use within singularity.", + ) + + return parser + + def load_executor(self): + if self.config.slurm: + from slurm import slurmexecutor as executor + else: + logging.warning( + "SLURM flag was not specified. Benchexec will be executed only on the local machine." + ) + executor = super(Benchmark, self).load_executor() + + return executor + + +if __name__ == "__main__": + benchexec.benchexec.main(Benchmark()) diff --git a/contrib/slurm/README.md b/contrib/slurm/README.md new file mode 100644 index 000000000..7e869a8fd --- /dev/null +++ b/contrib/slurm/README.md @@ -0,0 +1,87 @@ + +# BenchExec Extension for Benchmarking via SLURM + +This Python script extends BenchExec, a benchmarking framework, to facilitate benchmarking via SLURM, optionally using a Singularity container. + +In case of problems, please tag in an [issue](https://github.com/sosy-lab/benchexec/issues/new/choose): [Levente Bajczi](https://github.com/leventeBajczi) (@leventeBajczi). + +## Preliminaries + +* [SLURM](https://slurm.schedmd.com/documentation.html) is an open-source job scheduling and workload management system used primarily in high-performance computing (HPC) environments. +* [Singularity](https://docs.sylabs.io/guides/latest/user-guide/) is a containerization platform designed for scientific and high-performance computing (HPC) workloads, providing users with a reproducible and portable environment for running applications and workflows. + +## Requirements + +* SLURM, tested with `slurm 22.05.7`, should work within `22.x.x` +* Singularity (optional), tested with `singularity-ce version 4.0.1`, should work within `4.x.x` + +## Usage +1. 
Run the script with Python 3: + ``` + python3 $BENCHEXEC_FOLDER/contrib/slurm-benchmark.py [options] + ``` + Options: + - `--slurm`: Use SLURM to execute benchmarks. Will revert to regular (local) benchexec if not given. + - `--singularity `: Specify the path to the Singularity .sif file to use. See usage later. + - `--scratchdir `: Specify the directory for temporary files. The script will use this parameter to create temporary directories for file storage per-run, which get discarded later. By default, this is the CWD, which might result in temporary files being generated by the thousands in the working directory. On some systems, this must be on the same mount, or even under the same hierarchy as the current directory. Must exist, be writable, and be a directory. + - `-N `: Specify the factor of parallelism, i.e., how many instances to start at a time. Tested with up to `1000`, probably works with much higher values as well. + +## Overview of the Workflow + +This works similarly to BenchExec, however, instead of delegating each run to `runexec`, it delegates to `srun` from SLURM. + +1. If the `--singularity` option is given, the script wraps the command to run in a container. This is useful for dependency management (in most HPC environments, arbitrary package installations are frowned upon). For a simple container, use the following: + + ```singularity + BootStrap: docker + From: ubuntu:22.04 + + %post + apt -y update + apt -y install openjdk-17-jre-headless libgomp1 libmpfr-dev fuse-overlayfs + ``` + + Use `singularity build [--remote / --fakeroot] --fix-perms .sif .def` to build the container. + + Notice the `fuse-overlayfs` package. That is mandatory for the overlay filesystem to work properly. 
+ + The script parameterizes `singularity exec` with the following params: + * `-B $PWD:/lower`: Bind the working directory to `/lower` (could be read-only) + * `--no-home`: Do not bind the home directory + * `-B {tempdir}:/overlay`: Bind the temporary directory to `/overlay` (must be writeable) + * `--fusemount "container:fuse-overlayfs -o lowerdir=/lower -o upperdir=/overlay/upper -o workdir=/overlay/work $HOME"`: mount an overlay filesystem at $HOME, where modifications go in the temp dir but files can be read from the current dir + +2. Currently, the following parameters are passed to `srun` (calculated from the benchmark's parameters): + * `-t ` CPU timelimit (generally, SLURM will round up to nearest minute) + * `-c ` number of cpus + * `--threads-per-core=1` only use one thread per core + * `--mem-per-cpu ` memory allocation in MBs per cpu + * `--ntasks=1` number of tasks per node + +3. The script parses the resulting job ID, and after the job finishes, runs `seff` to gather resource usage data: + * Exit code + * CPU time [s] + * Wall time [s] + * Memory [MB] + +## Limitations + +Currently, there are the following limitations compared to local benchexec: + +1. No advanced resource constraining / monitoring: only CPU time, CPU core and memory limits are handled, and only CPU time, wall time, and memory usage are monitored. +2. No exotic paths in the command are handled: only the current working directory and its children are visible in the container +3. The user on the host and the container should not differ (due to using $HOME in the commands). +4. Without singularity, no constraint is placed on the resulting files of the runs: this will populate the current directory with all the output files of all the runs. +5. For timed-out runs, where SLURM terminated the run, no CPU time values are available. +6. The executor only works with hyperthreading disabled, due to the inability to query nodes about the number of threads per core. 
Assuming it's always 2 is risky, as it may not hold true universally. Consequently, because we can only request whole cores from SLURM instead of threads, we must divide the requested number of threads by the threads-per-core value, which is unknown if hyperthreading could be enabled. +7. Cancelling a benchmark run (by sending SIGINT) could be delayed up to a few minutes depending on the SLURM configuration. \ No newline at end of file diff --git a/contrib/slurm/__init__.py b/contrib/slurm/__init__.py new file mode 100644 index 000000000..7fab673f9 --- /dev/null +++ b/contrib/slurm/__init__.py @@ -0,0 +1,9 @@ +# This file is part of BenchExec, a framework for reliable benchmarking: +# https://github.com/sosy-lab/benchexec +# +# SPDX-FileCopyrightText: 2007-2020 Dirk Beyer +# SPDX-FileCopyrightText: 2024 Levente Bajczi +# SPDX-FileCopyrightText: Critical Systems Research Group +# SPDX-FileCopyrightText: Budapest University of Technology and Economics +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/contrib/slurm/slurmexecutor.py b/contrib/slurm/slurmexecutor.py new file mode 100644 index 000000000..026822ce6 --- /dev/null +++ b/contrib/slurm/slurmexecutor.py @@ -0,0 +1,334 @@ +# This file is part of BenchExec, a framework for reliable benchmarking: +# https://github.com/sosy-lab/benchexec +# +# SPDX-FileCopyrightText: 2007-2020 Dirk Beyer +# SPDX-FileCopyrightText: 2024 Levente Bajczi +# SPDX-FileCopyrightText: Critical Systems Research Group +# SPDX-FileCopyrightText: Budapest University of Technology and Economics +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import queue +import re +import subprocess +import sys +import tempfile +import threading +import time + +from benchexec import BenchExecException, tooladapter, util +from benchexec.util import ProcessExitCode + +sys.dont_write_bytecode = True # prevent creation of .pyc files + +WORKER_THREADS = [] +STOPPED_BY_INTERRUPT = False + + +def init(config, benchmark): + tool_locator = 
tooladapter.create_tool_locator(config) + benchmark.executable = benchmark.tool.executable(tool_locator) + benchmark.tool_version = benchmark.tool.version(benchmark.executable) + + +def get_system_info(): + return None + + +def execute_benchmark(benchmark, output_handler): + + if benchmark.config.use_hyperthreading: + sys.exit( + "SLURM can only work properly without hyperthreading enabled, by passing the --no-hyperthreading option. See README.md for details." + ) + + for runSet in benchmark.run_sets: + if STOPPED_BY_INTERRUPT: + break + + if not runSet.should_be_executed(): + output_handler.output_for_skipping_run_set(runSet) + + elif not runSet.runs: + output_handler.output_for_skipping_run_set( + runSet, "because it has no files" + ) + + else: + _execute_run_set( + runSet, + benchmark, + output_handler, + ) + + output_handler.output_after_benchmark(STOPPED_BY_INTERRUPT) + + +def _execute_run_set( + runSet, + benchmark, + output_handler, +): + # get times before runSet + walltime_before = time.monotonic() + + output_handler.output_before_run_set(runSet) + + # put all runs into a queue + for run in runSet.runs: + _Worker.working_queue.put(run) + + # keep a counter of unfinished runs for the below assertion + unfinished_runs = len(runSet.runs) + unfinished_runs_lock = threading.Lock() + + def run_finished(): + nonlocal unfinished_runs + with unfinished_runs_lock: + unfinished_runs -= 1 + + # create some workers + for i in range(min(benchmark.num_of_threads, unfinished_runs)): + if STOPPED_BY_INTERRUPT: + break + WORKER_THREADS.append(_Worker(benchmark, output_handler, run_finished)) + + # wait until workers are finished (all tasks done or STOPPED_BY_INTERRUPT) + for worker in WORKER_THREADS: + worker.join() + assert unfinished_runs == 0 or STOPPED_BY_INTERRUPT + + # get times after runSet + walltime_after = time.monotonic() + usedWallTime = walltime_after - walltime_before + + if STOPPED_BY_INTERRUPT: + output_handler.set_error("interrupted", runSet) + 
output_handler.output_after_run_set( + runSet, + walltime=usedWallTime, + ) + + +def stop(): + global STOPPED_BY_INTERRUPT + STOPPED_BY_INTERRUPT = True + + +class _Worker(threading.Thread): + """ + A Worker is a daemonic thread, that takes jobs from the working_queue and runs them. + """ + + working_queue = queue.Queue() + + def __init__(self, benchmark, output_handler, run_finished_callback): + threading.Thread.__init__(self) # constructor of superclass + self.run_finished_callback = run_finished_callback + self.benchmark = benchmark + self.output_handler = output_handler + self.setDaemon(True) + + self.start() + + def run(self): + while not STOPPED_BY_INTERRUPT: + try: + currentRun = _Worker.working_queue.get_nowait() + except queue.Empty: + return + + try: + logging.debug('Executing run "%s"', currentRun.identifier) + self.execute(currentRun) + logging.debug('Finished run "%s"', currentRun.identifier) + except SystemExit as e: + logging.critical(e) + except BenchExecException as e: + logging.critical(e) + except BaseException: + logging.exception("Exception during run execution") + self.run_finished_callback() + _Worker.working_queue.task_done() + + def execute(self, run): + """ + This function executes the tool with a sourcefile with options. + It also calls functions for output before and after the run. + """ + self.output_handler.output_before_run(run) + + args = run.cmdline() + logging.debug("Command line of run is %s", args) + + try: + with open(run.log_file, "w") as f: + for i in range(6): + f.write(os.linesep) + + run_result = run_slurm( + self.benchmark, + args, + run.log_file, + ) + + except KeyboardInterrupt: + # If the run was interrupted, we ignore the result and cleanup. 
+ stop() + + if STOPPED_BY_INTERRUPT: + try: + if self.benchmark.config.debug: + os.rename(run.log_file, run.log_file + ".killed") + else: + os.remove(run.log_file) + except OSError: + pass + return 1 + + run.set_result(run_result) + self.output_handler.output_after_run(run) + return None + + +jobid_pattern = re.compile(r"job (\d*) queued") + + +def run_slurm(benchmark, args, log_file): + timelimit = benchmark.rlimits.cputime + cpus = benchmark.rlimits.cpu_cores + memory = benchmark.rlimits.memory + + srun_timelimit_h = int(timelimit / 3600) + srun_timelimit_m = int((timelimit % 3600) / 60) + srun_timelimit_s = int(timelimit % 60) + srun_timelimit = f"{srun_timelimit_h}:{srun_timelimit_m}:{srun_timelimit_s}" + + mem_per_cpu = int(memory / cpus / 1000000) + + if not benchmark.config.scratchdir: + sys.exit("No scratchdir present. Please specify using --scratchdir .") + elif not os.path.exists(benchmark.config.scratchdir): + os.makedirs(benchmark.config.scratchdir) + logging.debug(f"Created scratchdir: {benchmark.config.scratchdir}") + elif not os.path.isdir(benchmark.config.scratchdir): + sys.exit( + f"Scratchdir {benchmark.config.scratchdir} not a directory. Please specify using --scratchdir ." 
+ ) + + with tempfile.TemporaryDirectory(dir=benchmark.config.scratchdir) as tempdir: + + os.makedirs(os.path.join(tempdir, "upper")) + os.makedirs(os.path.join(tempdir, "work")) + + srun_command = [ + "srun", + "-t", + str(srun_timelimit), + "-c", + str(cpus), + "-o", + str(log_file), + "--mem-per-cpu", + str(mem_per_cpu), + "--threads-per-core=1", # --use_hyperthreading=False is always given here + "--ntasks=1", + ] + if benchmark.config.singularity: + srun_command.extend( + [ + "singularity", + "exec", + "-B", + "./:/lower", + "--no-home", + "-B", + f"{tempdir}:/overlay", + "--fusemount", + f"container:fuse-overlayfs -o lowerdir=/lower -o upperdir=/overlay/upper -o workdir=/overlay/work /home/{os.getlogin()}", + benchmark.config.singularity, + ] + ) + srun_command.extend(args) + + logging.debug( + "Command to run: %s", " ".join(map(util.escape_string_shell, srun_command)) + ) + srun_result = subprocess.run( + srun_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + logging.debug( + "srun: returncode: %d, output: %s", + srun_result.returncode, + srun_result.stdout, + ) + jobid_match = jobid_pattern.search(str(srun_result.stdout)) + if jobid_match: + jobid = int(jobid_match.group(1)) + else: + logging.debug("Jobid not found in stderr, aborting") + stop() + return -1 + + seff_command = ["seff", str(jobid)] + logging.debug( + "Command to run: %s", " ".join(map(util.escape_string_shell, seff_command)) + ) + result = subprocess.run( + seff_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + + # Runexec would populate the first 6 lines with metadata + with open(log_file, "r+") as file: + content = file.read() + file.seek(0, 0) + empty_lines = "\n" * 6 + file.write(empty_lines + content) + + exit_code, cpu_time, wall_time, memory_usage = parse_seff(str(result.stdout)) + + return { + "walltime": wall_time, + "cputime": cpu_time, + "memory": memory_usage, + "exitcode": ProcessExitCode.create(value=exit_code), + } + + +exit_code_pattern = 
re.compile(r"exit code (\d+)") +cpu_time_pattern = re.compile(r"CPU Utilized: (\d+):(\d+):(\d+)") +wall_time_pattern = re.compile(r"Job Wall-clock time: (\d+):(\d+):(\d+)") +memory_pattern = re.compile(r"Memory Utilized: (\d+\.\d+) MB") + + +def parse_seff(result): + logging.debug(f"Got output from seff: {result}") + exit_code_match = exit_code_pattern.search(result) + cpu_time_match = cpu_time_pattern.search(result) + wall_time_match = wall_time_pattern.search(result) + memory_match = memory_pattern.search(result) + if exit_code_match: + exit_code = int(exit_code_match.group(1)) + else: + raise Exception(f"Exit code not matched in output: {result}") + cpu_time = None + if cpu_time_match: + hours, minutes, seconds = map(int, cpu_time_match.groups()) + cpu_time = hours * 3600 + minutes * 60 + seconds + wall_time = None + if wall_time_match: + hours, minutes, seconds = map(int, wall_time_match.groups()) + wall_time = hours * 3600 + minutes * 60 + seconds + memory_usage = float(memory_match.group(1)) * 1000000 if memory_match else None + + logging.debug( + f"Exit code: {exit_code}, memory usage: {memory_usage}, walltime: {wall_time}, cpu time: {cpu_time}" + ) + + return exit_code, cpu_time, wall_time, memory_usage