From 949762cc25e9f2fcdddf96808e1d5e175ca24a01 Mon Sep 17 00:00:00 2001 From: Altan Orhon Date: Tue, 19 Sep 2023 19:10:07 -0700 Subject: [PATCH] saving work on script --- hyakvnc/__main__.py | 272 +++++++++++++++++++++++++++---------------- hyakvnc/slurmutil.py | 143 +++++++---------------- hyakvnc/util.py | 31 +++++ 3 files changed, 245 insertions(+), 201 deletions(-) create mode 100644 hyakvnc/util.py diff --git a/hyakvnc/__main__.py b/hyakvnc/__main__.py index 04893d1..5300fba 100644 --- a/hyakvnc/__main__.py +++ b/hyakvnc/__main__.py @@ -9,18 +9,17 @@ import re import logging import subprocess +from copy import deepcopy +from pprint import pformat import shlex import time +import tenacity +import argparse +from dataclasses import dataclass, asdict -from slurmutil import get_slurm_cluster, get_slurm_partitions, get_slurm_default_account, get_slurm_job_details -# Base VNC port cannot be changed due to vncserver not having a stable argument -# interface: -BASE_VNC_PORT = os.environ.setdefault("HYAKVNC_BASE_VNC_PORT", "5900") - -# List of Klone login node hostnames -LOGIN_NODE_LIST = os.environ.get("HYAKVNC_LOGIN_NODES", "klone-login01,klone1.hyak.uw.edu,klone2.hyak.uw.edu").split( - ",") +from .slurmutil import get_default_cluster, get_default_account, get_partitions, wait_for_job_status +from .util import repeat_until, wait_for_file, check_remote_pid_exists_and_port_open # Name of Apptainer binary (formerly Singularity) APPTAINER_BIN = os.environ.setdefault("HYAKVNC_APPTAINER_BIN", "apptainer") @@ -28,60 +27,51 @@ # Checked to see if klone is authorized for intracluster access AUTH_KEYS_FILEPATH = Path(os.environ.setdefault("HYAKVNC_AUTH_KEYS_FILEPATH", "~/.ssh/authorized_keys")).expanduser() -# Apptainer bindpaths can be overwritten if $APPTAINER_BINDPATH is defined. -# Bindpaths are used to mount storage paths to containerized environment. 
-APPTAINER_BINDPATH = os.environ.setdefault("APPTAINER_BINDPATH", - os.environ.get("HYAKVNC_APPTAINER_BINDPATH", - os.environ.get("SINGULARITY_BINDPATH", - "/tmp,$HOME,$PWD,/gscratch,/opt,/:/hyak_root,/sw,/mmfs1"))) - -APPTAINER_CONFIGDIR = Path(os.getenv("APPTAINER_CONFIGDIR", "~/.apptainer")).expanduser() -APPTAINER_INSTANCES_DIR = APPTAINER_CONFIGDIR / "instances" - -# # SLURM UTILS - -# Slurm configuration variables: -SLURM_CLUSTER = os.getenv("HYAKVNC_SLURM_CLUSTER", os.getenv("SBATCH_CLUSTERS", get_slurm_cluster()).split(",")[0]) -SLURM_ACCOUNT = os.environ.get("HYAKVNC_SLURM_ACCOUNT", os.environ.setdefault("SBATCH_ACCOUNT", - get_slurm_default_account( - cluster=SLURM_CLUSTER))) -SLURM_GPUS = os.environ.setdefault("SBATCH_GPUS", "0") -SLURM_CPUS_PER_TASK = os.environ.setdefault("HYAKVNC_SLURM_CPUS_PER_TASK", "1") -SBATCH_GPUS = os.environ.setdefault("SBATCH_GPUS", "0") -SBATCH_TIMELIMIT = os.environ.setdefault("SBATCH_TIMELIMIT", "1:00:00") -HYAKVNC_SLURM_JOBNAME_PREFIX = os.getenv("HYAKVNC_SLURM_JOBNAME_PREFIX", "hyakvnc-") -HYAKVNC_APPTAINER_INSTANCE_PREFIX = os.getenv("HYAKVNC_APPTAINER_INSTANCE_PREFIX", HYAKVNC_APPTAINER_INSTANCE_PREFIX + "vncserver-") -SBATCH_CLUSTERS = os.environ.setdefault("SBATCH_CLUSTERS", SLURM_CLUSTER) +@dataclass +class HyakVncConfig: + # script attributes + job_prefix: str = "hyakvnc-" + # apptainer config + apptainer_config_dir: str = "~/.apptainer" + apptainer_instance_prefix: str = "hyakvnc-" + apptainer_env_vars: Optional[dict] = None -found_sbatch_partitions = get_slurm_partitions(account=SBATCH_ACCOUNT, cluster=SBATCH_CLUSTERS) -if found_sbatch_partitions: - HYAKVNC_SLURM_PARTITION = os.environ.get("HYAKVNC_SLURM_PARTITION", os.environ.setdefault("SBATCH_ACCOUNT", - get_slurm_default_account( - cluster=SLURM_CLUSTER))) + sbatch_post_timeout: float = 120.0 + sbatch_post_poll_interval: float = 1.0 -SB + # ssh config + ssh_host = "klone.hyak.uw.edu" -if any(SBATCH_PARTITION := x for x in get_slurm_partitions(account=SBATCH_ACCOUNT, cluster=SBATCH_CLUSTERS)): - os.environ.setdefault("SBATCH_PARTITION", SBATCH_PARTITION) + # slurm attributes + ## sbatch environment variables + account: Optional[str] = None # -a, --account + partition: Optional[str] = None # -p, --partition + cluster: Optional[str] = None # --clusters, SBATCH_CLUSTERS + gpus: Optional[str] = None # -G, --gpus, SBATCH_GPUS + timelimit: Optional[str] = None # -t, --time, SBATCH_TIMELIMIT + mem: Optional[str] = None # --mem, SBATCH_MEM + cpus: Optional[int] = None # -c, --cpus-per-task (not settable by environment variable) -SBATCH_GPUS = os.environ.setdefault("SBATCH_GPUS", "0") -SBATCH_TIMELIMIT = os.environ.setdefault("SBATCH_TIMELIMIT", "1:00:00") -SBATCH_MEM = os.environ.setdefault("SBATCH_MEM", "8G") + def to_json(self): + return json.dumps({k: v for k, v in asdict(self).items() if v is not None}) + @staticmethod + def from_json(path): + if not Path(path).is_file(): + raise ValueError(f"Invalid path to configuration file: {path}") -HYAKVNC_SLURM_JOBNAME_PREFIX = os.getenv("HYAKVNC_SLURM_JOBNAME_PREFIX", "hyakvnc-") -HYAKVNC_APPTAINER_INSTANCE_PREFIX = os.getenv("HYAKVNC_APPTAINER_INSTANCE_PREFIX", "hyakvnc-vncserver-") + with open(path, "r") as f: + contents = json.load(f) + return HyakVncConfig(**contents) + @staticmethod + def from_jsons(s: str): + return HyakVncConfig(**json.loads(s)) -def check_remote_pid_exists_and_port_open(host: str, pid: int, port: int) -> bool: - cmd = f"ssh {host} ps -p {pid} && nc -z localhost {port}".split() - res = subprocess.run(cmd, 
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
-    return res.returncode == 0


-def get_apptainer_vnc_instances(apptainer_config_dir="~/.apptainer", instance_prefix: str ="hyakvnc-",
-                                read_apptainer_config: bool = False):
+def get_apptainer_vnc_instances(cfg: HyakVncConfig, read_apptainer_config: bool = False):
-    appdir = Path(apptainer_config_dir).expanduser() / 'instances' / 'app'
+    appdir = Path(cfg.apptainer_config_dir).expanduser() / 'instances' / 'app'
     assert appdir.exists(), f"Could not find apptainer instances dir at {appdir}"

@@ -89,10 +79,10 @@ def get_apptainer_vnc_instances(apptainer_config_dir="~/.apptainer", instance_pr
     if read_apptainer_config:
         needed_keys.add('config')

-    all_instance_json_files = appdir.rglob(instance_prefix + '*.json')
+    all_instance_json_files = appdir.rglob(cfg.apptainer_instance_prefix + '*.json')

     running_hyakvnc_json_files = {p: r.groupdict() for p in all_instance_json_files if (
-        r := re.match(rf'(?P<prefix>{instance_prefix})(?P<jobid>\d+)-(?P<appinstance>.*)\.json', p.name))
+        r := re.match(rf'(?P<prefix>{cfg.apptainer_instance_prefix})(?P<jobid>\d+)-(?P<appinstance>.*)\.json', p.name))
     }
     outs = []
     # frr := re.search(r'\s+-rfbport\s+(?P<port>\d+)\b', fr)
@@ -163,19 +153,7 @@ def get_openssh_connection_string_for_instance(instance: dict, login_host: str,


-
-APPTAINER_WRITABLE_TMPFS = os.environ.setdefault("APPTAINER_WRITABLE_TMPFS", "1")
-APPTAINER_CLEANENV = os.environ.setdefault("APPTAINER_CLEANENV", "1")
-
-def get_slurm_job_status(jobid: int):
-    cmd = ["squeue", "-j", str(jobid), "-ho", "%T"]
-    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
-    if res.returncode != 0:
-        raise ValueError(f"Could not get status for job {jobid}:\n{res.stderr}")
-    return res.stdout.strip()
-
-def create_job_with_container(container_path: str, cpus_per_task: Optional[int] = None, squeue_poll_interval: int = 3):
-
+def create_job_with_container(container_path, cfg: HyakVncConfig):
     if re.match(r"(?P<scheme>library|docker|shub|oras)://(?P<uri>.*)", container_path):
         container_name = Path(container_path).stem

@@ -185,53 +163,141 @@ def create_job_with_container(container_path: str, cpus_per_task: Optional[int]
     assert container_path.exists(), f"Could not find container at {container_path}"

-    cmds = ["sbatch", "--parsable"]
-    if cpus_per_task:
-        cmds += ["-c", str(cpus_per_task)]
+    cmds = ["sbatch", "--parsable", "--job-name", cfg.job_prefix + container_name]
+    sbatch_optinfo = {"account": "-A", "partition": "-p", "gpus": "-G", "timelimit": "--time", "mem": "--mem", "cpus": "-c"}
+    sbatch_options = [item for pair in [(sbatch_optinfo[k], v) for k, v in asdict(cfg).items() if k in sbatch_optinfo.keys() and v is not None]
+                      for item in pair]

-    os.environ["SBATCH_JOB_NAME"] = f"{HYAKVNC_SLURM_JOBNAME_PREFIX}{container_name}"
+    cmds += sbatch_options

-    # Set up apptainer variables and command:
-    APPTAINER_CLEANENV = os.environ.setdefault("APPTAINER_CLEANENV", "1")
-    APPTAINER_WRITABLE_TMPFS = os.environ.setdefault("APPTAINER_WRITABLE_TMPFS", "1")

     apptainer_env_vars = {k: v for k, v in os.environ.items() if k.startswith("APPTAINER_") or k.startswith("SINGULARITY_") or k.startswith("SINGULARITYENV_") or k.startswith("APPTAINERENV_")}
     apptainer_env_vars_str = [ f"{k}={shlex.quote(v)}" for k, v in apptainer_env_vars.items()]
-    apptainer_cmd = f"{apptainer_env_vars_str} apptainer instance start --cleanenv --writable-tmpfs {container_path} && while true; do sleep 10; done"
+
+    apptainer_cmd = f"{' '.join(apptainer_env_vars_str)} apptainer instance start {container_path} && while true; do sleep 10; done"
     cmds += ["--wrap", apptainer_cmd]
+
+    # Launch sbatch process:
-    res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
-    if res.returncode != 0:
-        raise ValueError(f"Could not create job with container {container_path}:\n{res.stderr}")
+    logging.info("Launching sbatch process with command:\n" + " ".join(cmds))
+    res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")

     try:
-        jobid = res.stdout.strip().split(":")
+        job_id = int(res.stdout.strip().split(":")[0])
-    except (ValueError, IndexError):
+    except (ValueError, IndexError, TypeError):
         raise RuntimeError(f"Could not parse jobid from sbatch output: {res.stdout}")

-    while True:
-        job_status = get_slurm_job_status(jobid)
-        if job_status == "RUNNING":
-            vnc_instances = get_apptainer_vnc_instances()
-            inst_name = f"{HYAKVNC_APPTAINER_INSTANCE_PREFIX}{container_name}"
-            vinst = [ x for x in vnc_instances if x['name'] == f"{HYAKVNC_APPTAINER_INSTANCE_PREFIX}{container_name}"]
-
-        elif job_status in [ "CONFIGURING",
-            "PENDING",
-            "RESV_DEL_HOLD",
-            "REQUEUE_FED",
-            "REQUEUE_HOLD",
-            "REQUEUED",
-            "RESIZING",
-            "SIGNALING"]:
-            logging.debug(f"Job {jobid} is {job_status} - waiting {squeue_poll_interval} seconds")
-            time.sleep(squeue_poll_interval)
-        else:
-            raise RuntimeError(f"Job {jobid} unable to grant launch request - status is {job_status}")
-
-
-
-    res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
-    return res.returncode == 0
\ No newline at end of file
+    s = wait_for_job_status(job_id, states={"RUNNING"}, timeout=cfg.sbatch_post_timeout,
+                            poll_interval=cfg.sbatch_post_poll_interval)
+    if not s:
+        raise RuntimeError(f"Job {job_id} did not start running within {cfg.sbatch_post_timeout} seconds")
+
+
+def kill(jobid: Optional[int] = None, all: bool = False):
+    if all:
+        vnc_instances = get_apptainer_vnc_instances()
+        for vnc_instance in vnc_instances:
+            subprocess.run(["scancel", str(vnc_instance['slurm_job_id'])])
+        return
+
+    if jobid:
+        subprocess.run(["scancel", str(jobid)])
+        return
+
+    raise ValueError("Must specify either --all or <job_id>")
+
+
+def create_arg_parser():
+    parser = argparse.ArgumentParser()
+    subparsers = parser.add_subparsers(dest='command')
+
+    # general arguments
+    parser.add_argument('-d', '--debug',
+                        dest='debug',
+                        action='store_true',
+                        help='Enable debug logging')
+
+    parser.add_argument('-v', '--version',
+                        dest='print_version',
+                        action='store_true',
+                        help='Print program version and exit')
+
+    # command: create
+    parser_create = subparsers.add_parser('create',
+                                          help='Create VNC session')
+    parser_create.add_argument('-p', '--partition',
+                               dest='partition',
+                               metavar='<partition>',
+                               help='Slurm partition',
+                               type=str)
+    parser_create.add_argument('-A', '--account',
+                               dest='account',
+                               metavar='<account>',
+                               help='Slurm account',
+                               type=str)
+    parser_create.add_argument('--timeout',
+                               dest='timeout',
+                               metavar='<seconds>',
+                               help='[default: 120] Slurm node allocation and VNC startup timeout length (in seconds)',
+                               default=120,
+                               type=int)
+    parser_create.add_argument('-t', '--time',
+                               dest='time',
+                               metavar='<hours>',
+                               help='Subnode reservation time (in hours)',
+                               type=int)
+    parser_create.add_argument('-c', '--cpus',
+                               dest='cpus',
+                               metavar='<num_cpus>',
+                               help='Subnode cpu count',
+                               default=1,
+                               type=int)
+    parser_create.add_argument('-G', '--gpus',
+                               dest='gpus',
+                               metavar='[type:]<number>',
+                               help='Subnode gpu count',
+                               default="0",
+                               type=str)
+    parser_create.add_argument('--mem',
+                               dest='mem',
+                               metavar='<size[units]>',
+                               help='Subnode memory amount with units',
+                               type=str)
+    parser_create.add_argument('--container',
+                               dest='sing_container',
+                               metavar='<path>',
+                               help='Path to VNC Apptainer/Singularity Container (.sif)',
+                               required=True,
+                               type=str)
+
+    # status command
+    parser_status = subparsers.add_parser('status',
+                                          help='Print details of all VNC jobs with given job name and exit')
+
+    # kill command
+    parser_kill = subparsers.add_parser('kill',
+                                        help='Kill specified job')
+
+    kill_group = parser_kill.add_mutually_exclusive_group(required=True)
+    kill_group.add_argument('job_id',
+                            metavar='<job_id>',
+                            help='Kill specified VNC session, cancel its VNC job, and exit',
+                            type=int)
+
+    kill_group.add_argument('-a', '--all',
+                            action='store_true',
+                            dest='kill_all',
+                            help='Stop all VNC sessions and exit')
+
+    parser_kill.set_defaults(func=kill)
+
+
+arg_parser = create_arg_parser()
+args = arg_parser.parse_args()
+
+print(args.func(*args.operands))
\ No newline at end of file
diff --git a/hyakvnc/slurmutil.py b/hyakvnc/slurmutil.py
index 8010309..1a7984e 100755
--- a/hyakvnc/slurmutil.py
+++ b/hyakvnc/slurmutil.py
@@ -1,58 +1,60 @@
 # -*- coding: utf-8 -*-
-import sys
 import os
 import subprocess
-import re
-from typing import Optional, Iterable, Union, List
-
-import json
-from pathlib import Path
-
+import sys
+from typing import Optional, Union, Container

-def get_slurmd_config():
-    cmd = f"slurmd -C".split()
-    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8').stdout.splitlines()
-    return dict([re.match(r'([^=]+)+=(.*)', k).groups() for k in res.split()])
+from .util import repeat_until

-def get_slurm_cluster():
+def get_default_cluster() -> str:
     cmd = f"sacctmgr show cluster -nPs format=Cluster".split()
-    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8').stdout.splitlines()
-    if any(cluster := x for x in res):
-        return cluster
+    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                         encoding=sys.getdefaultencoding()).stdout.splitlines()
+    if res:
+        return res[0]
     else:
-        raise ValueError("Could not find cluster name")
+        raise LookupError("Could not find default cluster")

-def get_slurm_default_account(user: Optional[str] = None, cluster: Optional[str] = None):
+
+def get_default_account(user: Optional[str] = None, cluster: Optional[str] = None) -> str:
     user = user or os.getlogin()
-    cluster = cluster or get_slurm_cluster()
-    cmd = f"sacctmgr show user -nPs {user} format=defaultaccount where cluster={cluster}".split()
-    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8').stdout.splitlines()
+    cluster = cluster or get_default_cluster()
+
+    cmd = f"sacctmgr show user -nPs {user} format=defaultaccount where cluster={cluster}"
+
+    res = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                         encoding=sys.getdefaultencoding()).stdout.splitlines()
+
     if any(default_account := x for x in res):
         return default_account
     else:
-        raise ValueError(f"Could not find default account for user '{user}' on cluster '{cluster}'")
+        raise LookupError(f"Could not find default account for user '{user}' on cluster '{cluster}'")

-def get_slurm_partitions(user: Optional[str] = None, account: Optional[str] = None, cluster: Optional[str] = None):
+def get_partitions(user: Optional[str] = None, account: Optional[str] = None, cluster: Optional[str] = None) -> set[str]:
     user = user or os.getlogin()
-    cluster = cluster or get_slurm_cluster()
-    account = account or get_slurm_default_account(user=user, cluster=cluster)
-    cmd = f"sacctmgr show -nPs user {user} format=qos where account={account} cluster={cluster}".split()
-    res = 
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8').stdout.splitlines() + cluster = cluster or get_default_cluster() + account = account or get_default_account(user=user, cluster=cluster) + cmd = f"sacctmgr show -nPs user {user} format=qos where account={account} cluster={cluster}" + res = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, + encoding=sys.getdefaultencoding()).stdout.splitlines() + if any(partitions := x for x in res): return {x.strip(f"{account}-") for x in partitions.split(',')} else: raise ValueError(f"Could not find partitions for user '{user}' and account '{account}' on cluster '{cluster}'") -def get_slurm_job_details(user: Optional[str] = None, jobs: Optional[Union[int, list[int]]] = None, me: bool = True, - cluster: Optional[str] = None, - fields=( - 'JobId', 'Partition', 'Name', 'State', 'TimeUsed', 'TimeLimit', 'NumNodes', 'NodeList')): +def get_job_details(user: Optional[str] = None, jobs: Optional[Union[int, list[int]]] = None, me: bool = True, + cluster: Optional[str] = None, + fields=( + 'JobId', 'Partition', 'Name', 'State', 'TimeUsed', 'TimeLimit', 'NumNodes', + 'NodeList')) -> dict: if me and not user: user = os.getlogin() - cluster = cluster or get_slurm_cluster() + cluster = cluster or get_default_cluster() cmds: list[str] = ['squeue', '--noheader'] fields_str = ','.join(fields) @@ -65,77 +67,22 @@ def get_slurm_job_details(user: Optional[str] = None, jobs: Optional[Union[int, jobs = [jobs] jobs = ','.join([str(x) for x in jobs]) cmds += ['--jobs', jobs] - res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8").stdout.splitlines() + res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + encoding=sys.getdefaultencoding()).stdout.splitlines() out = {x["JobId"]: x for x in [dict(zip(fields, line.split())) for line in res if line.strip()]} return out -SLURM_RUNNING_STATES = [ - "CONFIGURING", - "PENDING", - "RESV_DEL_HOLD", - "REQUEUE_FED", - "REQUEUE_HOLD", - "REQUEUED", - "RESIZING", - "SIGNALING", - "STAGE_OUT", - "SUSPENDED", - "STOPPED", -] - -SLURM_SUCCESS_STATES = [ - "CG", - "COMPLETING", - "CD", - "COMPLETED", -] - -SLURM_CANCELLED_STATES = ["CA", "CANCELLED", "RV", "REVOKED"] - -SLURM_TIMEOUT_STATES = ["DL", "DEADLINE", "TO", "TIMEOUT"] - -SLURM_FAILURE_STATES = [ - "BF", - "BOOT_FAIL", - "F", - "FAILED", - "NF", - "NODE_FAIL", - "OOM", - "OUT_OF_MEMORY", - "PR", - "PREEMPTED", -] - -def is_success(status): - return status in SLURM_SUCCESS_STATES - - -def is_failure(status): - return status in SLURM_FAILURE_STATES - - -def is_timeout(status): - return status in SLURM_TIMEOUT_STATES - - -def is_cancelled(status): - return status in SLURM_CANCELLED_STATES - - -def is_complete(status): - return ( - is_success(status) - or is_failure(status) - or is_timeout(status) - or is_cancelled(status) - ) - - -def get_slurm_job_status(jobid: int): - cmd = ["squeue", "-j", jobid, "-ho", "%T"] - res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") +def get_job_status(jobid: int) -> str: + cmd = f"squeue -j {jobid} -h -o %T" + res = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding=sys.getdefaultencoding()) if res.returncode != 0: raise ValueError(f"Could not get status for job {jobid}:\n{res.stderr}") return res.stdout.strip() + + +def wait_for_job_status(job_id: int, states: Container[str], timeout: Optional[float] = None, + poll_interval: float = 1.0) -> bool: + """Waits for 
the specified job state to be reached"""
+    return repeat_until(lambda: get_job_status(job_id), lambda x: x in states, timeout=timeout,
+                        poll_interval=poll_interval)
diff --git a/hyakvnc/util.py b/hyakvnc/util.py
new file mode 100644
index 0000000..659a81f
--- /dev/null
+++ b/hyakvnc/util.py
@@ -0,0 +1,31 @@
+import time
+from typing import Callable, Optional, Union
+import logging
+from pathlib import Path
+import subprocess
+
+
+def repeat_until(func: Callable, condition: Callable[..., bool], timeout: Optional[float] = None,
+                 poll_interval: float = 1.0):
+    """Calls func() every poll_interval seconds until condition(func()) is true or timeout seconds elapse."""
+    begin_time = time.time()
+    assert timeout is None or timeout > 0, "Timeout must be greater than zero"
+    assert poll_interval > 0, "Poll interval must be greater than zero"
+    timeout = timeout or float("inf")  # no timeout means poll indefinitely
+    while time.time() < begin_time + timeout:
+        if condition(func()):
+            return True
+        time.sleep(poll_interval)
+    return False
+
+
+def wait_for_file(path: Union[Path, str], timeout: Optional[float] = None,
+                  poll_interval: float = 1.0):
+    """
+    Waits for the specified file to be present.
+    """
+    path = Path(path)
+    logging.debug(f"Waiting for file `{path}` to exist")
+    return repeat_until(lambda: path.exists(), lambda exists: exists, timeout=timeout, poll_interval=poll_interval)
+
+
+def check_remote_pid_exists_and_port_open(host: str, pid: int, port: int) -> bool:
+    cmd = f"ssh {host} ps -p {pid} && nc -z localhost {port}".split()
+    res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+    return res.returncode == 0
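
A minimal usage sketch of how the polling helpers introduced by this patch are intended to compose, assuming the package layout it creates (hyakvnc/util.py and hyakvnc/slurmutil.py); the job id and timeout values below are placeholders, not values from the patch.

# Placeholder job id and illustrative timeouts; wait_for_job_status() is a thin
# wrapper around repeat_until(), so the two calls below are equivalent.
from hyakvnc.slurmutil import get_job_status, wait_for_job_status
from hyakvnc.util import repeat_until

job_id = 12345  # placeholder Slurm job id

# Poll squeue until the job reports RUNNING, for at most 120 seconds:
started = wait_for_job_status(job_id, states={"RUNNING"}, timeout=120.0, poll_interval=1.0)

# The same wait expressed directly with repeat_until():
started = repeat_until(lambda: get_job_status(job_id),
                       lambda state: state == "RUNNING",
                       timeout=120.0, poll_interval=1.0)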