diff --git a/hyakvnc/ApptainerInstanceInfo.py b/hyakvnc/ApptainerInstanceInfo.py
new file mode 100644
index 0000000..1ec6568
--- /dev/null
+++ b/hyakvnc/ApptainerInstanceInfo.py
@@ -0,0 +1,45 @@
+import base64
+import json
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Optional, Union
+
+
+@dataclass
+class ApptainerInstanceInfo:
+    pid: int
+    ppid: int
+    name: str
+    user: str
+    image: str
+    userns: bool
+    cgroup: bool
+    ip: Optional[str] = None
+    logErrPath: Optional[str] = None
+    logOutPath: Optional[str] = None
+    checkpoint: Optional[str] = None
+    config: Optional[dict] = None
+    instance_path: Optional[Path] = None
+    instance_name: Optional[str] = None
+
+    @staticmethod
+    def from_json(path: Union[str, Path], read_config: bool = False) -> "ApptainerInstanceInfo":
+        """
+        Loads an ApptainerInstanceInfo from a JSON file.
+        :param path: path to JSON file
+        :param read_config: whether to read the config from the JSON file
+        :return: ApptainerInstanceInfo loaded from JSON file
+        """
+        path = Path(path).expanduser()
+        if not path.is_file():
+            raise ValueError(f"Invalid path to instance file: {path}")
+
+        with open(path, "r") as f:
+            contents = json.load(f)
+        if not read_config:
+            contents.pop("config", None)
+        else:
+            contents['config'] = json.loads(base64.b64decode(contents['config']).decode('utf-8'))
+        contents['instance_path'] = path
+        contents['instance_name'] = path.stem
+        return ApptainerInstanceInfo(**contents)
diff --git a/hyakvnc/HyakVncInstance.py b/hyakvnc/HyakVncInstance.py
new file mode 100644
index 0000000..d3a76d9
--- /dev/null
+++ b/hyakvnc/HyakVncInstance.py
@@ -0,0 +1,154 @@
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Optional, Union
+
+from .ApptainerInstanceInfo import ApptainerInstanceInfo
+from .slurmutil import get_job, cancel_job
+from .util import check_remote_pid_exists_and_port_open, check_remote_pid_exists, check_remote_port_open
+
+
+class HyakVncInstance:
+    def __init__(self, apptainer_instance_info: ApptainerInstanceInfo, instance_prefix: Optional[str] = None,
+                 apptainer_config_dir: Optional[Union[str, Path]] = None):
+        self.apptainer_instance_info = apptainer_instance_info
+        apptainer_config_dir = apptainer_config_dir or Path("~/.apptainer")
+        self.apptainer_config_dir = Path(apptainer_config_dir).expanduser()
+        self.vnc_port = None
+        self.vnc_log_file_path = None
+        self.vnc_pid_file_path = None
+        self.instance_prefix = instance_prefix
+        self.job_id = None
+
+        app_dir = self.apptainer_config_dir / 'instances' / 'app'
+        assert app_dir.is_dir(), f"Could not find apptainer app dir at {app_dir}"
+
+        self.compute_node = self.apptainer_instance_info.instance_path.relative_to(app_dir).parts[0]
+        try:
+            name_meta = re.match(rf'(?P<prefix>{instance_prefix})-(?P<jobid>\d+)-(?P<name>.*)',
+                                 self.apptainer_instance_info.instance_name).groupdict()
+        except AttributeError:
+            raise ValueError(f"Could not parse instance name from {self.apptainer_instance_info.instance_name}")
+
+        try:
+            self.job_id = int(name_meta['jobid'])
+        except (ValueError, IndexError, TypeError):
+            raise ValueError(f"Could not parse jobid from {self.apptainer_instance_info.instance_name}")
+
+        logOutPath = self.apptainer_instance_info.logOutPath
+        if not logOutPath:
+            logging.warning("No logOutPath for apptainer instance")
+            return
+        logOutPath = Path(logOutPath).expanduser()
+        if not logOutPath.is_file():
+            logging.warning(f"Could not find log file at {logOutPath}")
+            return
+
+        with open(logOutPath, 'r') as lf:
+            logOutFile_contents = lf.read()
+        rfbports = re.findall(r'\s+-rfbport\s+(?P<port>\d+)\b', logOutFile_contents)
+        try:
+            vnc_port = int(rfbports[-1])
+        except (ValueError, IndexError, TypeError):
+            logging.warning(f"Could not parse VNC port from log file at {logOutPath}")
+            return
+        self.vnc_port = vnc_port
+
+        vnc_log_file_paths = re.findall(
+            rf'(?m)Log file is\s*(?P<logfile>.*/{self.compute_node}.*:{self.vnc_port}\.log)$',
+            logOutFile_contents)
+
+        try:
+            vnc_log_file_path = Path(vnc_log_file_paths[-1]).expanduser()
+        except (ValueError, IndexError, TypeError):
+            logging.warning(f"Could not parse VNC log file path from log file at {logOutPath}")
+            return
+        if not vnc_log_file_path.is_file():
+            logging.debug(f"Could not find vnc log file at {vnc_log_file_path}")
+            return
+        self.vnc_log_file_path = vnc_log_file_path
+        vnc_pid_file_path = self.vnc_log_file_path.parent / (str(self.vnc_log_file_path.stem) + '.pid')
+        if not vnc_pid_file_path.is_file():
+            logging.debug(f"Could not find vnc PID file at {vnc_pid_file_path}")
+            return
+        self.vnc_pid_file_path = vnc_pid_file_path
+        return
+
+    def vnc_pid_file_exists(self):
+        return self.vnc_pid_file_path is not None and self.vnc_pid_file_path.is_file()
+
+    def is_alive(self):
+        return self.vnc_pid_file_exists() and check_remote_pid_exists_and_port_open(self.compute_node,
+                                                                                     self.apptainer_instance_info.pid,
+                                                                                     self.vnc_port)
+
+    def instance_is_running(self):
+        return check_remote_pid_exists(self.compute_node, self.apptainer_instance_info.pid)
+
+    def port_is_open(self):
+        return check_remote_port_open(self.compute_node, self.vnc_port)
+
+    def get_openssh_connection_string(self, login_host: str, port_on_client: Optional[int] = None,
+                                      debug_connection: Optional[bool] = False,
+                                      apple_rdp: Optional[bool] = False) -> str:
+        port_on_node = self.vnc_port
+        assert port_on_node is not None, "Could not find VNC port"
+        compute_node = self.compute_node
+        assert compute_node is not None, "Could not find compute node"
+        port_on_client = port_on_client or port_on_node
+        assert port_on_client is not None, "Could not determine a port to open on the client"
+        s_base = f"ssh -v -f -o StrictHostKeyChecking=no -J {login_host} {compute_node} -L {port_on_client}:localhost:{port_on_node}"
+        s = f"{s_base} sleep 10; vncviewer localhost:{port_on_client}" if not apple_rdp else f"{s_base} sleep 10; open rdp://localhost:{port_on_client}"
+        return s
+
+    def cancel(self):
+        cancel_job(self.job_id)
+
+    @staticmethod
+    def load_instance(instance_prefix: str, instance_name: Optional[str] = None,
+                      path: Optional[Union[str, Path]] = None, read_apptainer_config: Optional[bool] = False,
+                      apptainer_config_dir: Optional[Union[str, Path]] = None) -> Union["HyakVncInstance", None]:
+        assert ((instance_name is not None) ^ (path is not None)), "Must specify either instance name or path"
+        if instance_name:
+            apptainer_config_dir = apptainer_config_dir or Path("~/.apptainer").expanduser()
+            path = Path(
+                apptainer_config_dir).expanduser() / 'instances' / 'app' / instance_name / f"{instance_name}.json"
+        else:
+            apptainer_config_dir = apptainer_config_dir or Path(path).expanduser().parent.parent.parent
+
+        assert apptainer_config_dir.is_dir(), f"Could not find apptainer config dir at {apptainer_config_dir}"
+        app_dir = Path(apptainer_config_dir).expanduser() / 'instances' / 'app'
+        assert app_dir.is_dir(), f"Could not find apptainer app dir at {app_dir}"
+        path = Path(path).expanduser()
+
+        assert path.is_file(), f"Could not find apptainer instance file at {path}"
+
+        apptainer_instance_info = ApptainerInstanceInfo.from_json(path, read_config=read_apptainer_config)
+        hyakvnc_instance = HyakVncInstance(apptainer_instance_info=apptainer_instance_info,
+                                           instance_prefix=instance_prefix, apptainer_config_dir=apptainer_config_dir)
+        return hyakvnc_instance
+
+    @staticmethod
+    def find_running_instances(instance_prefix: str, apptainer_config_dir: Optional[Union[str, Path]] = None,
+                               user: str = os.getlogin()) -> list["HyakVncInstance"]:
+        apptainer_config_dir = apptainer_config_dir or Path("~/.apptainer").expanduser()
+        app_dir = Path(apptainer_config_dir).expanduser() / 'instances' / 'app'
+        assert app_dir.is_dir(), f"Could not find apptainer app dir at {app_dir}"
+
+        active_jobs = get_job()
+        outs = []
+        active_compute_nodes = set([node for nodes in [job.node_list for job in active_jobs] for node in nodes])
+        compute_directories = [(Path(app_dir) / node / user) for node in active_compute_nodes]
+        all_instance_files = set(
+            [f for fs in [p.rglob(instance_prefix + '*.json') for p in compute_directories] for f in fs])
+        vnc_instance_files = set([p for p in all_instance_files if re.match(rf"^{instance_prefix}-\d+", p.name)])
+        for p in vnc_instance_files:
+            instance_info = ApptainerInstanceInfo.from_json(p)
+            instance = HyakVncInstance(instance_info, instance_prefix=instance_prefix,
+                                       apptainer_config_dir=apptainer_config_dir)
+            if instance.is_alive():
+                outs.append(instance)
+            else:
+                logging.debug(f"Found instance {instance.apptainer_instance_info.instance_name} but it is not alive")
+        return outs
diff --git a/hyakvnc/__main__.py b/hyakvnc/__main__.py
index cb278bf..a938a53 100644
--- a/hyakvnc/__main__.py
+++ b/hyakvnc/__main__.py
@@ -2,108 +2,30 @@
 # -*- coding: utf-8 -*-
 
 import argparse
-import base64
-import json
 import logging
 import os
+import pprint
 import re
 import shlex
 import subprocess
 import time
 from dataclasses import asdict
+from datetime import datetime
 from pathlib import Path
 from typing import Optional, Union
-import pprint
+
+from .HyakVncInstance import HyakVncInstance
 from .config import HyakVncConfig
-from .slurmutil import wait_for_job_status, get_job, SlurmJob, get_job_status
-from .util import check_remote_pid_exists_and_port_open, wait_for_file
+from .slurmutil import wait_for_job_status, get_job, get_historical_job, cancel_job
+from .util import wait_for_file, repeat_until
 from .version import VERSION
 
-
 app_config = HyakVncConfig()
+app_started = datetime.now()
+app_job_ids = []
 
 
-def get_apptainer_vnc_instances(read_apptainer_config: bool = False):
-    app_dir = Path(app_config.apptainer_config_dir).expanduser() / 'instances' / 'app'
-    assert app_dir.exists(), f"Could not find apptainer instances dir at {app_dir}"
-
-    needed_keys = {'pid', 'user', 'name', 'image'}
-    if read_apptainer_config:
-        needed_keys.add('config')
-
-    all_instance_json_files = app_dir.rglob(app_config.apptainer_instance_prefix + '*.json')
-
-    running_hyakvnc_json_files = {p: r.groupdict() for p in all_instance_json_files if (
-        r := re.match(rf'(?P<prefix>{app_config.apptainer_instance_prefix})-(?P<jobid>\d+)-(?P<name>.*)\.json',
-                      p.name))}
-    outs = []
-    # frr := re.search(r'\s+-rfbport\s+(?P<port>\d+\b', fr)
-
-    for p, name_meta in running_hyakvnc_json_files.items():
-        with open(p, 'r') as f:
-            d = json.load(f)
-        assert needed_keys <= d.keys(), f"Missing keys {needed_keys - d.keys()} in {d}"
-
-        logOutPath = Path(d['logOutPath']).expanduser()
-        if not logOutPath.exists():
-            continue
-
-        if not read_apptainer_config:
-            d.pop("config", None)
-        else:
-            d['config'] = json.loads(base64.b64decode(d['config']).decode('utf-8'))
-        d['slurm_compute_node'] = p.relative_to(app_dir).parts[0]
-        d['slurm_job_id'] = name_meta['jobid']
-
-        with open(logOutPath, 'r') as lf:
-            logOutFile_contents = lf.read()
-        rfbports = re.findall(r'\s+-rfbport\s+(?P<port>\d+)\b', logOutFile_contents)
-        if not rfbports:
-            continue
-
-        vnc_port = rfbports[-1]
-
-        vnc_log_file_paths = re.findall(
-            rf'(?m)Log file is\s*(?P<logfile>.*/{d["slurm_compute_node"]}.*:{vnc_port}\.log)$',
-            logOutFile_contents)
-        if not vnc_log_file_paths:
-            continue
-        vnc_log_file_path = Path(vnc_log_file_paths[-1])
-        if not vnc_log_file_path.exists():
-            logging.debug(f"Could not find vnc log file at {vnc_log_file_path}")
-            continue
-
-        vnc_pid_file_path = Path(str(vnc_log_file_path).rstrip(".log") + '.pid')
-        if not vnc_pid_file_path.exists():
-            logging.debug(f"Could not find vnc pid file at {vnc_pid_file_path}")
-            continue
-
-        d['vnc_log_file_path'] = vnc_log_file_path
-        d['vnc_pid_file_path'] = vnc_pid_file_path
-        d['vnc_port'] = vnc_port
-
-        logging.debug(
-            f"Checking port open on {d['slurm_compute_node']}:{vnc_port} for apptainer instance file {p}")
-
-        if not check_remote_pid_exists_and_port_open(d['slurm_compute_node'], d['pid'], vnc_port):
-            logging.debug(
-                f"Could not find open port running on node {d['slurm_compute_node']}:{vnc_port} for apptainer instance file {p}")
-            continue
-
-        outs.append(d)
-    return outs
-
-
-def get_openssh_connection_string_for_instance(instance: dict, login_host: str,
-                                               port_on_client: Optional[int] = None) -> str:
-    port_on_node = instance["vnc_port"]
-    compute_node = instance["slurm_compute_node"]
-    port_on_client = port_on_client or port_on_node
-    s = f"ssh -v -f -o StrictHostKeyChecking=no -J {login_host} {compute_node} -L {port_on_client}:localhost:{port_on_node} sleep 10; vncviewer localhost:{port_on_client}"
-    return s
-
-
-def cmd_create(container_path: Union[str, Path], dry_run=False) -> SlurmJob:
+def cmd_create(container_path: Union[str, Path], dry_run=False) -> Union[HyakVncInstance, None]:
     """
     Allocates a compute node, starts a container, and launches a VNC session on it.
     :param container_path: Path to container to run
@@ -125,22 +47,30 @@ def cmd_create(container_path: Union[str, Path], dry_run=False) -> SlurmJob:
     cmds = ["sbatch", "--parsable", "--job-name", app_config.job_prefix + '-' + container_name]
+    cmds += ["--output", app_config.sbatch_output_path]
+
     sbatch_optinfo = {"account": "-A", "partition": "-p", "gpus": "-G", "timelimit": "--time", "mem": "--mem",
                       "cpus": "-c"}
-    sbatch_options = [str(item )for pair in [(sbatch_optinfo[k], v) for k, v in asdict(app_config).items() if
-                      k in sbatch_optinfo.keys() and v is not None] for item in pair]
+    sbatch_options = [str(item) for pair in [(sbatch_optinfo[k], v) for k, v in asdict(app_config).items() if
+                                             k in sbatch_optinfo.keys() and v is not None] for item in pair]
 
     cmds += sbatch_options
 
+    # Set up the environment variables to pass to Apptainer:
     apptainer_env_vars_quoted = [f"{k}={shlex.quote(v)}" for k, v in app_config.apptainer_env_vars.items()]
     apptainer_env_vars_string = "" if not apptainer_env_vars_quoted else (" ".join(apptainer_env_vars_quoted) + " ")
 
-    # needs to match rf'(?P<prefix>{app_config.apptainer_instance_prefix})(?P<jobid>\d+)-(?P<name>.*)'):
+    # Template to name the apptainer instance:
     apptainer_instance_name = f"{app_config.apptainer_instance_prefix}-$SLURM_JOB_ID-{container_name}"
+
+    # Command to start the apptainer instance:
     apptainer_cmd = f"apptainer instance start {container_path} {apptainer_instance_name}"
+
+    # Command to start the apptainer instance and keep it running:
     apptainer_cmd_with_rest = apptainer_env_vars_string + f"{apptainer_cmd} && while true; do sleep 10; done"
-    cmds += ["--wrap",apptainer_cmd_with_rest]
+
+    # The sbatch wrap functionality allows submitting commands without an sbatch script:
+    cmds += ["--wrap", apptainer_cmd_with_rest]
 
     # Launch sbatch process:
     logging.info("Launching sbatch process with command:\n" + repr(cmds))
@@ -157,7 +87,8 @@ def cmd_create(container_path: Union[str, Path], dry_run=False) -> SlurmJob:
         raise RuntimeError(f"No sbatch output")
 
     try:
-        job_id = int(res.stdout.strip().split(":")[0])
+        job_id = int(res.stdout.strip().split(";")[0])
+        app_job_ids.append(job_id)
     except (ValueError, IndexError, TypeError):
         raise RuntimeError(f"Could not parse jobid from sbatch output: {res.stdout}")
 
@@ -168,45 +99,69 @@ def cmd_create(container_path: Union[str, Path], dry_run=False) -> SlurmJob:
         wait_for_job_status(job_id, states=["RUNNING"], timeout=app_config.sbatch_post_timeout,
                             poll_interval=app_config.sbatch_post_poll_interval)
     except TimeoutError:
-        raise TimeoutError(f"Job {job_id} did not start running within {app_config.sbatch_post_timeout} seconds")
-
-    job = get_job(job_id)
+        job = get_historical_job(job_id=job_id)
+        state = "unknown"
+        if job and len(job) > 0:
+            job = job[0]
+            state = job.state
+        raise TimeoutError(
+            f"Job {job_id} did not start running within {app_config.sbatch_post_timeout} seconds. Last state was {state}")
+
+    job = get_job(jobs=job_id)
     if not job:
-        raise RuntimeError(f"Could not get job {job_id} after it started running")
+        job = get_historical_job(job_id=job_id)
+        state = "unknown"
+        if job and len(job) > 0:
+            job = job[0]
+            state = job.state
Last state was {state}") logging.info(f"Job {job_id} is now running") - instance_file = '/mmfs1/home/altan/.apptainer/instances/a/g3071/altan/hyakvnc-14673571-ubuntu22.04_xubuntu.err' real_instance_name = f"{app_config.apptainer_instance_prefix}-{job.job_id}-{container_name}" - instance_file = (Path(app_config.apptainer_config_dir) / 'instances' / 'app' / job.node_list[0] / job.user_name / real_instance_name / f"{real_instance_name}.json").expanduser() + instance_file = (Path(app_config.apptainer_config_dir) / 'instances' / 'app' / job.node_list[ + 0] / job.user_name / real_instance_name / f"{real_instance_name}.json").expanduser() if wait_for_file(str(instance_file), timeout=app_config.sbatch_post_timeout): - time.sleep(10) # sleep to wait for apptainer to actually start vncserver - instances = { instance["name"]: instance for instance in get_apptainer_vnc_instances() } - if real_instance_name not in instances: - raise TimeoutError(f"Could not find VNC session for job {job_id}") - instance = instances[real_instance_name] - print(get_openssh_connection_string_for_instance(instance, app_config.ssh_host)) + time.sleep(10) # sleep to wait for apptainer to actually start vncserver + + instance = HyakVncInstance.load_instance(instance_prefix=app_config.apptainer_instance_prefix, + path=instance_file, read_apptainer_config=False) + if not repeat_until(lambda: instance.is_alive(), lambda alive: alive, timeout=app_config.sbatch_post_timeout): + logging.info("Could not find a running VNC session for the instance {instance}") + instance.cancel() + raise RuntimeError(f"Could not find a running VNC session for the instance {instance}") + else: + print("OpenSSH string for VNC session:") + print(" " + instance.get_openssh_connection_string(login_host=app_config.ssh_host, apple_rdp=False)) + print("OpenSSH string for VNC session using the built-in viewer on macOS:") + print(" " + instance.get_openssh_connection_string(login_host=app_config.ssh_host, apple_rdp=True)) + return instance else: - logging.info(f"Could not find VNC session for job {job_id}") - + logging.info(f"Could not find instance file at {instance_file} before timeout") + cancel_job(job_id) + logging.info(f"Canceled job {job_id} before timeout") + raise TimeoutError(f"Could not find instance file at {instance_file} before timeout") def cmd_stop(job_id: Optional[int] = None, stop_all: bool = False): + assert ((job_id is not None) ^ (stop_all)), "Must specify either a job id or stop all" if stop_all: - vnc_instances = get_apptainer_vnc_instances() - for vnc_instance in vnc_instances: - subprocess.run(["scancel", str(vnc_instance['slurm_job_id'])]) - return - - if job_id: - subprocess.run(["scancel", str(job_id)]) - return + vnc_instances = HyakVncInstance.find_running_instances(instance_prefix=app_config.apptainer_instance_prefix, + apptainer_config_dir=app_config.apptainer_config_dir) + for instance in vnc_instances: + instance.cancel() + print(f"Canceled job {instance.job_id}") + else: + cancel_job(job_id) + print(f"Canceled job {job_id}") def cmd_status(): - vnc_instances = get_apptainer_vnc_instances(read_apptainer_config=True) - pprint.pp(json.dumps(vnc_instances, indent=2)) + vnc_instances = HyakVncInstance.find_running_instances(instance_prefix=app_config.apptainer_instance_prefix, + apptainer_config_dir=app_config.apptainer_config_dir) + for instance in vnc_instances: + pprint.pp(instance, indent=2) def create_arg_parser(): @@ -270,7 +225,11 @@ def create_arg_parser(): exit(0) if args.command == 'create': - cmd_create(args.container, 
-        cmd_create(args.container, dry_run=args.dry_run)
+        try:
+            cmd_create(args.container, dry_run=args.dry_run)
+        except (TimeoutError, RuntimeError) as e:
+            logging.error(f"Error: {e}")
+            exit(1)
         exit(0)
 
     if args.command == 'status':
diff --git a/hyakvnc/config.py b/hyakvnc/config.py
index 536ae19..3975a61 100644
--- a/hyakvnc/config.py
+++ b/hyakvnc/config.py
@@ -7,6 +7,7 @@
 from .slurmutil import get_default_cluster, get_default_account, get_default_partition
 
 
+
 def get_first_env(env_vars: Iterable[str], default: Optional[str] = None, allow_blank: bool = False) -> str:
     """
     Gets the first environment variable that is set, or the default value if none are set.
@@ -43,6 +44,7 @@ class HyakVncConfig:
     apptainer_env_vars: Optional[dict[str]] = None  # environment variables to set for apptainer instances
     sbatch_post_timeout: float = 120.0  # timeout for waiting for sbatch to return
     sbatch_post_poll_interval: float = 1.0  # poll interval for waiting for sbatch to return
+    sbatch_output_path: Optional[str] = None  # path to write sbatch output to
 
     # ssh config
     ssh_host = "klone.hyak.uw.edu"  # intermediate host address between local machine and compute node
@@ -61,17 +63,21 @@ def __post_init__(self):
         Post-initialization hook for HyakVncConfig. Sets default values for unset attributes.
         :return: None
         """
-        self.cluster = self.cluster or get_first_env(["HYAKVNC_SLURM_CLUSTER", "SBATCH_CLUSTER"], default=get_default_cluster())
+        self.cluster = self.cluster or get_first_env(["HYAKVNC_SLURM_CLUSTER", "SBATCH_CLUSTER"],
+                                                     default=get_default_cluster())
         self.account = self.account or get_first_env(["HYAKVNC_SLURM_ACCOUNT", "SBATCH_ACCOUNT"],
                                                      get_default_account(cluster=self.cluster))
         self.partition = self.partition or get_first_env(["HYAKVNC_SLURM_PARTITION", "SBATCH_PARTITION"],
                                                          get_default_partition(cluster=self.cluster,
-                                                         account=self.account))
+                                                                               account=self.account))
         self.gpus = self.gpus or get_first_env(["HYAKVNC_SLURM_GPUS", "SBATCH_GPUS"], None)
         self.timelimit = self.timelimit or get_first_env(["HYAKVNC_SLURM_TIMELIMIT", "SBATCH_TIMELIMIT"], None)
         self.mem = self.mem or get_first_env(["HYAKVNC_SLURM_MEM", "SBATCH_MEM"], None)
         self.cpus = int(self.cpus or get_first_env(["HYAKVNC_SLURM_CPUS", "SBATCH_CPUS_PER_TASK"]))
+        self.sbatch_output_path = self.sbatch_output_path or get_first_env(
+            ["HYAKVNC_SBATCH_OUTPUT_PATH", "SBATCH_OUTPUT"], "/dev/stdout")
+
         self.apptainer_env_vars = self.apptainer_env_vars or dict()
         all_apptainer_env_vars = {x: os.environ.get(x, "") for x in os.environ.keys() if
                                   x.startswith("APPTAINER_") or x.startswith("APPTAINERENV_") or x.startswith(
diff --git a/hyakvnc/slurmutil.py b/hyakvnc/slurmutil.py
index 3fa4d32..96bb31a 100755
--- a/hyakvnc/slurmutil.py
+++ b/hyakvnc/slurmutil.py
@@ -4,7 +4,8 @@
 import subprocess
 import time
 from dataclasses import dataclass, fields, field
-from typing import Optional, Union, Container
+from datetime import datetime, timedelta
+from typing import Optional, Union
 
 
 def get_default_cluster() -> str:
@@ -100,22 +101,22 @@ def node_range_to_list(s: str) -> list[str]:
 
 @dataclass
 class SlurmJob:
-    job_id: int = field(metadata={"squeue_field": "%i"})
-    job_name: str = field(metadata={"squeue_field": "%j"})
-    account: str = field(metadata={"squeue_field": "%a"})
-    partition: str = field(metadata={"squeue_field": "%P"})
-    user_name: str = field(metadata={"squeue_field": "%u"})
-    state: str = field(metadata={"squeue_field": "%T"})
-    time_used: str = field(metadata={"squeue_field": "%M"})
-    time_limit: str = field(metadata={"squeue_field": "%l"})
field(metadata={"squeue_field": "%C"}) - min_memory: str = field(metadata={"squeue_field": "%m"}) - num_nodes: int = field(metadata={"squeue_field": "%D"}) - node_list: str = field(metadata={"squeue_field": "%N"}) - command: str = field(metadata={"squeue_field": "%o"}) + job_id: int = field(metadata={"squeue_field": "%i", "sacct_field": "JobID"}) + job_name: str = field(metadata={"squeue_field": "%j", "sacct_field": "JobName"}) + account: str = field(metadata={"squeue_field": "%a", "sacct_field": "Account"}) + partition: str = field(metadata={"squeue_field": "%P", "sacct_field": "Partition"}) + user_name: str = field(metadata={"squeue_field": "%u", "sacct_field": "User"}) + state: str = field(metadata={"squeue_field": "%T", "sacct_field": "State"}) + time_used: str = field(metadata={"squeue_field": "%M", "sacct_field": "Elapsed"}) + time_limit: str = field(metadata={"squeue_field": "%l", "sacct_field": "Timelimit"}) + cpus: int = field(metadata={"squeue_field": "%C", "sacct_field": "AllocCPUS"}) + min_memory: str = field(metadata={"squeue_field": "%m", "sacct_field": "ReqMem"}) + num_nodes: int = field(metadata={"squeue_field": "%D", "sacct_field": "NNodes"}) + node_list: str = field(metadata={"squeue_field": "%N", "sacct_field": "NodeList"}) + command: str = field(metadata={"squeue_field": "%o", "sacct_field": "SubmitLine"}) @staticmethod - def from_squeue_line(line: str, field_order=None) -> "SlurmJob": + def from_squeue_line(line: str, field_order=None, delimiter: Optional[str] = None) -> "SlurmJob": """ Creates a SlurmJob from an squeue command :param line: output line from squeue command @@ -126,9 +127,18 @@ def from_squeue_line(line: str, field_order=None) -> "SlurmJob": valid_field_names = [x.name for x in fields(SlurmJob)] if field_order is None: field_order = valid_field_names - all_fields_dict = {field_order[i]: x for i, x in enumerate(line.split())} + + if delimiter is None: + all_fields_dict = {field_order[i]: x for i, x in enumerate(line.split())} + else: + all_fields_dict = {field_order[i]: x for i, x in enumerate(line.split(delimiter))} + field_dict = {k: v for k, v in all_fields_dict.items() if k in valid_field_names} + try: + field_dict["job_id"] = int(field_dict["job_id"]) + except (ValueError, TypeError, KeyError): + field_dict["job_id"] = None try: field_dict["num_nodes"] = int(field_dict["num_nodes"]) except (ValueError, TypeError, KeyError): @@ -226,3 +236,72 @@ def wait_for_job_status(job_id: int, states: list[str], timeout: Optional[float] return res time.sleep(poll_interval) raise TimeoutError(f"Timed out waiting for job {job_id} to be in one of the following states: {states}") + + +def get_historical_job(after: Optional[Union[datetime, timedelta]] = None, + before: Optional[Union[datetime, timedelta]] = None, job_id: Optional[int] = None, + user: Optional[str] = os.getlogin(), + cluster: Optional[str] = None) -> list[SlurmJob]: + """ + Gets the slurm jobs since the specified time. 
+    :param after: Time after which to get jobs
+    :param before: Time before which to get jobs
+    :param job_id: Job id to get
+    :param user: User to get jobs for
+    :param cluster: Cluster to get jobs for
+    :return: the slurm jobs since the specified time as a list of SlurmJobs
+    """
+    now = datetime.now()
+    assert isinstance(after, (datetime, timedelta, type(None))), "after must be a datetime or timedelta or None"
+    assert isinstance(before, (datetime, timedelta, type(None))), "before must be a datetime or timedelta or None"
+
+    after_abs = now - after if isinstance(after, timedelta) else after
+    before_abs = now - before if isinstance(before, timedelta) else before
+
+    cmds: list[str] = ['sacct', '--noheader', '-X', '--parsable2']
+    if user:
+        cmds += ['--user', user]
+    if cluster:
+        cmds += ['--clusters', cluster]
+    if after_abs:
+        cmds += ["--starttime", after_abs.isoformat(timespec="seconds")]
+    if before_abs:
+        cmds += ["--endtime", before_abs.isoformat(timespec="seconds")]
+    if job_id:
+        cmds += ["--jobs", str(job_id)]
+
+    sacct_format_fields = ",".join([f.metadata.get("sacct_field", "") for f in fields(SlurmJob)])
+    cmds += ['--format', sacct_format_fields]
+    res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=False)
+    if res.returncode != 0:
+        raise ValueError(f"Could not get slurm jobs via `sacct`:\n{res.stderr}")
+
+    jobs = [SlurmJob.from_squeue_line(line, delimiter="|") for x in res.stdout.splitlines() if (line := x.strip())]
+    return jobs
+
+
+def cancel_job(jobs: Optional[Union[int, list[int]]] = None,
+               user: Optional[str] = os.getlogin(),
+               cluster: Optional[str] = None
+               ):
+    """
+    Cancels the specified jobs.
+    :param jobs: Jobs to cancel
+    :param user: User to cancel jobs for
+    :param cluster: Cluster to cancel jobs for
+    :return: None
+    """
+    assert jobs or user or cluster, "Must specify at least one of jobs, user, or cluster"
+    cmds = ["scancel"]
+    if user:
+        cmds += ['--user', user]
+    if cluster:
+        cmds += ['--clusters', cluster]
+    if jobs:
+        if isinstance(jobs, int):
+            jobs = [jobs]
+        cmds += [str(x) for x in jobs]
+
+    res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=False)
+    if res.returncode != 0:
+        raise ValueError(f"Could not cancel jobs {jobs}:\n{res.stderr}")
diff --git a/hyakvnc/util.py b/hyakvnc/util.py
index 913260e..78d5cb0 100644
--- a/hyakvnc/util.py
+++ b/hyakvnc/util.py
@@ -1,8 +1,10 @@
-import time
-from typing import Callable, Optional, Union
 import logging
-from pathlib import Path
 import subprocess
+import time
+from pathlib import Path
+from typing import Callable, Optional, Union
+
+
 def repeat_until(func: Callable, condition: Callable[[int], bool], timeout: Optional[float] = None,
                  poll_interval: float = 1.0):
     begin_time = time.time()
@@ -18,7 +20,7 @@ def repeat_until(func: Callable, condition: Callable[[int], bool], timeout: Opti
 
 
 def wait_for_file(path: Union[Path, str], timeout: Optional[float] = None,
-                   poll_interval: float = 1.0):
+                  poll_interval: float = 1.0):
     """
     Waits for the specified file to be present.
""" @@ -26,7 +28,20 @@ def wait_for_file(path: Union[Path, str], timeout: Optional[float] = None, logging.debug(f"Waiting for file `{path}` to exist") return repeat_until(lambda: path.exists(), lambda exists: exists, timeout=timeout, poll_interval=poll_interval) + def check_remote_pid_exists_and_port_open(host: str, pid: int, port: int) -> bool: cmd = f"ssh {host} ps -p {pid} && nc -z localhost {port}".split() res = subprocess.run(cmd, shell=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return res.returncode == 0 + + +def check_remote_pid_exists(host: str, pid: int) -> bool: + cmd = f"ssh {host} ps -p {pid}".split() + res = subprocess.run(cmd, shell=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return res.returncode == 0 + + +def check_remote_port_open(host: str, port: int) -> bool: + cmd = f"ssh {host} nc -z localhost {port}".split() + res = subprocess.run(cmd, shell=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return res.returncode == 0