diff --git a/abipy/flowtk/qutils.py b/abipy/flowtk/qutils.py index 787f3fc6c..06381ee54 100644 --- a/abipy/flowtk/qutils.py +++ b/abipy/flowtk/qutils.py @@ -13,6 +13,7 @@ from monty.string import is_string from pymatgen.core.units import Time, Memory +from abipy.tools.typing import PathLike from abipy.tools import duck @@ -164,3 +165,112 @@ def slurm_get_jobs(username=None) -> dict[int, dict]: entries.append(d) return {e["JOBID"]: e for e in entries} + + + +class SlurmJobArray: + """ + + Example: + + header = '''\ +#!/bin/bash + +#SBATCH --account=battab +#SBATCH --job-name=abiml_md +#SBATCH --time=0-16:0:0 +#SBATCH --partition=batch +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=1 # 1 node has 128 cores +#SBATCH --cpus-per-task=1 + +conda activate env3.10 +export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK +ulimit -s unlimited +''' + +command = "abiml.py md" +arr_options = ["--help", "--version"] +job_array = SlurmJobArray(header, command, arr_options) +print(job_array) +queue_id = job_array.sbatch("job.sh") + """ + + def __init__(self, header, command, arr_options): + self.command = str(command) + self.header = str(header) + self.arr_options = arr_options + self.arr_options_str = "\n".join(arr_options) + + def __str__(self): + # Add slurm array section. + lines = self.header.splitlines() + for il, line in enumerate(lines): + if line.startswith("#SBATCH"): + break + else: + raise ValueError("Cannot find line starting with #SBATCH") + lines.insert(il, f"#SBATCH --array=0:{len(self.arr_options)-1}") + header = "\n".join(lines) + + select_opts = r""" +# multiline string +OPTS_STRING=" +%s +" + +# Convert the multiline string to an array +IFS=$'\n' read -rd '' -a OPTS_LIST <<< "$OPTS_STRING" + +# Index of the entry you want (0-based) +index=0 +#index=${SLURM_ARRAY_TASK_ID} + +# Check if the index is within the range of the array +if (( index >= 0 && index < ${#OPTS_LIST[@]} )); then + OPTS="${OPTS_LIST[index]}" + echo "Selected entry at index $index: $OPTS" +else + echo "Index $index is out of range" + exit 1 +fi + +""" % (self.arr_options_str) + + end = f"{self.command} ${{OPTS}}" + return header + select_opts + end + + def sbatch(self, slurm_filepath: PathLike, dry_run=False) -> int: + with open(slurm_filepath, "wt") as fh: + fh.write(str(self)) + + queue_id = slurm_submit(slurm_filepath) + with open(slurm_filepath + ".qid", "wt") as fh: + fh.write("# Slurm job id") + fh.write(str(queue_id)) + return queue_id + + + +def slurm_submit(script_file) -> int: + """ + Submit a job script to the queue with sbatch + """ + from subprocess import Popen, PIPE + # need string not bytes so must use universal_newlines + process = Popen(['sbatch', script_file], stdout=PIPE, stderr=PIPE, universal_newlines=True) + out, err = process.communicate() + + # grab the returncode. SLURM returns 0 if the job was successful + if process.returncode == 0: + try: + # output should of the form '2561553.sdb' or '352353.jessup' - just grab the first part for job id + queue_id = int(out.split()[3]) + print('Job submission was successful and queue_id is {}'.format(queue_id)) + return queue_id + except Exception as exc: + # probably error parsing job code + print('Could not parse job id following slurm...') + raise exc + else: + raise RuntimeError(f"Error while submitting {script_file=}")