From 229152ef1db41722ec58564f0d99f8242c2882f4 Mon Sep 17 00:00:00 2001 From: Altan Orhon Date: Wed, 4 Oct 2023 11:52:18 -0700 Subject: [PATCH] Removed Python code --- hyakvnc/__init__.py | 4 - hyakvnc/__main__.py | 429 ------------------------------ hyakvnc/apptainer.py | 219 --------------- hyakvnc/config.py | 173 ------------ hyakvnc/slurmutil.py | 603 ------------------------------------------ hyakvnc/util.py | 94 ------- hyakvnc/version.py | 1 - hyakvnc/vncsession.py | 284 -------------------- pyproject.toml | 33 --- 9 files changed, 1840 deletions(-) delete mode 100644 hyakvnc/__init__.py delete mode 100644 hyakvnc/__main__.py delete mode 100644 hyakvnc/apptainer.py delete mode 100644 hyakvnc/config.py delete mode 100755 hyakvnc/slurmutil.py delete mode 100644 hyakvnc/util.py delete mode 100644 hyakvnc/version.py delete mode 100644 hyakvnc/vncsession.py delete mode 100644 pyproject.toml diff --git a/hyakvnc/__init__.py b/hyakvnc/__init__.py deleted file mode 100644 index 05267d8..0000000 --- a/hyakvnc/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -import logging - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) diff --git a/hyakvnc/__main__.py b/hyakvnc/__main__.py deleted file mode 100644 index a1e2570..0000000 --- a/hyakvnc/__main__.py +++ /dev/null @@ -1,429 +0,0 @@ -#! /usr/bin/env python3 - -import argparse -import logging -import os -import re -import shlex -import shutil -import signal -import time -from datetime import datetime -from pathlib import Path -from typing import Optional, Union - -from hyakvnc.apptainer import ApptainerInstanceInfo -from .vncsession import HyakVncSession -from .config import HyakVncConfig -from .slurmutil import ( - wait_for_job_status, - get_job_info, - get_historical_job_infos, - cancel_job, - SbatchCommand, - get_job_status, - wait_for_job_running, -) -from .util import wait_for_file, repeat_until -from .version import VERSION -from . import logger - -app_config = None -# Record time app started in case we need to clean up some jobs: -app_started = datetime.now() - -# Keep track of job ids so we can clean up if necessary: -app_job_ids = [] - - -def cmd_create(container_path: Union[str, Path], dry_run=False): - """ - Allocates a compute node, starts a container, and launches a VNC session on it. - :param container_path: Path to container to run - :param dry_run: Whether to do a dry run (do not actually submit job) - :return: None - """ - - def kill_self(sig=signal.SIGTERM): - os.kill(os.getpid(), sig) - - def cancel_created_jobs(): - for x in app_job_ids: - logger.info(f"Cancelling job {x}") - try: - cancel_job(x) - except (ValueError, RuntimeError): - logger.error(f"Could not cancel job {x}") - else: - logger.info(f"Cancelled job {x}") - - def create_node_signal_handler(signal_number, frame): - """ - Pass SIGINT to subprocess and exit program. - """ - logger.debug(f"hyakvnc create: Caught signal: {signal_number}. 
Cancelling jobs: {app_job_ids}") - cancel_created_jobs() - exit(1) - - signal.signal(signal.SIGINT, create_node_signal_handler) - signal.signal(signal.SIGTSTP, create_node_signal_handler) - signal.signal(signal.SIGTERM, create_node_signal_handler) - - container_path = Path(container_path) - container_name = container_path.stem - - if not container_path.is_file(): - container_path = str(container_path) - assert re.match( - r"(?Plibrary|docker|shub|oras)://(?P.*)", container_path - ), f"Container path {container_path} is not a valid URI" - - else: - container_path = container_path.expanduser() - assert container_path.exists(), f"Container path {container_path} does not exist" - assert container_path.is_file(), f"Container path {container_path} is not a file" - - cmds = ["sbatch", "--parsable", "--job-name", app_config.job_prefix + "-" + container_name] - - cmds += ["--output", app_config.sbatch_output_path] - - sbatch_opts = { - "parsable": None, - "job_name": app_config.job_prefix + "-" + container_name, - "output": app_config.sbatch_output_path, - } - if app_config.account: - sbatch_opts["account"] = app_config.account - if app_config.partition: - sbatch_opts["partition"] = app_config.partition - if app_config.cluster: - sbatch_opts["clusters"] = app_config.cluster - if app_config.gpus: - sbatch_opts["gpus"] = app_config.gpus - if app_config.timelimit: - sbatch_opts["time"] = app_config.timelimit - if app_config.mem: - sbatch_opts["mem"] = app_config.mem - if app_config.cpus: - sbatch_opts["cpus_per_task"] = app_config.cpus - - # Set up the environment variables to pass to Apptainer: - apptainer_env_vars_quoted = [f"{k}={shlex.quote(v)}" for k, v in app_config.apptainer_env_vars.items()] - apptainer_env_vars_string = "" if not apptainer_env_vars_quoted else (" ".join(apptainer_env_vars_quoted) + " ") - - # Template to name the apptainer instance: - apptainer_instance_name = f"{app_config.apptainer_instance_prefix}-$SLURM_JOB_ID-{container_name}" - # Command to start the apptainer instance: - apptainer_cmd = ( - "apptainer instance start " - + str(container_path) - + " " - + str(apptainer_instance_name) - + " && while true; do sleep 2; done" - ) - - # Command to start the apptainer instance and keep it running: - apptainer_cmd_with_rest = apptainer_env_vars_string + apptainer_cmd - - # The sbatch wrap functionality allows submitting commands without an sbatch script:t - sbatch_opts["wrap"] = apptainer_cmd_with_rest - - sbatch_command = SbatchCommand(sbatch_options=sbatch_opts) - - if dry_run: - print(f"Would have launched sbatch process with command:\n\t{' '.join(sbatch_command.command_list)}") - exit(0) - - job_id = None - try: - job_id, job_cluster = sbatch_command() - app_job_ids.append(job_id) - except RuntimeError as e: - logger.error(f"Could not submit sbatch job: {e}") - kill_self() - - logger.info( - f"Launched sbatch job {job_id} with account {app_config.account} on partition {app_config.partition}. 
Waiting for job to start running" - ) - - if not wait_for_job_running( - job_id, timeout=app_config.sbatch_post_timeout, poll_interval=app_config.sbatch_post_poll_interval - ): - logger.error(f"Job {job_id} did not start running within {app_config.sbatch_post_timeout} seconds") - try: - job = get_historical_job_infos(job_id=job_id) - except (LookupError, RuntimeError) as e: - logger.error(f"Could not get historical info for job {job_id}: {e}") - else: - if job and len(job) > 0: - job = job[0] - state = job.state - logger.warning(f"Job {job_id} was last in state ({state})") - finally: - cancel_created_jobs() - kill_self() - - real_instance_name = f"{app_config.apptainer_instance_prefix}-{job_id}-{container_name}" - job = get_job_info(job_id=job_id) - instance_file = ( - Path(app_config.apptainer_config_dir) - / "instances" - / "app" - / job.node_list[0] - / job.user_name - / real_instance_name - / f"{real_instance_name}.json" - ).expanduser() - - logger.info(f"Job is running on nodes {job.node_list}. Waiting for Apptainer instance to start running.") - if not wait_for_file(str(instance_file), timeout=app_config.sbatch_post_timeout): - logger.error(f"Could not find instance file at {instance_file} before timeout") - kill_self() - logger.info("Apptainer instance started running. Waiting for VNC session to start") - time.sleep(5) - try: - instance_info = ApptainerInstanceInfo.from_json(instance_file) - sesh = HyakVncSession(job_id, instance_info, app_config) - except (ValueError, FileNotFoundError, RuntimeError) as e: - logger.error("Could not parse instance file: {instance_file}") - kill_self() - else: - time.sleep(1) - try: - sesh.parse_vnc_info() - except RuntimeError as e: - logger.error(f"Could not parse VNC info: {e}") - sesh.stop() - kill_self() - time.sleep(1) - if Path(sesh.vnc_log_file_path).expanduser().is_file(): - if not Path(sesh.vnc_pid_file_path).expanduser().is_file(): - logger.error(f"Could not find PID file for job at {sesh.vnc_pid_file_path}") - with open(sesh.vnc_log_file_path, "r") as f: - log_contents = f.read() - logger.error(f"VNC session for SLURM job {job_id} failed to start. 
Log contents:\n{log_contents}") - sesh.stop() - kill_self() - if not sesh.wait_until_alive(timeout=app_config.sbatch_post_timeout): - logger.error(f"VNC session for SLURM job {job_id} doesn't seem to be alive") - sesh.stop() - kill_self() - print_connection_string(session=sesh) - exit(0) - - -def cmd_stop(job_id: Optional[int] = None, stop_all: bool = False): - assert (job_id is not None) ^ (stop_all), "Must specify either a job id or stop all" - vnc_sessions = HyakVncSession.find_running_sessions(app_config, job_id=job_id) - for sesh in vnc_sessions: - sesh.stop() - print(f"Canceled job {sesh.job_id}") - - -def cmd_status(): - def signal_handler(signal_number, frame): - exit(1) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTSTP, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - logger.info("Finding running VNC jobs...") - - vnc_sessions = HyakVncSession.find_running_sessions(app_config) - if len(vnc_sessions) == 0: - logger.info("No running VNC jobs found") - else: - logger.info(f"Found {len(vnc_sessions)} running VNC jobs:") - for session in vnc_sessions: - print( - f"Session {session.apptainer_instance_info.name} running as", - f"SLURM job {session.job_id} with VNC on port {session.vnc_port}", - ) - - -def print_connection_string( - job_id: Optional[int] = None, session: Optional[HyakVncSession] = None, platform: Optional[str] = None -): - def signal_handler(signal_number, frame): - exit(1) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTSTP, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - assert (job_id is not None) ^ (session is not None), "Must specify either a job id or session" - - if job_id: - sessions = HyakVncSession.find_running_sessions(app_config, job_id=job_id) - if len(sessions) == 0: - logger.error(f"Could not find session with job id {job_id}") - return None - session = sessions[0] - if not sessions: - logger.error(f"Could not find session with job id {job_id}") - return None - - strings = session.get_connection_strings() - if not strings: - logger.error("Could not find connection strings for job id {job_id}") - return None - - manual = strings.pop("manual", None) - terminal_width, terminal_height = shutil.get_terminal_size() - line_width = max(1, terminal_width - 2) - - os_instructions_v = [f"# {v.get('title', '')}:\n\t{v.get('instructions', '')}" for v in strings.values()] - print("=" * line_width) - print("NOTE: The default VNC password is 'password'.\n") - if len(os_instructions_v) > 0: - os_instructions = ("\n\n" + ("-" * (line_width // 2)) + "\n\n").join(os_instructions_v) - print(f"Copy and paste the generated command into your terminal depending on your operating system.") - print() - print(os_instructions) - print("\n") - - if manual: - print("-" * line_width) - print(f"If you need to connect by another method, use the following information:\n\n") - print(manual.get("instructions", "")) - print("\n") - print("=" * line_width) - - -def print_config(): - print(app_config.to_json()) - - -def create_arg_parser(): - parser = argparse.ArgumentParser(description="HyakVNC: VNC on Hyak", prog="hyakvnc") - subparsers = parser.add_subparsers(dest="command") - - # general arguments - parser.add_argument("-d", "--debug", dest="debug", action="store_true", help="Enable debug logging") - - parser.add_argument( - "-v", "--version", dest="print_version", action="store_true", help="Print program version and exit" - ) - - # command: create - parser_create = subparsers.add_parser("create", 
help="Create VNC session") - parser_create.add_argument("--dry-run", dest="dry_run", action="store_true", help="Dry run (do not submit job)") - parser_create.add_argument( - "-p", "--partition", dest="partition", metavar="", help="Slurm partition", type=str - ) - parser_create.add_argument("-A", "--account", dest="account", metavar="", help="Slurm account", type=str) - parser_create.add_argument( - "--timeout", - dest="timeout", - metavar="", - help="[default: 120] Slurm node allocation and VNC startup timeout length (in seconds)", - default=120, - type=int, - ) - parser_create.add_argument( - "-t", "--time", dest="time", metavar="", help="Subnode reservation time (in hours)", type=int - ) - parser_create.add_argument( - "-c", "--cpus", dest="cpus", metavar="", help="Subnode cpu count", default=1, type=int - ) - parser_create.add_argument( - "-G", "--gpus", dest="gpus", metavar="[type:]", help="Subnode gpu count", default="0", type=str - ) - parser_create.add_argument( - "--mem", dest="mem", metavar="", help="Subnode memory amount with units", type=str - ) - parser_create.add_argument( - "--container", - dest="container", - metavar="", - help="Path to VNC Apptainer/Singularity Container (.sif)", - required=True, - type=str, - ) - - # status command - parser_status = subparsers.add_parser( # noqa: F841 - "status", help="Print details of all VNC jobs with given job name and exit" - ) - - # kill command - parser_stop = subparsers.add_parser("stop", help="Stop specified job") - - parser_stop.add_argument( - "job_id", metavar="", help="Kill specified VNC session, cancel its VNC job, and exit", type=int - ) - - subparsers.add_parser("stop-all", help="Stop all VNC sessions and exit") # noqa: F841 - subparsers.add_parser("print-config", help="Print app configuration and exit") - parser_print_connection_string = subparsers.add_parser( - "print-connection-string", help="Print connection string for job and exit" - ) - parser_print_connection_string.add_argument( - "job_id", metavar="", help="Job ID of session to connect to", type=int - ) - return parser - - -arg_parser = create_arg_parser() -args = arg_parser.parse_args() - -os.environ.setdefault("HYAKVNC_LOG_LEVEL", "INFO") -if args.debug: - os.environ["HYAKVNC_LOG_LEVEL"] = "DEBUG" - -log_level = logging.__dict__.get(os.getenv("HYAKVNC_LOG_LEVEL").upper(), logging.INFO) - -log_handler_console = logging.StreamHandler() -log_handler_console.setFormatter(logging.Formatter("%(levelname)s - %(message)s")) -log_handler_console.setLevel(log_level) -logger.addHandler(log_handler_console) -app_config = HyakVncConfig.load_app_config() - - -def main(): - if args.print_version: - print(VERSION) - exit(0) - - # Check SLURM version and print a warning if it's not 22.x: - # check_slurm_version() - - if args.command == "create": - if args.mem: - app_config.mem = args.mem - if args.cpus: - app_config.cpus = args.cpus - if args.time: - app_config.timelimit = f"{args.time}:00:00" - if args.gpus: - app_config.gpus = args.gpus - if args.timeout: - app_config.sbatch_post_timeout = float(args.timeout) - try: - cmd_create(args.container, dry_run=args.dry_run) - except (TimeoutError, RuntimeError) as e: - logger.error(f"Error: {e}") - exit(1) - - elif args.command == "status": - cmd_status() - - elif args.command == "stop": - cmd_stop(args.job_id) - - elif args.command == "stop-all": - cmd_stop(stop_all=True) - - elif args.command == "print-connection-string": - print_connection_string(args.job_id) - - elif args.command == "print-config": - print_config() - - else: - 
arg_parser.print_help() - - -if __name__ == "__main__": - main() diff --git a/hyakvnc/apptainer.py b/hyakvnc/apptainer.py deleted file mode 100644 index 169619c..0000000 --- a/hyakvnc/apptainer.py +++ /dev/null @@ -1,219 +0,0 @@ -import base64 -import json -import subprocess -import sys -from pathlib import Path -from typing import Union, Dict, Any, Optional, List -import re -import inspect - - -class ApptainerInstanceInfo: - def __init__( - self, - pid: int, - name: str, - image: Union[str, Path], - logErrPath: Union[str, Path], - logOutPath: Union[str, Path], - instance_metadata_path: Optional[Union[str, Path]] = None, - instance_host: Optional[str] = None, - ppid: Optional[int] = None, - user: Optional[str] = None, - userns: Optional[bool] = None, - cgroup: Optional[bool] = None, - ip: Optional[str] = None, - checkpoint: Optional[str] = None, - config: Optional[Dict[str, Any]] = None, - ): - """ - Information about an apptainer instance. - :param pid: pid of the instance - :param name: name of the instance - :param image: image of the instance - :param logErrPath: path to stderr log of the instance - :param logOutPath: path to stdout log of the instance - :param instance_metadata_path: path to instance metadata file - :param instance_host: host of the instance - :param ppid: parent pid of the instance - :param user: user of the instance - :param userns: whether userns is enabled for the instance - :param cgroup: whether cgroup is enabled for the instance - :param ip: ip address of the instance - :param checkpoint: checkpoint of the instance - :param config: config of the instance - """ - self.pid = pid - self.ppid = ppid - self.name = name - self.user = user - self.image = image - self.userns = userns - self.cgroup = cgroup - self.ip = ip - self.logErrPath = Path(logErrPath) if logErrPath else None - self.logOutPath = Path(logOutPath) if logOutPath else None - self.checkpoint = checkpoint - self.config = config - self.instance_metadata_path = Path(instance_metadata_path) if instance_metadata_path else None - self.instance_host = instance_host - - p = self.logOutPath or self.logErrPath - if p: - if p.match("instances/logs/*/*/*") and len(p.parts) >= 5: - ps = p.parts[-3:-1] - if not self.instance_host: - self.instance_host = ps[0] if not self.instance_host else self.instance_host - if not self.user: - self.user = ps[1] - if not self.instance_metadata_path: - new_parts = list(p.parent.parts) - if new_parts[-3] == "logs" and new_parts[-4] == "instances": - new_parts[-3] = "app" - new_parts += [self.name, self.name + ".json"] - candidate_instance_metadata_path = Path(*new_parts) - if candidate_instance_metadata_path.is_file(): - self.instance_metadata_path = candidate_instance_metadata_path - - def __str__(self): - return repr(self) - - def __repr__(self): - d = self.__dict__.copy() - for k, v in d.items(): - if isinstance(v, Path): - d[k] = str(v) - s = ", ".join([f"{k}={repr(v)}" for k, v in d.items() if v is not None]) - return f"ApptainerInstanceInfo({s})" - - @staticmethod - def from_json(path: Union[str, Path], read_config: bool = False) -> "ApptainerInstanceInfo": - """ - Loads a ApptainerInstanceInfo from a JSON file. 
- :param path: path to JSON file - :param read_config: whether to read the config from the JSON file - :return: ApptainerInstanceInfo loaded from JSON file - :raises ValueError: if the JSON file is invalid - :raises FileNotFoundError: if the JSON file does not exist - """ - path = Path(path).expanduser() - try: - with open(path, "r") as f: - contents = json.load(f) - if not read_config: - contents.pop("config", None) - else: - if "config" in contents: - contents["config"] = json.loads(base64.b64decode(contents["config"]).decode("utf-8")) - contents["instance_metadata_path"] = str(path) - contents["name"] = str(path.stem) - - args_to_use = inspect.getfullargspec(ApptainerInstanceInfo.__init__).args - dct = {k: v for k, v in contents.items() if k in args_to_use} - return ApptainerInstanceInfo(**dct) - except (json.JSONDecodeError, ValueError, TypeError) as e: - raise ValueError(f"Cannot create from JSON: {path} due to {e}") - except FileNotFoundError as e: - raise FileNotFoundError(f"Could not find instance file: {path} due to {e}") - - @staticmethod - def from_apptainer_instance_list_json(s: str, instance_host: Optional[str] = None) -> List["ApptainerInstanceInfo"]: - """ - Loads a ApptainerInstanceInfo from a JSON description. - :param s: JSON description - :param instance_host: host of the instance - """ - try: - contents = json.loads(s, strict=False) - instances = [ - ApptainerInstanceInfo( - pid=int(instance_dct["pid"]), - name=instance_dct["instance"], - image=instance_dct["img"], - logErrPath=instance_dct["logErrPath"], - logOutPath=instance_dct["logOutPath"], - ip=instance_dct.get("ip", None), - instance_host=instance_host, - ) - for instance_dct in contents["instances"] - ] - return instances - except (json.JSONDecodeError, ValueError, TypeError) as e: - raise ValueError(f"Cannot create from JSON: {s} due to {e}") - - -def apptainer_instance_list( - slurm_job_id: Optional[int] = None, host: Optional[str] = None -) -> List[ApptainerInstanceInfo]: - """ - Lists all apptainer instances locally or running on a slurm job. 
- """ - cmdv = list() - if slurm_job_id: - cmdv += ["srun", "--jobid", str(slurm_job_id)] - elif host: - cmdv += ["ssh", host] - cmdv += ["apptainer", "instance", "list", "--json"] - res = subprocess.run( - cmdv, - shell=False, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - universal_newlines=True, - encoding=sys.getdefaultencoding(), - ) - if res.returncode != 0 or not res.stdout: - return list() - instances = ApptainerInstanceInfo.from_apptainer_instance_list_json(res.stdout) - return instances - - -def apptainer_instance_stop( - instance: Union[str, None] = None, - stop_all: bool = False, - force: bool = False, - signal_to_send: Optional[str] = None, - timeout: Optional[int] = None, - slurm_job_id: Optional[int] = None, - host: Optional[str] = None, -): - cmdv = list() - if slurm_job_id: - cmdv += ["srun", "--jobid", str(slurm_job_id)] - elif host: - cmdv += ["ssh", host] - cmdv += ["apptainer", "instance", "stop"] - if force: - cmdv += ["--force"] - if signal_to_send: - cmdv += ["--signal", signal_to_send] - if timeout: - cmdv += ["--timeout", str(timeout)] - if stop_all: - cmdv += ["--all"] - else: - if instance: - cmdv += [instance] - else: - raise ValueError("Must specify either `instance` or `all`") - - res = subprocess.run( - cmdv, - shell=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - encoding=sys.getdefaultencoding(), - ) - if res.returncode != 0: - return list() - - if res.stderr: - stopped = list() - for line in res.stderr.splitlines(): - m = re.match(r".*INFO:\s*Stopping (?P\S+) instance of (?P\S+) \(PID=(?P\d+)\)", line) - if m: - d = m.groupdict() - d["pid"] = int(d["pid"]) - stopped.append(d) - return stopped diff --git a/hyakvnc/config.py b/hyakvnc/config.py deleted file mode 100644 index 38bd44a..0000000 --- a/hyakvnc/config.py +++ /dev/null @@ -1,173 +0,0 @@ -import json -import logging -import os -from pathlib import Path -from typing import Optional, Iterable, Union, Dict - -from . import logger -from .slurmutil import get_default_cluster, get_default_account, get_default_partition - - -def get_first_env(env_vars: Iterable[str], default: Optional[str] = None, allow_blank: bool = False) -> str: - """ - Gets the first environment variable that is set, or the default value if none are set. - :param env_vars: list of environment variables to check - :param default: default value to return if no environment variables are set - :param allow_blank: whether to allow blank environment variables - :return: the first environment variable that is set, or the default value if none are set - """ - logger.debug(rf"Checking environment variables {env_vars}") - for x in env_vars: - res = os.environ.get(x, None) - if res is not None: - if allow_blank or res: - logger.debug(rf"Using environment variable {x}={res}") - return res - logger.debug(rf"Using default value {default}") - return default - - -class HyakVncConfig: - """ - Configuration for hyakvnc. 
- """ - - def __init__( - self, - job_prefix: str = "hyakvnc", # prefix for job names - log_path: str = "~/.hyakvnc.log", # path to log file - apptainer_bin: str = "apptainer", # path to apptainer binary - apptainer_config_dir: str = "~/.apptainer", # directory where apptainer config files are stored - apptainer_instance_prefix: str = "hyakvnc", # prefix for apptainer instance names - apptainer_use_writable_tmpfs: bool = True, # whether to mount a writable tmpfs for apptainer instances - apptainer_cleanenv: bool = True, # whether to use clean environment for apptainer instances - apptainer_set_bind_paths: str = None, # comma-separated list of paths to bind mount for apptainer instances - apptainer_env_vars: Dict[str, str] = None, # environment variables to set for apptainer instances - sbatch_post_timeout: float = 120.0, # timeout for waiting for sbatch to return - sbatch_post_poll_interval: float = 1.0, # poll interval for waiting for sbatch to return - sbatch_output_path: Optional[str] = None, # path to write sbatch output to - ssh_host: Optional[ - str - ] = "klone.hyak.uw.edu", # intermediate host address between local machine and compute node - account: Optional[str] = None, # account to use for sbatch jobs | -A, --account, SBATCH_ACCOUNT - partition: Optional[str] = None, # partition to use for sbatch jobs | -p, --partition, SBATCH_PARTITION - cluster: Optional[str] = None, # cluster to use for sbatch jobs | --clusters, SBATCH_CLUSTERS - gpus: Optional[str] = None, # number of gpus to use for sbatch jobs | -G, --gpus, SBATCH_GPUS - timelimit: Optional[str] = None, # time limit for sbatch jobs | --time, SBATCH_TIMELIMIT - mem: Optional[str] = None, # memory limit for sbatch jobs | --mem, SBATCH_MEM - cpus: Optional[ - int - ] = 4, # number of cpus to use for sbatch jobs | -c, --cpus-per-task (not settable by env var) - ): - self.log_path = str(Path(log_path).expanduser()) - log_handler_file = logging.FileHandler(self.log_path, mode="a") - log_handler_file.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s %(message)s")) - log_handler_file.setLevel(logging.DEBUG) - logger.addHandler(log_handler_file) - logger.debug("Loading config") - self.job_prefix = job_prefix - self.apptainer_bin = apptainer_bin - - self.cluster = cluster - if self.cluster: - logger.debug(rf"Using cluster {self.cluster}") - if not self.cluster: - self.cluster = cluster or get_first_env( - ["HYAKVNC_SLURM_CLUSTER", "SBATCH_CLUSTER"], default=get_default_cluster() - ) - self.account = account or get_first_env( - ["HYAKVNC_SLURM_ACCOUNT", "SBATCH_ACCOUNT"], get_default_account(cluster=self.cluster) - ) - self.partition = partition or get_first_env( - ["HYAKVNC_SLURM_PARTITION", "SBATCH_PARTITION"], - get_default_partition(cluster=self.cluster, account=self.account), - ) - self.gpus = gpus or get_first_env(["HYAKVNC_SLURM_GPUS", "SBATCH_GPUS"], None) - self.timelimit = timelimit or get_first_env(["HYAKVNC_SLURM_TIMELIMIT", "SBATCH_TIMELIMIT"], "1:00:00") - self.mem = mem or get_first_env(["HYAKVNC_SLURM_MEM", "SBATCH_MEM"], "2G") - self.cpus = int(cpus or get_first_env(["HYAKVNC_SLURM_CPUS", "SBATCH_CPUS_PER_TASK"], default="2")) - self.sbatch_output_path = sbatch_output_path or get_first_env( - ["HYAKVNC_SBATCH_OUTPUT_PATH", "SBATCH_OUTPUT"], "/dev/stdout" - ) - self.apptainer_config_dir = apptainer_config_dir - self.apptainer_instance_prefix = apptainer_instance_prefix - self.apptainer_use_writable_tmpfs = apptainer_use_writable_tmpfs - self.apptainer_cleanenv = apptainer_cleanenv - 
self.apptainer_set_bind_paths = apptainer_set_bind_paths - self.sbatch_post_timeout = sbatch_post_timeout - self.sbatch_post_poll_interval = sbatch_post_poll_interval - self.ssh_host = ssh_host - - self.apptainer_env_vars = apptainer_env_vars or dict() - all_apptainer_env_vars = { - x: os.environ.get(x, "") - for x in os.environ.keys() - if x.startswith("APPTAINER_") - or x.startswith("APPTAINERENV_") - or x.startswith("SINGULARITY_") - or x.startswith("SINGULARITYENV_") - } - self.apptainer_env_vars.update(all_apptainer_env_vars) - - if self.apptainer_use_writable_tmpfs: - self.apptainer_env_vars["APPTAINER_WRITABLE_TMPFS"] = "1" if self.apptainer_use_writable_tmpfs else "0" - - if self.apptainer_cleanenv: - self.apptainer_env_vars["APPTAINER_CLEANENV"] = "1" if self.apptainer_cleanenv else "0" - - if self.apptainer_set_bind_paths: - self.apptainer_env_vars["APPTAINER_BINDPATH"] = self.apptainer_set_bind_paths - - def to_json(self) -> str: - """ - Converts this configuration to a JSON string. - :return: JSON string representation of this configuration - """ - return json.dumps({k: v for k, v in self.__dict__.items() if v is not None}) - - @staticmethod - def from_json(path: Union[str, Path]) -> "HyakVncConfig": - """ - Loads a HyakVncConfig from a JSON file. - :param path: path to JSON file - :return: HyakVncConfig loaded from JSON file - """ - if not Path(path).is_file(): - raise RuntimeError(f"Invalid path to configuration file: {path}") - - try: - with open(path, "r") as f: - contents = json.load(f) - return HyakVncConfig(**contents) - except (json.JSONDecodeError, ValueError, TypeError) as e: - raise RuntimeError(f"Invalid JSON in configuration file: {path}") from e - - def __str(self): - return self.to_json() - - def __repr__(self): - return self.to_json() - - @staticmethod - def load_app_config(path: Optional[Union[str, Path]] = None) -> "HyakVncConfig": - """ - Loads a HyakVncConfig from a path to a JSON file. If the path is not specified, the default path is used. - The default path can be modified with the HYAKVNC_CONFIG_PATH environment variable; otherwise, - it is "~/.config/hyakvnc/config.json". If it cannot load either file, it returns a default configuration. - - :param path: path to JSON file - :return: HyakVncConfig loaded from JSON file - """ - paths = [] - if path: - path = Path(path).expanduser() - paths += [path] - default_path = Path(os.environ.setdefault("HYAKVNC_CONFIG_PATH", "~/.config/hyakvnc/config.json")).expanduser() - paths += [default_path] - for p in paths: - if p.is_file(): - try: - return HyakVncConfig.from_json(path=p) - except Exception as e: - logger.debug(f"Could not load config from {p}: {e}") - return HyakVncConfig() diff --git a/hyakvnc/slurmutil.py b/hyakvnc/slurmutil.py deleted file mode 100755 index 84e95ac..0000000 --- a/hyakvnc/slurmutil.py +++ /dev/null @@ -1,603 +0,0 @@ -import os -import subprocess -import sys -import time -from datetime import datetime, timedelta -from typing import Optional, Union, List, Dict, Tuple - -from . import logger - - -def get_default_cluster() -> str: - """ - Gets the default SLURM cluster. 
- :return: the default SLURM cluster - :raises LookupError: if no default cluster could be found - """ - cmd = "sacctmgr show cluster -nPs format=Cluster".split() - res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) - if res.returncode == 0: - clusters = res.stdout.splitlines() - if clusters: - return clusters[0] - raise LookupError("Could not find default cluster") - - -def get_default_account(user: Optional[str] = None, cluster: Optional[str] = None) -> str: - """ - Gets the default SLURM account for the specified user on the specified SLURM cluster. - :param user: User to get default account for - :param cluster: SLURM cluster to get default account for - :return: the default SLURM account for the specified user on the specified cluster - :raises LookupError: if no default account could be found - """ - user = user or os.getlogin() - cluster = cluster or get_default_cluster() - - cmd = f"sacctmgr show user -nPs {user} format=defaultaccount where cluster={cluster}" - res = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) - if res.returncode == 0: - accounts = res.stdout.splitlines() - for x in accounts: - if x: - return x - raise LookupError(f"Could not find default account for user '{user}' on cluster '{cluster}'") - - -def get_partitions( - user: Optional[str] = None, account: Optional[str] = None, cluster: Optional[str] = None -) -> List[str]: - """ - Gets the SLURM partitions for the specified user and account on the specified cluster. - - :param user: user to get partitions for - :param account: SLURM account to get partitions for - :param cluster: SLURM cluster to get partitions for - :return: the SLURM partitions for the specified user and account on the specified cluster - :raises LookupError: if no partitions could be found - """ - user = user or os.getlogin() - cluster = cluster or get_default_cluster() - account = account or get_default_account(user=user, cluster=cluster) - cmd = f"sacctmgr show -nPs user {user} format=qos where account={account} cluster={cluster}" - res = subprocess.run( - cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True - ).stdout.splitlines() - for partitions in res: - if partitions: - return sorted([x.strip(f"{account}-") for x in partitions.split(",")]) - else: - raise LookupError(f"Could not find partitions for user '{user}' and account '{account}' on cluster '{cluster}'") - - -def get_default_partition( - user: Optional[str] = None, account: Optional[str] = None, cluster: Optional[str] = None -) -> str: - """ - Gets the default SLURM partition for the specified user and account on the specified cluster. - - :param user: user to get partitions for - :param account: SLURM account to get partitions for - :param cluster: SLURM cluster to get partitions for - :return: the partition for the specified user and account on the specified cluster - :raises LookupError: if no partitions could be found - """ - partitions = get_partitions(user=user, account=account, cluster=cluster) - for p in partitions: - if p: - return p - raise LookupError( - f"Could not find default partition for user '{user}' and account '{account}' on cluster '{cluster}'" - ) - - -def node_range_to_list(s: str) -> List[str]: - """ - Converts a node range to a list of nodes. 
- :param s: node range - :return: list of SLURM nodes - :raises ValueError: if the node range could not be converted to a list of nodes - """ - cmds = ["scontrol", "show", "hostnames", s] - output = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) - if output.returncode != 0: - raise ValueError(f"Could not convert node range '{s}' to list of nodes:\n{output.stderr}") - return output.stdout.rstrip().splitlines() - - -class SlurmJobInfo: - fields: Dict[str, Dict[str, Union[str, Dict[str, str]]]] = { - "job_id": {"squeue_field": "%i", "sacct_field": "JobID"}, - "job_name": {"squeue_field": "%j", "sacct_field": "JobName"}, - "account": {"squeue_field": "%a", "sacct_field": "Account"}, - "partition": {"squeue_field": "%P", "sacct_field": "Partition"}, - "user_name": {"squeue_field": "%u", "sacct_field": "User"}, - "state": {"squeue_field": "%T", "sacct_field": "State"}, - "time_used": {"squeue_field": "%M", "sacct_field": "Elapsed"}, - "time_limit": {"squeue_field": "%l", "sacct_field": "Timelimit"}, - "cpus": {"squeue_field": "%C", "sacct_field": "AllocCPUS"}, - "min_memory": {"squeue_field": "%m", "sacct_field": "ReqMem"}, - "num_nodes": {"squeue_field": "%D", "sacct_field": "NNodes"}, - "node_list": {"squeue_field": "%N", "sacct_field": "NodeList"}, - "command": {"squeue_field": "%o", "sacct_field": "SubmitLine"}, - } - - def __init__( - self, - job_id: int = None, - job_name: str = None, - account: str = None, - partition: str = None, - user_name: str = None, - state: str = None, - time_used: str = None, - time_limit: str = None, - cpus: int = None, - min_memory: str = None, - num_nodes: int = None, - node_list: str = None, - command: str = None, - ): - self.job_id = job_id - self.job_name = job_name - self.account = account - self.partition = partition - self.user_name = user_name - self.state = state - self.time_used = time_used - self.time_limit = time_limit - self.cpus = cpus - self.min_memory = min_memory - self.num_nodes = num_nodes - self.node_list = node_list - self.command = command - - @staticmethod - def from_squeue_line(line: str, field_order=None, delimiter: Optional[str] = None) -> "SlurmJobInfo": - """ - Creates a SlurmJobInfo from an squeue command - :param line: output line from squeue command - :param field_order: order of fields in line (defaults to order in SlurmJobInfo) - :param delimiter: delimiter for fields in line - :return: SlurmJobInfo created from line - """ - - valid_field_names = list(SlurmJobInfo.fields.keys()) - if field_order is None: - field_order = valid_field_names - - if delimiter is None: - all_fields_dict = {field_order[i]: x for i, x in enumerate(line.split())} - else: - all_fields_dict = {field_order[i]: x for i, x in enumerate(line.split(delimiter))} - - field_dict = {k: v for k, v in all_fields_dict.items() if k in valid_field_names} - - try: - field_dict["job_id"] = int(field_dict["job_id"]) - except (ValueError, TypeError, KeyError): - field_dict["job_id"] = None - try: - field_dict["num_nodes"] = int(field_dict["num_nodes"]) - except (ValueError, TypeError, KeyError): - field_dict["num_nodes"] = None - - try: - field_dict["cpus"] = int(field_dict["cpus"]) - except (ValueError, TypeError, KeyError): - field_dict["cpus"] = None - - if field_dict.get("node_list") == "(null)": - field_dict["node_list"] = None - else: - try: - field_dict["node_list"] = node_range_to_list(field_dict["node_list"]) - except (ValueError, TypeError, KeyError, FileNotFoundError): - logger.debug(f"Could not convert node range 
'{field_dict['node_list']}' to list of nodes") - field_dict["node_list"] = None - - if field_dict.get("command") == "(null)": - field_dict["command"] = None - - return SlurmJobInfo(**field_dict) - - -def get_job_infos( - jobs: Optional[Union[int, List[int]]] = None, user: Optional[str] = os.getlogin(), cluster: Optional[str] = None -) -> Union[SlurmJobInfo, List[SlurmJobInfo], None]: - """ - Gets the specified slurm job(s). - :param user: User to get jobs for - :param jobs: Job(s) to get - :param cluster: Cluster to get jobs for - :return: the specified slurm job(s) as a SlurmJobInfo object or list of SlurmJobInfos, or None if no jobs were found - :raises LookupError: if the specified job(s) could not be found - """ - cmds: List[str] = ["squeue", "--noheader"] - if user: - cmds += ["--user", user] - if cluster: - cmds += ["--clusters", cluster] - - job_is_int = isinstance(jobs, int) - - if jobs: - if job_is_int: - jobs = [jobs] - - jobs = ",".join([str(x) for x in jobs]) - cmds += ["--jobs", jobs] - - squeue_format_fields = "\t".join([v.get("squeue_field", "") for k, v in SlurmJobInfo.fields.items()]) - cmds += ["--format", squeue_format_fields] - res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=False) - if res.returncode != 0: - raise LookupError(f"Could not get slurm jobs:\n{res.stderr}") - - jobs = [SlurmJobInfo.from_squeue_line(line.strip()) for line in res.stdout.splitlines() if line.strip()] - if job_is_int: - if len(jobs) > 0: - return jobs[0] - else: - return None - else: - return jobs - - -def get_job_info(job_id: int, cluster: Optional[str] = None) -> Union[SlurmJobInfo, None]: - """ - Gets the specified SLURM job. - :param job_id: Job to get - :param cluster: Cluster to get jobs for - :return: the specified slurm job(s) as a SlurmJobInfo object - :raises LookupError: if the specified job(s) could not be found - """ - cmds: List[str] = ["squeue", "--noheader"] - if cluster: - cmds += ["--clusters", cluster] - cmds += ["--jobs", str(job_id)] - squeue_format_fields = "\t".join([v.get("squeue_field", "") for k, v in SlurmJobInfo.fields.items()]) - cmds += ["--format", squeue_format_fields] - res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=False) - if res.returncode != 0: - return None - jobs = [SlurmJobInfo.from_squeue_line(line.strip()) for line in res.stdout.splitlines() if line.strip()] - job = jobs[0] if len(jobs) > 0 else None - return job - - -def get_job_status(job_id: int) -> str: - cmd = f"squeue -j {job_id} -h -o %T" - res = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) - if res.returncode != 0: - raise RuntimeError(f"Could not get status for job {job_id}:\n{res.stderr}") - return res.stdout.strip() - - -def wait_for_job_status( - job_id: int, states: List[str], timeout: Optional[float] = None, poll_interval: float = 1.0 -) -> str: - """ - Waits for the specified job to be in one of the specified states. 
- :param job_id: job id to wait for - :param states: list of states to wait for - :param timeout: timeout for waiting for job to be in one of the specified states - :param poll_interval: poll interval for waiting for job to be in one of the specified states - :return: True if the job is in one of the specified states, False otherwise - :raises TimeoutError: if the job is not in one of the specified states after the timeout - """ - begin_time = time.time() - assert isinstance(job_id, int), "Job id must be an integer" - assert (timeout is None) or (timeout > 0), "Timeout must be greater than zero" - assert poll_interval > 0, "Poll interval must be greater than zero" - timeout = timeout or -1.0 - while time.time() < begin_time + timeout: - res = get_job_status(job_id) - if res in states: - return res - time.sleep(poll_interval) - raise TimeoutError(f"Timed out waiting for job {job_id} to be in one of the following states: {states}") - - -slurm_states_active = { - "SIGNALING", - "CONFIGURING", - "STAGE_OUT", - "SUSPENDED", - "REQUEUE_HOLD", - "REQUEUE_FED", - "PENDING", - "RESV_DEL_HOLD", - "STOPPED", - "RUNNING", - "RESIZING", - "REQUEUED", -} -slurm_states_success = {"COMPLETED", "COMPLETING"} -slurm_states_cancelled = {"CANCELLED", "REVOKED"} -slurm_states_timeout = {"DEADLINE", "TIMEOUT"} -slurm_states_failed = {"PREEMPTED", "OUT_OF_MEMORY", "FAILED", "NODE_FAIL", "BOOT_FAIL"} - - -def wait_for_job_running(job_id: int, timeout: Optional[float] = None, poll_interval: float = 1.0) -> bool: - """ - Waits for the specified job to be in one of the specified states. - :param job_id: job id to wait for - :param timeout: timeout for waiting for job to be in one of the specified states - :param poll_interval: poll interval for waiting for job to be in one of the specified states - :return: True if the job is in one of the specified states, False otherwise - :raises TimeoutError: if the job is not in one of the specified states after the timeout - """ - begin_time = time.time() - assert isinstance(job_id, int), "Job id must be an integer" - assert (timeout is None) or (timeout > 0), "Timeout must be greater than zero" - assert poll_interval > 0, "Poll interval must be greater than zero" - timeout = timeout or -1.0 - while time.time() < begin_time + timeout: - try: - res = get_job_status(job_id) - except (RuntimeError, LookupError): - return False - else: - if res == "RUNNING": - return True - elif res not in slurm_states_active: - return False - time.sleep(poll_interval) - return False - - -def get_historical_job_infos( - after: Optional[Union[datetime, timedelta]] = None, - before: Optional[Union[datetime, timedelta]] = None, - job_id: Optional[int] = None, - user: Optional[str] = os.getlogin(), - cluster: Optional[str] = None, -) -> List[SlurmJobInfo]: - """ - Gets the slurm jobs since the specified time. 
- :param after: Time after which to get jobs - :param before: Time before which to get jobs - :param job_id: Job id to get - :param user: User to get jobs for - :param cluster: Cluster to get jobs for - :return: the slurm jobs since the specified time as a list of SlurmJobInfos - :raises LookupError: if the slurm jobs could not be found - """ - now = datetime.now() - assert isinstance(after, (datetime, timedelta, type(None))), "after must be a datetime or timedelta or None" - assert isinstance(before, (datetime, timedelta, type(None))), "before must be a datetime or timedelta or None" - - after_abs = now - after if isinstance(after, timedelta) else after - before_abs = now - before if isinstance(before, timedelta) else before - - cmds: List[str] = ["sacct", "--noheader", "-X", "--parsable2"] - if user: - cmds += ["--user", user] - if cluster: - cmds += ["--clusters", cluster] - if after_abs: - cmds += ["--starttime", after_abs.isoformat(timespec="seconds")] - if before_abs: - cmds += ["--endtime", before_abs.isoformat(timespec="seconds")] - if job_id: - cmds += ["--jobs", str(job_id)] - - sacct_format_fields = ",".join([v.get("sacct_field", "") for k, v in SlurmJobInfo.fields.items()]) - cmds += ["--format", sacct_format_fields] - res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=False) - if res.returncode != 0: - raise LookupError(f"Could not get slurm jobs via `sacct`:\n{res.stderr}") - - jobs = [ - SlurmJobInfo.from_squeue_line(line.strip(), delimiter="|") for line in res.stdout.splitlines() if line.strip() - ] - return jobs - - -def cancel_job(job: Optional[int] = None, user: Optional[str] = os.getlogin(), cluster: Optional[str] = None): - """ - Cancels the specified jobs. - :param job: Job to cancel - :param user: User to cancel jobs for - :param cluster: Cluster to cancel jobs for - :return: None - :raises ValueError: if no job, user, or cluster is specified - :raises RuntimeError: if the jobs could not be cancelled - """ - if job is None and user is None and cluster is None: - raise ValueError("Must specify at least one of job, user, or cluster") - cmds = ["scancel"] - if user: - cmds += ["--user", user] - if cluster: - cmds += ["--clusters", cluster] - if job: - cmds += [str(job)] - res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=False) - if res.returncode != 0: - raise RuntimeError(f"Could not cancel jobs with commands {cmds}: {res.stderr}") - - -def get_slurm_version_tuple(): - # Get SLURM version: - res = subprocess.run( - ["sinfo", "--version"], universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False - ) - if res.returncode != 0: - raise RuntimeError(f"Could not get SLURM version:\n{res.stderr})") - try: - v = res.stdout - assert isinstance(v, str), "Could not parse SLURM version" - va = v.split(v) - assert len(va) >= 2 - vt = tuple(va[1].split(".")) - assert len(vt) >= 2 - return vt - except (ValueError, IndexError, TypeError, AssertionError): - raise RuntimeError(f"Could not parse SLURM version from string {res.stdout}") - - -sbatch_option_info = { - "account": "--account", # [charge job to specified account] - "acctg_freq": "--acctg-freq", # [job accounting and profiling sampling intervals in seconds] - "array": "--array", # [job array index values] - "batch": "--batch", # [specify a list of node constraints] - "bb": "--bb", # [burst buffer specifications] - "bbf": "--bbf", # [burst buffer specification file] - "begin": "--begin", # 
[defer job until HH:MM MM/DD/YY] - "chdir": "--chdir", # [set working directory for batch script] - "cluster_constraint": "--cluster-constraint", # [specify a list of cluster constraints] - "clusters": "--clusters", # [comma separated list of clusters to issue] - "comment": "--comment", # [arbitrary comment] - "constraint": "--constraint", # [specify a list of constraints] - "container": "--container", # [path to OCI container bundle] - "contiguous": "--contiguous", # [demand a contiguous range of nodes] - "core_spec": "--core-spec", # [count of reserved cores] - "cores_per_socket": "--cores-per-socket", # [number of cores per socket to allocate] - "cpu_freq": "--cpu-freq", # [requested cpu frequency (and governor)] - "cpus_per_gpu": "--cpus-per-gpu", # [number of CPUs required per allocated GPU] - "cpus_per_task": "--cpus-per-task", # [number of cpus required per task] - "deadline": "--deadline", # [remove the job if no ending possible before] - "delay_boot": "--delay-boot", # [delay boot for desired node features] - "dependency": "--dependency", # [defer job until condition on jobid is satisfied] - "distribution": "--distribution", # [distribution method for processes to nodes] - "error": "--error", # [file for batch scripts standard error] - "exclude": "--exclude", # [exclude a specific list of hosts] - "exclusive": "--exclusive", # [allocate nodes in exclusive mode when] - "export": "--export", # [specify environment variables to export] - "export_file": "--export-file", # [specify environment variables file or file] - "extra_node_info": "--extra-node-info", # [combine request of sockets per node] - "get_user_env": "--get-user-env", # [load environment from local cluster] - "gid": "--gid", # [group ID to run job as (user root only)] - "gpu_bind": "--gpu-bind", # [task to gpu binding options] - "gpu_freq": "--gpu-freq", # [frequency and voltage of GPUs] - "gpus": "--gpus", # [count of GPUs required for the job] - "gpus_per_node": "--gpus-per-node", # [number of GPUs required per allocated node] - "gpus_per_socket": "--gpus-per-socket", # [number of GPUs required per allocated socket] - "gpus_per_task": "--gpus-per-task", # [number of GPUs required per spawned task] - "gres": "--gres", # [required generic resources] - "gres_flags": "--gres-flags", # [flags related to GRES management] - "hint": "--hint", # [bind tasks according to application hints] - "hold": "--hold", # [submit job in held state] - "ignore_pbs": "--ignore-pbs", # [ignore #PBS and #BSUB options in the batch script] - "input": "--input", # [file for batch scripts standard input] - "job_name": "--job-name", # [name of job] - "kill_on_invalid_dep": "--kill-on-invalid-dep", # [terminate job if invalid dependency] - "licenses": "--licenses", # [required license, comma separated] - "mail_type": "--mail-type", # [notify on state change: BEGIN, END, FAIL or ALL] - "mail_user": "--mail-user", # [who to send email notification for job state] - "mcs_label": "--mcs-label", # [mcs label if mcs plugin mcs/group is used] - "mem": "--mem", # [minimum amount of real memory] - "mem_bind": "--mem-bind", # [bind memory to locality domains (ldom)] - "mem_per_cpu": "--mem-per-cpu", # [maximum amount of real memory per allocated] - "mem_per_cpu,__mem": "--mem-per-cpu,--mem", # [specified.] 
- "mem_per_gpu": "--mem-per-gpu", # [real memory required per allocated GPU] - "mincpus": "--mincpus", # [minimum number of logical processors (threads)] - "network": "--network", # [specify information pertaining to the switch or network] - "nice": "--nice", # [decrease scheduling priority by value] - "no_kill": "--no-kill", # [do not kill job on node failure] - "no_requeue": "--no-requeue", # [if set, do not permit the job to be requeued] - "nodefile": "--nodefile", # [request a specific list of hosts] - "nodelist": "--nodelist", # [request a specific list of hosts] - "nodes": "--nodes", # [number of nodes on which to run (N = min\[-max\])] - "ntasks": "--ntasks", # [number of tasks to run] - "ntasks_per_core": "--ntasks-per-core", # [number of tasks to invoke on each core] - "ntasks_per_gpu": "--ntasks-per-gpu", # [number of tasks to invoke for each GPU] - "ntasks_per_node": "--ntasks-per-node", # [number of tasks to invoke on each node] - "ntasks_per_socket": "--ntasks-per-socket", # [number of tasks to invoke on each socket] - "open_mode": "--open-mode", # [ {append|truncate} output and error file} - "output": "--output", # [file for batch scripts standard output] - "overcommit": "--overcommit", # [overcommit resources] - "oversubscribe": "--oversubscribe", # [over subscribe resources with other jobs] - "parsable": "--parsable", # [outputs only the jobid and cluster name (if present)] - "partition": "--partition", # [partition requested] - "power": "--power", # [power management options] - "prefer": "--prefer", # [features desired but not required by job] - "priority": "--priority", # [set the priority of the job to value] - "profile": "--profile", # [enable acct_gather_profile for detailed data] - "propagate": "--propagate", # [propagate all \[or specific list of\] rlimits] - "qos": "--qos", # [quality of service] - "quiet": "--quiet", # [quiet mode (suppress informational messages)] - "reboot": "--reboot", # [reboot compute nodes before starting job] - "requeue": "--requeue", # [if set, permit the job to be requeued] - "reservation": "--reservation", # [allocate resources from named reservation] - "signal": "--signal", # [@time\] send signal when time limit within time seconds] - "sockets_per_node": "--sockets-per-node", # [number of sockets per node to allocate] - "spread_job": "--spread-job", # [spread job across as many nodes as possible] - "switches": "--switches", # [{@max-time-to-wait}] - "test_only": "--test-only", # [validate batch script but do not submit] - "thread_spec": "--thread-spec", # [count of reserved threads] - "threads_per_core": "--threads-per-core", # [number of threads per core to allocate] - "time": "--time", # [time limit] - "time_min": "--time-min", # [minimum time limit (if distinct)] - "tmp": "--tmp", # [minimum amount of temporary disk] - "uid": "--uid", # [user ID to run job as (user root only)] - "use_min_nodes": "--use-min-nodes", # [if a range of node counts is given, prefer the] - "verbose": "--verbose", # [verbose mode (multiple -vs increase verbosity)] - "wait": "--wait", # [wait for completion of submitted job] - "wait_all_nodes": "--wait-all-nodes", - # [wait for all nodes to be allocated if 0 (default) or wait until all nodes ready (1)]] - "wckey": "--wckey", # [wckey to run job under] - "wrap": "--wrap", # [wrap command string in a sh script and submit] -} - - -class SbatchCommand: - def __init__( - self, - sbatch_options: Optional[Dict[str, Union[str, None]]] = None, - sbatch_args: Optional[List[str]] = None, - sbatch_executable: str = 
"sbatch", - ): - """ - :param sbatch_options: sbatch options - :param sbatch_args: sbatch arguments - :param sbatch_executable: sbatch executable - """ - command_list = [sbatch_executable] - for k, v in sbatch_options.items(): - if k in sbatch_option_info: - command_list += [sbatch_option_info[k]] - command_list += [v] if v is not None else [] - else: - raise KeyError(f"Unrecognized sbatch option {k}") - if sbatch_args: - command_list += sbatch_args - - self.command_list = [str(s) for s in command_list] - self.sbatch_executable = sbatch_executable - self.sbatch_options = sbatch_options - self.sbatch_args = sbatch_args - - def __call__(self, **run_kwargs) -> Tuple[int, Union[str, None]]: - """ - Submits a job to SLURM using sbatch with the list of commands specified in the constructor. - :param run_args: args to pass to subprocess.run - :param run_kwargs: kwargs to pass to subprocess.run - """ - run_kwargs = run_kwargs or dict() - run_kwargs.setdefault("stdout", subprocess.PIPE) - run_kwargs.setdefault("stderr", subprocess.PIPE) - run_kwargs.setdefault("shell", False) - run_kwargs.setdefault("universal_newlines", True) - run_kwargs.setdefault("encoding", sys.getdefaultencoding()) - logger.debug(f"Running sbatch command with args\n\t{self.command_list}") - res = subprocess.run(self.command_list, **run_kwargs) - if res.returncode != 0: - raise RuntimeError(f"Could not launch sbatch job:\n{res.stderr}") - if not res.stdout: - raise RuntimeError("No sbatch output") - try: - out = res.stdout.strip().split(";") - job_id, cluster_name = None, None - if len(out) < 1: - raise RuntimeError(f"Could not parse jobid from sbatch output: {res.stdout}") - job_id = int(out[0]) - if len(out) > 1: - cluster_name = out[1] - if len(out) > 2: - logger.warning(f"Unexpected sbatch output: {res.stdout}") - return job_id, cluster_name - except (ValueError, IndexError, TypeError, AttributeError): - raise RuntimeError(f"Could not parse jobid from sbatch output: {res.stdout}") diff --git a/hyakvnc/util.py b/hyakvnc/util.py deleted file mode 100644 index bc7c7c4..0000000 --- a/hyakvnc/util.py +++ /dev/null @@ -1,94 +0,0 @@ -import subprocess -import sys -import time -from pathlib import Path -from typing import Callable, Optional, Union, Any - - -def repeat_until( - func: Callable, - condition: Callable, - timeout: Optional[float] = None, - poll_interval: float = 1.0, - max_iter: Optional[int] = None, -) -> Union[Any, None]: - begin_time = time.time() - assert timeout is None or timeout > 0, "Timeout must be greater than zero" - assert poll_interval > 0, "Poll interval must be greater than zero" - timeout = timeout or -1.0 - i = 0 - while time.time() < begin_time + timeout: - if max_iter: - if i >= max_iter: - return None - res = func() - if condition(res): - return res - time.sleep(poll_interval) - i += 1 - return None - - -def wait_for_file(path: Union[Path, str], timeout: Optional[float] = None, poll_interval: float = 1.0): - """ - Waits for the specified file to be present. 
- """ - path = Path(path) - return repeat_until(lambda: path.exists(), lambda exists: exists, timeout=timeout, poll_interval=poll_interval) - - -def check_remote_pid_exists_and_port_open( - pid: int, port: int, slurm_job_id: Optional[int] = None, host: Optional[str] = None -) -> bool: - cmdv = list() - if slurm_job_id: - cmdv += ["srun", "--jobid", str(slurm_job_id)] - elif host: - cmdv += ["ssh", host] - - cmdv += ["ps", "--no-headers", "-p", str(pid), "&&", "nc", "-z", "localhost", str(port)] - res = subprocess.run( - cmdv, - shell=False, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - universal_newlines=True, - encoding=sys.getdefaultencoding(), - ) - return res.returncode == 0 - - -def check_remote_pid_exists(pid: int, slurm_job_id: Optional[int] = None, host: Optional[str] = None) -> bool: - cmdv = list() - if slurm_job_id: - cmdv += ["srun", "--jobid", str(slurm_job_id)] - elif host: - cmdv += ["ssh", host] - cmdv += ["ps", "--no-headers", "-p", str(pid)] - res = subprocess.run( - cmdv, - shell=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - encoding=sys.getdefaultencoding(), - ) - return res.returncode == 0 - - -def check_remote_port_open(port: int, slurm_job_id: Optional[int] = None, host: Optional[str] = None) -> bool: - cmdv = list() - if slurm_job_id: - cmdv += ["srun", "--jobid", str(slurm_job_id)] - elif host: - cmdv += ["ssh", host] - cmdv += ["nc", "-z", "localhost", str(port)] - res = subprocess.run( - cmdv, - shell=False, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - universal_newlines=True, - encoding=sys.getdefaultencoding(), - ) - return res.returncode == 0 diff --git a/hyakvnc/version.py b/hyakvnc/version.py deleted file mode 100644 index 9ffd7e3..0000000 --- a/hyakvnc/version.py +++ /dev/null @@ -1 +0,0 @@ -VERSION = 2.0 diff --git a/hyakvnc/vncsession.py b/hyakvnc/vncsession.py deleted file mode 100644 index a960da5..0000000 --- a/hyakvnc/vncsession.py +++ /dev/null @@ -1,284 +0,0 @@ -import pprint -import re -import time -from pathlib import Path -from typing import Optional, Union, List, Dict - -from . import logger -from .apptainer import ApptainerInstanceInfo, apptainer_instance_list, apptainer_instance_stop -from .config import HyakVncConfig -from .slurmutil import get_job_infos, cancel_job -from .util import check_remote_pid_exists_and_port_open, check_remote_pid_exists, check_remote_port_open, repeat_until - - -class HyakVncSession: - def __init__( - self, - job_id: int, - apptainer_instance_info: ApptainerInstanceInfo, - app_config: HyakVncConfig, - ): - """ - Represents a VNC instance on Hyak. 
- :param job_id: SLURM job ID of the instance - :param apptainer_instance_info: ApptainerInstanceInfo for the instance - :param app_config: HyakVncConfig for the instance - :raises ValueError: if the instance name cannot be parsed - :raises FileNotFoundError: if the log file cannot be found - """ - self.job_id = job_id - self.apptainer_instance_info = apptainer_instance_info - self.app_config = app_config - self.vnc_port = None - self.vnc_log_file_path = "" - self.vnc_pid_file_path = "" - - def parse_vnc_info(self) -> None: - logOutPath = self.apptainer_instance_info.logOutPath - with open(logOutPath, "r") as lfp: - contents = lfp.read() - if not contents: - raise RuntimeError(f"Log file at {logOutPath} is empty") - rfbports = re.findall(r"\s+-rfbport\s+(?P<port>\d+)\b", contents) - if not rfbports: - raise RuntimeError(f"Could not find VNC port from log file at {logOutPath}") - try: - vnc_port = int(rfbports[-1]) - except (ValueError, IndexError, TypeError): - raise RuntimeError(f"Could not parse VNC port from log file at {logOutPath}") - - self.vnc_port = vnc_port - - vnc_log_file_paths = re.findall( - rf"(?m)Log file is\s*(?P<logfile>.*/" - + rf"{self.apptainer_instance_info.instance_host}.*:{self.vnc_port}\.log)$", - contents, - ) - if not vnc_log_file_paths: - raise RuntimeError(f"Could not find VNC log file from Apptainer log file at {logOutPath}") - self.vnc_log_file_path = Path(vnc_log_file_paths[-1]).expanduser() - if not self.vnc_log_file_path.is_file(): - logger.debug(f"Could not find vnc log file at {self.vnc_log_file_path}") - self.vnc_pid_file_path = self.vnc_log_file_path.with_suffix(".pid") - - def vnc_pid_file_exists(self) -> bool: - if not self.vnc_pid_file_path: - vnc_log_file_path = self.vnc_log_file_path or "None" - logger.debug(f"No PID file path set. Log file: {vnc_log_file_path}") - return False - p = Path(self.vnc_pid_file_path).expanduser() - if p.is_file(): - logger.debug(f"Found PID file {self.vnc_pid_file_path}") - return True - else: - logger.debug(f"Could not find PID file {self.vnc_pid_file_path}") - return False - - def is_alive(self) -> bool: - return self.apptainer_instance_is_running() and self.port_is_open() - - def apptainer_instance_is_running(self) -> bool: - running = check_remote_pid_exists(slurm_job_id=self.job_id, pid=self.apptainer_instance_info.pid) - if not running: - logger.debug( - f"Instance {self.apptainer_instance_info.name} is not running (pid {self.apptainer_instance_info.pid} not found)" - ) - return False - else: - logger.debug( - f"Instance {self.apptainer_instance_info.name} is running (pid {self.apptainer_instance_info.pid} found)" - ) - return True - - def wait_until_alive(self, timeout: Optional[float] = 300.0, poll_interval: float = 1.0): - """ - Waits until the session is alive. - """ - return repeat_until(lambda: self.is_alive(), lambda alive: alive, timeout=timeout, poll_interval=poll_interval) - - def port_is_open(self) -> bool: - if not self.vnc_port: - logger.debug(f"Could not find VNC port for session {self.apptainer_instance_info.name}. 
Port is not open.") - return False - if not check_remote_port_open(slurm_job_id=self.job_id, port=self.vnc_port): - logger.debug(f"Session {self.apptainer_instance_info.name} does not have an open port on {self.vnc_port}") - return False - else: - logger.debug(f"Session {self.apptainer_instance_info.name} has an open port on {self.vnc_port}") - return True - - def get_connection_strings(self, debug: Optional[bool] = False) -> Dict[str, Dict[str, str]]: - generators = { - "linux": LinuxConnectionStringGenerator, - "macos": MacOsConnectionStringGenerator, - "manual": ManualConnectionStringGenerator, - } - result = { - k: { - "title": g.title, - "instructions": str( - g(self.app_config.ssh_host, self.apptainer_instance_info.instance_host, self.vnc_port) - ), - } - for k, g in generators.items() - } - return result - - def stop(self) -> None: - if not self.job_id: - raise ValueError("Could not find job ID") - logger.info(f"Cancelling job {self.job_id}") - logger.info(f"Stopping Apptainer instance {self.apptainer_instance_info.name} on job {self.job_id}") - apptainer_instance_stop(instance=self.apptainer_instance_info.name, slurm_job_id=self.job_id) - logger.info(f"Stopping SLURM job {self.job_id}") - cancel_job(self.job_id) - logger.info(f"Job {self.job_id} cancelled") - if Path(self.vnc_pid_file_path).expanduser().is_file(): - logger.info(f"Removing PID file {self.vnc_pid_file_path}") - try: - Path(self.vnc_pid_file_path).expanduser().unlink() - except (PermissionError, FileNotFoundError, TypeError): - logger.warning(f"Could not remove PID file {self.vnc_pid_file_path}") - - def __str__(self): - dct = { - "apptainer_instance_info": str(self.apptainer_instance_info), - "apptainer_instance_prefix": str(self.app_config.apptainer_instance_prefix), - "vnc_port": self.vnc_port, - "vnc_log_file_path": str(self.vnc_log_file_path), - "vnc_pid_file_path": str(self.vnc_pid_file_path), - "job_id": str(self.job_id), - } - s = pprint.pformat(dct, indent=2, width=80) - return f"{self.__class__.__name__}:\n{s}" - - def __repr__(self): - return self.__str__() - - @staticmethod - def find_running_sessions(app_config: HyakVncConfig, job_id: Optional[int] = None) -> List["HyakVncSession"]: - """ - Finds all running sessions. 
- :param app_config: HyakVncConfig to use - :param job_id: job ID to search for - :return: list of running sessions - :raises LookupError: if no running sessions are found for a specific job - """ - outs = list() - try: - if job_id: - active_jobs = get_job_infos(jobs=[job_id]) - else: - active_jobs = get_job_infos() - except LookupError as e: - logger.debug(f"Could not find any running apptainer instances on job {job_id}") - return [] - - for job_info in active_jobs: - if job_info.job_name.startswith(app_config.job_prefix): - logger.debug(f"Found job {job_info.job_id} with name {job_info.job_name}") - running_instances = apptainer_instance_list(slurm_job_id=job_info.job_id) - if not running_instances: - logger.debug(f"Could not find any running apptainer instances on job {job_info.job_id}") - continue - prefix = app_config.apptainer_instance_prefix + "-" + str(job_info.job_id) + "-" - for instance in running_instances: - if instance.name.startswith(prefix): - logger.debug(f"Found apptainer instance {instance.name} with pid {instance.pid}") - sesh = HyakVncSession(job_info.job_id, instance, app_config) - try: - sesh.parse_vnc_info() - except RuntimeError as e: - logger.debug(f"Could not parse VNC info for session {sesh}: {e}") - else: - if sesh.is_alive(): - logger.debug(f"Session {sesh} is alive") - outs.append(sesh) - else: - logger.debug(f"Session {sesh} not alive") - return outs - - -class ConnectionStringGenerator: - title = "Connection instructions" - - def __init__( - self, - login_node: str, - compute_node: str, - port_on_compute_node: int, - port_on_client: Optional[int] = None, - *args, - **kwargs, - ): - self.login_node = login_node - self.compute_node = compute_node - self.port_on_compute_node = port_on_compute_node - self.port_on_client = port_on_client or port_on_compute_node - - -class OpenSSHConnectionStringGenerator(ConnectionStringGenerator): - title = "OpenSSH-based clients" - - def __init__( - self, - login_node: str, - compute_node: str, - port_on_compute_node: int, - port_on_client: Optional[int] = None, - debug_connection: Optional[bool] = False, - fork_ssh: Optional[bool] = True, - strict_host_key_checking: Optional[bool] = False, - ): - super().__init__(login_node, compute_node, port_on_compute_node, port_on_client) - self.debug_connection = debug_connection - self.fork_ssh = fork_ssh - self.strict_host_key_checking = strict_host_key_checking - - def __str__(self): - cmdv = ["ssh"] - if self.debug_connection: - cmdv += ["-v"] - if self.fork_ssh: - cmdv += ["-f"] - else: - cmdv += ["-N"] - if not self.strict_host_key_checking: - cmdv += ["-o", "StrictHostKeyChecking=no"] - - # Set up jump host: - cmdv += ["-J", self.login_node, self.compute_node] - - # Set up port forwarding: - cmdv += ["-L", f"{self.port_on_client}:localhost:{self.port_on_compute_node}"] - return " ".join(cmdv) - - -class LinuxConnectionStringGenerator(OpenSSHConnectionStringGenerator): - title = "Linux terminal (bash/zsh)" - - def __str__(self): - cmd = super().__str__() - return f"{cmd} sleep 10; vncviewer localhost:{self.port_on_client}" - - -class MacOsConnectionStringGenerator(OpenSSHConnectionStringGenerator): - title = "macOS Terminal.app (bash/zsh)" - - def __str__(self): - cmd = super().__str__() - apple_bundles = ["com.tigervnc.tigervnc", "com.realvnc.vncviewer"] - apple_cmds = [ - f"open -b {bundle} --args localhost:{self.port_on_client} 2>/dev/null" for bundle in apple_bundles - ] - apple_cmds += ["echo 'Cannot find an installed VNC viewer on macOS. 
Please install TigerVNC or RealVNC.'"] - apple_cmds_pasted = " || ".join(apple_cmds) - return f"{cmd} sleep 10; {apple_cmds_pasted}" - - -class ManualConnectionStringGenerator(ConnectionStringGenerator): - title = "Manual" - - def __str__(self): - out = f"Configure your SSH client to connect to the address '{self.compute_node}' through the \"jump host\" '{self.login_node}' with local port forwarding from port {self.port_on_client} on your machine ('localhost' or 127.0.0.1) to port {self.port_on_compute_node} on the remote host. In your VNC client, connect to 'localhost' or 127.0.0.1 on port {self.port_on_client}." - return out diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index fe5a9c4..0000000 --- a/pyproject.toml +++ /dev/null @@ -1,33 +0,0 @@ -[project] -name = "hyakvnc" -version = "2.0" -description = "Create and manage VNC sessions on HYAK Klone." -authors = [ - { name = "Hansem Ro", email = "hansem7@uw.edu" }, - { name = "Altan Orhon", email = "altan@uw.edu" }, -] -dependencies = [] -requires-python = ">=3.6" -license = { text = "MIT" } -readme = "README.md" - -[project.optional-dependencies] -dev = [ "black", "flake8"] - -[build-system] -requires = ["pdm-pep517"] -build-backend = "pdm.pep517.api" - -[tool.setuptools.packages.find] -where = ["hyakvnc"] - -[project.scripts] -hyakvnc = "hyakvnc.__main__:main" - -[project.urls] -homepage = "https://github.com/uw-psych/hyakvnc" - -[tool.black] -line-length = 120 -target-version = ['py36'] -include = '\.pyi?$'