Commit

saving work on script
maouw committed Sep 20, 2023
1 parent e4874d3 commit 949762c
Showing 3 changed files with 245 additions and 201 deletions.
272 changes: 169 additions & 103 deletions hyakvnc/__main__.py
@@ -9,90 +9,80 @@
import re
import logging
import subprocess
from copy import deepcopy
from pprint import pformat
import shlex
import time
import tenacity
import argparse
from dataclasses import dataclass, asdict

from slurmutil import get_slurm_cluster, get_slurm_partitions, get_slurm_default_account, get_slurm_job_details

# The base VNC port cannot be changed because vncserver does not expose a
# stable argument interface for it:
BASE_VNC_PORT = os.environ.setdefault("HYAKVNC_BASE_VNC_PORT", "5900")

# List of Klone login node hostnames
LOGIN_NODE_LIST = os.environ.get("HYAKVNC_LOGIN_NODES", "klone-login01,klone1.hyak.uw.edu,klone2.hyak.uw.edu").split(
",")
from .slurmutil import get_default_cluster, get_default_account, get_partitions, wait_for_job_status
from .util import repeat_until, wait_for_file, check_remote_pid_exists_and_port_open

# Name of Apptainer binary (formerly Singularity)
APPTAINER_BIN = os.environ.setdefault("HYAKVNC_APPTAINER_BIN", "apptainer")

# Checked to see if klone is authorized for intracluster access
AUTH_KEYS_FILEPATH = Path(os.environ.setdefault("HYAKVNC_AUTH_KEYS_FILEPATH", "~/.ssh/authorized_keys")).expanduser()

# Apptainer bindpaths can be overwritten if $APPTAINER_BINDPATH is defined.
# Bindpaths are used to mount storage paths to containerized environment.
APPTAINER_BINDPATH = os.environ.setdefault("APPTAINER_BINDPATH",
os.environ.get("HYAKVNC_APPTAINER_BINDPATH",
os.environ.get("SINGULARITY_BINDPATH",
"/tmp,$HOME,$PWD,/gscratch,/opt,/:/hyak_root,/sw,/mmfs1")))

APPTAINER_CONFIGDIR = Path(os.getenv("APPTAINER_CONFIGDIR", "~/.apptainer")).expanduser()
APPTAINER_INSTANCES_DIR = APPTAINER_CONFIGDIR / "instances"

# SLURM UTILS

# Slurm configuration variables:
SLURM_CLUSTER = os.getenv("HYAKVNC_SLURM_CLUSTER", os.getenv("SBATCH_CLUSTERS", get_slurm_cluster()).split(",")[0])
SLURM_ACCOUNT = os.environ.get("HYAKVNC_SLURM_ACCOUNT", os.environ.setdefault("SBATCH_ACCOUNT",
get_slurm_default_account(
cluster=SLURM_CLUSTER)))
SLURM_GPUS = os.environ.setdefault("SBATCH_GPUS", "0")
SLURM_CPUS_PER_TASK = os.environ.setdefault("HYAKVNC_SLURM_CPUS_PER_TASK", "1")
SBATCH_GPUS = os.environ.setdefault("SBATCH_GPUS", "0")
SBATCH_TIMELIMIT = os.environ.setdefault("SBATCH_TIMELIMIT", "1:00:00")

HYAKVNC_SLURM_JOBNAME_PREFIX = os.getenv("HYAKVNC_SLURM_JOBNAME_PREFIX", "hyakvnc-")
HYAKVNC_APPTAINER_INSTANCE_PREFIX = os.getenv("HYAKVNC_APPTAINER_INSTANCE_PREFIX", "hyakvnc-vncserver-")


SBATCH_CLUSTERS = os.environ.setdefault("SBATCH_CLUSTERS", SLURM_CLUSTER)
@dataclass
class HyakVncConfig:
# script attributes
job_prefix: str = "hyakvnc-"
# apptainer config
apptainer_config_dir: str = "~/.apptainer"
apptainer_instance_prefix: str = "hyakvnc-"
apptainer_env_vars: Optional[dict] = None

found_sbatch_partitions = get_slurm_partitions(account=SBATCH_ACCOUNT, cluster=SBATCH_CLUSTERS)
if found_sbatch_partitions:
HYAKVNC_SLURM_PARTITION = os.environ.get("HYAKVNC_SLURM_PARTITION",
os.environ.setdefault("SBATCH_PARTITION",
found_sbatch_partitions[0]))
sbatch_post_timeout: float = 120.0
sbatch_post_poll_interval: float = 1.0

# ssh config
ssh_host = "klone.hyak.uw.edu"

if any(SBATCH_PARTITION := x for x in get_slurm_partitions(account=SBATCH_ACCOUNT, cluster=SBATCH_CLUSTERS)):
os.environ.setdefault("SBATCH_PARTITION", SBATCH_PARTITION)
# slurm attributes
## sbatch environment variables
account: Optional[str] = None # -a, --account
partition: Optional[str] = None # -p, --partition
cluster: Optional[str] = None # --clusters, SBATCH_CLUSTERS
gpus: Optional[str] = None # -G, --gpus, SBATCH_GPUS
timelimit: Optional[str] = None # -t, --time, SBATCH_TIMELIMIT
mem: Optional[str] = None # --mem, SBATCH_MEM
cpus: Optional[int] = None # -c, --cpus-per-task (not settable by environment variable)

SBATCH_GPUS = os.environ.setdefault("SBATCH_GPUS", "0")
SBATCH_TIMELIMIT = os.environ.setdefault("SBATCH_TIMELIMIT", "1:00:00")
SBATCH_MEM = os.environ.setdefault("SBATCH_MEM", "8G")
def to_json(self):
return json.dumps({k: v for k, v in asdict(self).items() if v is not None})
@staticmethod
def from_json(path):
if not Path(path).is_file():
raise ValueError(f"Invalid path to configuration file: {path}")

HYAKVNC_SLURM_JOBNAME_PREFIX = os.getenv("HYAKVNC_SLURM_JOBNAME_PREFIX", "hyakvnc-")
HYAKVNC_APPTAINER_INSTANCE_PREFIX = os.getenv("HYAKVNC_APPTAINER_INSTANCE_PREFIX", "hyakvnc-vncserver-")
with open(path, "r") as f:
contents = json.load(f)
return HyakVncConfig(**contents)
@staticmethod
def from_jsons(s: str):
return HyakVncConfig(**json.loads(s))

def check_remote_pid_exists_and_port_open(host: str, pid: int, port: int) -> bool:
cmd = f"ssh {host} ps -p {pid} && nc -z localhost {port}".split()
res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return res.returncode == 0


def get_apptainer_vnc_instances(apptainer_config_dir="~/.apptainer", instance_prefix: str ="hyakvnc-",
read_apptainer_config: bool = False):
def get_apptainer_vnc_instances(cfg: HyakVncConfig, read_apptainer_config: bool = False):
appdir = Path(cfg.apptainer_config_dir).expanduser() / 'instances' / 'app'
assert appdir.exists(), f"Could not find apptainer instances dir at {appdir}"

needed_keys = {'pid', 'user', 'name', 'image', }
if read_apptainer_config:
needed_keys.add('config')

all_instance_json_files = appdir.rglob(instance_prefix + '*.json')
all_instance_json_files = appdir.rglob(cfg.apptainer_instance_prefix + '*.json')

running_hyakvnc_json_files = {p: r.groupdict() for p in all_instance_json_files if (
r := re.match(rf'(?P<prefix>{instance_prefix})(?P<jobid>\d+)-(?P<appinstance>.*)\.json', p.name))
r := re.match(rf'(?P<prefix>{cfg.apptainer_instance_prefix})(?P<jobid>\d+)-(?P<appinstance>.*)\.json', p.name))
}
outs = []
# frr := re.search(r'\s+-rfbport\s+(?P<rfbport>\d+\b', fr)
@@ -163,19 +153,7 @@ def get_openssh_connection_string_for_instance(instance: dict, login_host: str,




APPTAINER_WRITABLE_TMPFS = os.environ.setdefault("APPTAINER_WRITABLE_TMPFS", "1")
APPTAINER_CLEANENV = os.environ.setdefault("APPTAINER_CLEANENV", "1")

def get_slurm_job_status(jobid: int):
cmd = ["squeue", "-j", str(jobid), "-ho", "%T"]
res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
if res.returncode != 0:
raise ValueError(f"Could not get status for job {jobid}:\n{res.stderr}")
return res.stdout.strip()

def create_job_with_container(container_path: str, cpus_per_task: Optional[int] = None, squeue_poll_interval: int = 3):

def create_job_with_container(container_path, cfg: HyakVncConfig):
if re.match(r"(?P<container_type>library|docker|shub|oras)://(?P<container_path>.*)", container_path):
container_name = Path(container_path).stem

@@ -185,53 +163,141 @@ def create_job_with_container(container_path: str, cpus_per_task: Optional[int]
assert container_path.exists(), f"Could not find container at {container_path}"


cmds = ["sbatch", "--parsable"]
if cpus_per_task:
cmds += ["-c", str(cpus_per_task)]
cmds = ["sbatch", "--parsable", "--job-name", cfg.job_prefix + container_name]

sbatch_optinfo = {"account": "-A", "partition": "-p", "gpus": "-G", "timelimit": "--time", "mem": "--mem", "cpus": "-c"}
sbatch_options = [item for pair in [(sbatch_optinfo[k], v) for k, v in asdict(cfg).items() if k in sbatch_optinfo.keys() and v is not None]
for item in pair]

os.environ["SBATCH_JOB_NAME"] = f"{HYAKVNC_SLURM_JOBNAME_PREFIX}{container_name}"
cmds += sbatch_options

# Set up apptainer variables and command:
APPTAINER_CLEANENV = os.environ.setdefault("APPTAINER_CLEANENV", "1")
APPTAINER_WRITABLE_TMPFS = os.environ.setdefault("APPTAINER_WRITABLE_TMPFS", "1")
apptainer_env_vars = {k: v for k, v in os.environ.items() if k.startswith("APPTAINER_") or k.startswith("SINGULARITY_") or k.startswith("SINGULARITYENV_") or k.startswith("APPTAINERENV_")}
apptainer_env_vars_str = " ".join(f"{k}={shlex.quote(v)}" for k, v in apptainer_env_vars.items())
apptainer_cmd = f"{apptainer_env_vars_str} apptainer instance start --cleanenv --writable-tmpfs {container_path} && while true; do sleep 10; done"

apptainer_cmd = f"{apptainer_env_vars_str} apptainer instance start {container_path} && while true; do sleep 10; done"
cmds += ["--wrap", apptainer_cmd]



# Launch sbatch process:
res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
if res.returncode != 0:
raise ValueError(f"Could not create job with container {container_path}:\n{res.stderr}")
logging.info("Launching sbatch process with command:\n" + " ".join(cmds))
res = subprocess.run(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")

try:
jobid = res.stdout.strip().split(":")
except (ValueError, IndexError):
job_id = int(res.stdout.strip().split(":")
except (ValueError, IndexError, TypeError):
raise RuntimeError(f"Could not parse jobid from sbatch output: {res.stdout}")

while True:
job_status = get_slurm_job_status(jobid)
if job_status == "RUNNING":
vnc_instances = get_apptainer_vnc_instances()
inst_name = f"{HYAKVNC_APPTAINER_INSTANCE_PREFIX}{container_name}"
vinst = [ x for x in vnc_instances if x['name'] == f"{HYAKVNC_APPTAINER_INSTANCE_PREFIX}{container_name}"]

elif job_status in [ "CONFIGURING",
"PENDING",
"RESV_DEL_HOLD",
"REQUEUE_FED",
"REQUEUE_HOLD",
"REQUEUED",
"RESIZING",
"SIGNALING"]:
logging.debug(f"Job {jobid} is {job_status} - waiting {squeue_poll_interval} seconds")
time.sleep(squeue_poll_interval)
else:
raise RuntimeError(f"Job {jobid} unable to grant launch request - status is {job_status}")




res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return res.returncode == 0
s = wait_for_job_status(job_id, states={"RUNNING"}, timeout=cfg.sbatch_post_timeout, poll_interval=cfg.sbatch_post_poll_interval)
if not s:
raise RuntimeError(f"Job {job_id} did not start running within {cfg.sbatch_post_timeout} seconds")






def kill(jobid: Optional[int] = None, all: bool = False):
if all:
vnc_instances = get_apptainer_vnc_instances()
for vnc_instance in vnc_instances:
subprocess.run(["scancel", str(vnc_instance['slurm_job_id'])])
return

if jobid:
subprocess.run(["scancel", str(jobid)])
return

raise ValueError("Must specify either --all or <jobid>")

def create_arg_parser():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='command')

# general arguments
parser.add_argument('-d', '--debug',
dest='debug',
action='store_true',
help='Enable debug logging')

parser.add_argument('-v', '--version',
dest='print_version',
action='store_true',
help='Print program version and exit')

# command: create
parser_create = subparsers.add_parser('create',
help='Create VNC session')
parser_create.add_argument('-p', '--partition',
dest='partition',
metavar='<partition>',
help='Slurm partition',
type=str)
parser_create.add_argument('-A', '--account',
dest='account',
metavar='<account>',
help='Slurm account',
type=str)
parser_create.add_argument('--timeout',
dest='timeout',
metavar='<time_in_seconds>',
help='[default: 120] Slurm node allocation and VNC startup timeout length (in seconds)',
default=120,
type=int)
parser_create.add_argument('-t', '--time',
dest='time',
metavar='<time_in_hours>',
help='Subnode reservation time (in hours)',
type=int)
parser_create.add_argument('-c', '--cpus',
dest='cpus',
metavar='<num_cpus>',
help='Subnode cpu count',
default=1,
type=int)
parser_create.add_argument('-G', '--gpus',
dest='gpus',
metavar='[type:]<num_gpus>',
help='Subnode gpu count',
default="0"
type=str)
parser_create.add_argument('--mem',
dest='mem',
metavar='<NUM[K|M|G|T]>',
help='Subnode memory amount with units',
type=str)
parser_create.add_argument('--container',
dest='sing_container',
metavar='<path_to_container.sif>',
help='Path to VNC Apptainer/Singularity Container (.sif)',
required=True,
type=str)

# status command
parser_status = subparsers.add_parser('status',
help='Print details of all VNC jobs with given job name and exit')

# kill command
parser_kill = subparsers.add_parser('kill',
help='Kill specified job')

kill_group = parser_kill.add_mutually_exclusive_group(required=True)
kill_group.add_argument('job_id',
metavar='<job_id>',
nargs='?',  # positionals in a mutually exclusive group must be optional
help='Kill specified VNC session, cancel its VNC job, and exit',
type=int)

kill_group.add_argument('-a', '--all',
action='store_true',
dest='kill_all',
help='Stop all VNC sessions and exit')

parser_kill.set_defaults(func=kill)



arg_parser = create_arg_parser()
args = arg_parser.parse_args()

if args.command == 'kill':
    kill(jobid=args.job_id, all=args.kill_all)