diff --git a/hyakvnc.sh b/hyakvnc.sh
new file mode 100755
index 0000000..3408b20
--- /dev/null
+++ b/hyakvnc.sh
@@ -0,0 +1,209 @@
+#! /usr/bin/env bash
+
+set -f # Disable pathname expansion.
+#set -o pipefail # Return exit status of the last command in the pipe that failed.
+#set -e # Exit immediately if a command exits with a non-zero status.
+[ "$DEBUG" == 1 ] && set -x
+
+CLUSTER_ADDRESS=${CLUSTER_ADDRESS:=klone.hyak.uw.edu}
+PORT_ON_CLIENT=${PORT_ON_CLIENT:=5901}
+CURRENT_HOSTNAME=$(hostname)
+
+# List of Klone login node hostnames
+LOGIN_NODE_LIST='klone-login01 klone1.hyak.uw.edu klone2.hyak.uw.edu'
+
+# Slurm configuration variables:
+export SBATCH_ACCOUNT="${SBATCH_ACCOUNT:=escience}"
+export SBATCH_DEBUG="${SBATCH_DEBUG:=0}"
+export SBATCH_GPUS="${SBATCH_GPUS:=0}"
+export SBATCH_PARTITION="${SBATCH_PARTITION:=gpu-a40}"
+export SBATCH_TIMELIMIT="${SBATCH_TIMELIMIT:=1:00:00}"
+export ARG_SBATCH_NTASKS="${ARG_SBATCH_NTASKS:=4}"
+
+# Check if a command exists:
+function _command_exists {
+    command -v "$1" >/dev/null
+}
+
+# Check if the current host is a login node:
+function _is_login_node {
+    echo "${LOGIN_NODE_LIST}" | grep -q "${CURRENT_HOSTNAME}"
+}
+
+APPTAINER_INSTANCES_DIR=${APPTAINER_INSTANCES_DIR:=${HOME}/.apptainer/instances}
+
+function _find_vnc_instance_pid_files {
+    # Make sure globstar is enabled so ** matches nested instance directories:
+    shopt -s globstar
+    for f in "$APPTAINER_INSTANCES_DIR"/app/**/*.json; do
+        # Extract the instance's log path from its JSON metadata:
+        logOutPath=$(python3 -c 'import json, sys; print(json.load(open(sys.argv[1]))["logOutPath"])' "$f" 2>/dev/null)
+        [ -z "$logOutPath" ] && continue
+        [ ! -f "$logOutPath" ] && continue
+
+        # Derive the VNC pid file name from the "Log file is ..." line:
+        vncPidFile=$(sed -E '/Log file is/!d; s/^.*[/]//; s/[.]log$/.pid/' "${logOutPath}")
+        vncPidPath="${HOME}/.vnc/${vncPidFile}"
+        if [ ! -f "$vncPidPath" ]; then
+            echo "Could not find vncPidFile at ${vncPidPath}"
+            continue
+        fi
+        echo "$vncPidFile"
+    done
+    shopt -u globstar
+}
+
+_log() {
+    _ANSI_BLUE= _ANSI_BOLD= _ANSI_CYAN= _ANSI_GREEN= _ANSI_MAGENTA= _ANSI_RED= _ANSI_RESET= _ANSI_UNDERLINE= _ANSI_YELLOW=
+    [[ "${BASH_SOURCE[0]:-}" != "${0}" ]] && sourced=1 || sourced=0
+    [[ -t 1 ]] && piped=0 || piped=1 # detect if output is piped
+    if [[ $piped -eq 0 ]]; then
+        _ANSI_GREEN="\033[92m"
+        _ANSI_RED="\033[91m"
+        _ANSI_CYAN="\033[36m"
+        _ANSI_YELLOW="\033[93m"
+        _ANSI_MAGENTA="\033[95m"
+        _ANSI_BLUE="\033[94m"
+        _ANSI_RESET="\033[0m"
+        _ANSI_BOLD="\033[1m"
+        _ANSI_UNDERLINE="\033[4m"
+    fi
+    level="${1}"
+    color="${_ANSI_RESET}"
+    shift
+    case "${level}" in
+        INFO)
+            color="${_ANSI_CYAN}"
+            ;;
+        WARN)
+            color="${_ANSI_YELLOW}"
+            ;;
+        ERROR)
+            color="${_ANSI_RED}"
+            ;;
+    esac
+    printf "$(date --rfc-3339=s) ${_ANSI_BOLD}${color}${level}${_ANSI_RESET}: ${color}$* ${_ANSI_RESET}\n"
+}
+
+function list_vnc_pids {
+    for f in "$HOME"/.vnc/*.pid; do
+        vnchostfull=$(basename "${f%%.pid}")
+        vnchost=${vnchostfull%%:*}
+        vncport=${vnchostfull##*:}
+        echo "$vnchost" "$vncport"
+    done
+}
+
+function show_connection_string_for_linux_host {
+    vnchost=$1
+    vncport=$2
+    echo "ssh -f -J ${CLUSTER_ADDRESS} ${vnchost} -L ${PORT_ON_CLIENT}:localhost:${vncport} sleep 10; vncviewer localhost:${PORT_ON_CLIENT}"
+    echo "vncviewer ${vnchost}:${vncport}"
+}
+
+function generate_ssh_command {
+    echo "ssh -f -N -J \"${CLUSTER_ADDRESS}\" \"${HOSTNAME}\" -L ${PORT_ON_CLIENT}:localhost:5901"
+}
+
+function status {
+    _log ERROR "status task not implemented"
+}
+
+function kill {
+    _log ERROR "kill task not implemented"
+}
+
+function kill-all {
+    _log ERROR "kill-all task not implemented"
+}
+
+function set-passwd {
+    _log ERROR "set-passwd task not implemented"
+}
+
+function repair {
+    _log ERROR "repair task not implemented"
+}
+
+function build {
+    _log ERROR "build task not implemented"
+}
+
+# Example of the wrapped sbatch invocation (kept for reference, not executed):
+#sbatch -A escience --job-name thing2 -p gpu-a40 -c 1 --mem=1G --time=1:00:00 --wrap "apptainer instance start $(realpath ~/code/apptainer-test/looping/looping.sif) thing2 && while true; do sleep 10; done"
+
+function launch {
+    ARGS_TO_SBATCH=""
+    NTASK_ARGUMENT="--ntasks=${ARG_SBATCH_NTASKS}"
+    while [ $# -gt 0 ]; do
+        case $1 in
+            --dry-run)
+                DRY_RUN=1
+                shift
+                ;;
+            -n=* | --ntasks=*)
+                NTASK_ARGUMENT="$1"
+                shift # past argument=value
+                ;;
+            -c=* | --cpus-per-task=*)
+                NTASK_ARGUMENT="$1"
+                shift # past argument=value
+                ;;
+            --sif)
+                shift
+                [ -z "$1" ] && _log ERROR "--sif requires a non-empty option argument" && exit 1
+                SIF_FILE="$1"
+                shift # past value
+                ;;
+            *)
+                ARGS_TO_SBATCH="${ARGS_TO_SBATCH} $1"
+                shift
+                ;;
+        esac
+    done
+    [ -z "$SIF_FILE" ] && _log ERROR "Requires a --sif argument specifying the path to the container image" && exit 1
+    SIF_PATH=$(realpath "${SIF_FILE}")
+    SIF_BASENAME="$(basename "${SIF_FILE}")"
+    SIF_NAME="${SIF_BASENAME%.*}"
+
+    [ ! -f "$SIF_PATH" ] && _log ERROR "--sif requires a file to be present at ${SIF_FILE}" && exit 1
+
+    ARGS_TO_SBATCH="${NTASK_ARGUMENT} ${ARGS_TO_SBATCH}"
+    ARGS_TO_SBATCH="${ARGS_TO_SBATCH//  / }" # Collapse double spaces
+
+    _log INFO "Launching job with sif file ${SIF_FILE} and args ${ARGS_TO_SBATCH}"
+
+    # Keep the job alive after `apptainer instance start` returns by looping in the wrapped command:
+    COMMAND_TO_RUN="sbatch ${ARGS_TO_SBATCH} --job-name ${SIF_NAME} --wrap \"apptainer instance start ${SIF_PATH} ${SIF_NAME}-\$SLURM_JOBID && while true; do sleep 10; done\""
+    _log INFO "Will run the following command:"
+    echo "${COMMAND_TO_RUN}"
+
+    if [ ! "$DRY_RUN" ]; then
+        ! _command_exists sbatch && _log ERROR "sbatch not found" && exit 1
+        ! _is_login_node && _log ERROR "You must run this from a login node. This is not a login node." && exit 1
+        sbatch ${ARGS_TO_SBATCH} --job-name "${SIF_NAME}" --wrap "apptainer instance start ${SIF_PATH} ${SIF_NAME}-\$SLURM_JOBID && while true; do sleep 10; done"
+    else
+        _log INFO "Dry run. Not running sbatch."
+        _log INFO "Would have run: ${COMMAND_TO_RUN}"
+    fi
+}
+
+function default {
+    echo "Args:" "$@"
+    help
+}
+
+function help {
+    echo "$0 <task> [args]"
+    echo "Tasks:"
+    compgen -A function | grep -v '^_' | cat -n
+}
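+
+# The dispatcher below treats any function not prefixed with "_" as a task.
+# Hypothetical invocations (paths are placeholders, not part of the repo):
+#   ./hyakvnc.sh help
+#   ./hyakvnc.sh launch --sif ~/containers/vnc.sif --dry-run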
+
+# Set default action:
+ACTION=default
+
+# If the first argument is a function in this file, set it to the action:
+if [ $# -gt 0 ] && compgen -A function | grep -q "^$1\$"; then
+    [ "$DEBUG" == 1 ] && echo "Setting action to $1"
+    ACTION="$1"
+    shift
+fi
+
+# Run the action:
+"$ACTION" "$@"
diff --git a/hyakvnc/LoginNode.py b/hyakvnc/LoginNode.py
new file mode 100644
index 0000000..d0749fc
--- /dev/null
+++ b/hyakvnc/LoginNode.py
@@ -0,0 +1,549 @@
+from .Node import Node
+from .SubNode import SubNode
+from .config import BASE_VNC_PORT, APPTAINER_BIN, LOGIN_NODE_LIST
+import subprocess
+import logging
+import os
+import re
+import signal
+import time
+from pathlib import Path
+
+
+class LoginNode(Node):
+    """
+    The LoginNode class specifies a Hyak login node for its Slurm and SSH
+    capabilities.
+    """
+
+    def __init__(self, name, sing_container, xstartup="", debug=False):
+        assert os.path.exists(APPTAINER_BIN)
+        super().__init__(name, sing_container, xstartup, debug)
+        self.subnode = None
+
+    def find_nodes(self, job_name="vnc"):
+        """
+        Returns the set of subnodes allocated under the given job name, or
+        None if there are none.
+        """
+        ret = set()
+        command = f"squeue | grep {os.getlogin()} | grep {job_name}"
+        proc = self.run_command(command)
+        while True:
+            line = str(proc.stdout.readline(), "utf-8")
+            if self.debug:
+                logging.debug(f"find_nodes: {line}")
+            if not line:
+                if not ret:
+                    return None
+                return ret
+            if os.getlogin() in line:
+                # match against pattern:
+                #   864877 compute-h      vnc  hansem7  R      4:05      1 n3000
+                # or the following if a node is in the process of being acquired
+                #   870400 compute-h      vnc  hansem7 PD      0:00      1 (Resources)
+                # or the following if a node failed to be acquired and needs to be killed
+                #   984669 compute-h      vnc  hansem7 PD      0:00      1 (QOSGrpCpuLimit)
+                pattern = re.compile(
+                    """
+                    (\s+)
+                    (?P<job_id>[0-9]+)
+                    (\s+[^ ]+){6}
+                    (\s+)
+                    (?P<subnode_name>[^\s]+)
+                    """,
+                    re.VERBOSE,
+                )
+                match = pattern.match(line)
+                assert match is not None
+                name = match.group("subnode_name")
+                job_id = match.group("job_id")
+                if "Resources" in name:
+                    # Quit if another node is being allocated (from another process?)
+                    proc.kill()
+                    msg = f"Warning: Already allocating node with job {job_id}"
+                    print(msg)
+                    if self.debug:
+                        logging.info(f"name: {name}")
+                        logging.info(f"job_id: {job_id}")
+                        logging.warning(msg)
+                elif "QOS" in name:
+                    proc.kill()
+                    msg = f"Warning: job {job_id} needs to be killed"
+                    print(msg)
+                    print(f"Please run this script again with '--kill {job_id}' argument")
+                    if self.debug:
+                        logging.info(f"name: {name}")
+                        logging.info(f"job_id: {job_id}")
+                        logging.warning(msg)
+                elif self.debug:
+                    msg = f"Found active subnode {name} with job ID {job_id}"
+                    logging.debug(msg)
+                tmp = SubNode(name, job_id, "", "", self.debug)
+                ret.add(tmp)
+
+    def check_vnc_password(self):
+        """
+        Returns True if the VNC password is set and False otherwise.
+        """
+        return os.path.exists(os.path.expanduser("~/.vnc/passwd"))
+
+    def set_vnc_password(self):
+        """
+        Set the VNC password interactively via `vncpasswd` inside the container.
+        """
+        cmd = f"{self.get_sing_exec()} vncpasswd"
+        self.call_command(cmd)
+
+    def call_command(self, command: str):
+        """
+        Call command (with arguments) on the login node (to allow user interaction).
+
+        Args:
+            command:str : command and its arguments to run on the login node
+
+        Returns command exit status.
+        """
+        if self.debug:
+            msg = f"Calling on {self.name}: {command}"
+            print(msg)
+            logging.debug(msg)
+        return subprocess.call(command, shell=True)
+
+    def run_command(self, command):
+        """
+        Run command (with arguments) on the login node.
+        Commands can be in either str or list format.
+
+        Example:
+            cmd_str = "echo hi"
+            cmd_list = ["echo", "hi"]
+
+        Args:
+            command : command and its arguments to run on the login node
+
+        Returns subprocess with stderr->stdout and stdout->PIPE.
+        """
+        assert command is not None
+        if self.debug:
+            msg = f"Running on {self.name}: {command}"
+            print(msg)
+            logging.debug(msg)
+        if isinstance(command, list):
+            return subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        elif isinstance(command, str):
+            return subprocess.Popen(
+                command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+            )
+
+    def reserve_node(
+        self,
+        res_time=3,
+        timeout=10,
+        cpus=8,
+        gpus="0",
+        mem="16G",
+        partition="compute-hugemem",
+        account="ece",
+        job_name="vnc",
+    ):
+        """
+        Reserves a node and waits until the node has been acquired.
+
+        Args:
+            res_time: Number of hours to reserve the subnode
+            timeout: Number of seconds to wait for node allocation
+            cpus: Number of cpus to allocate
+            gpus: Number of gpus to allocate with optional type specifier
+                  (Examples: "a40:2" for two NVIDIA A40s, "1" for a single GPU)
+            mem: Amount of memory to allocate (Example: "8G" for 8GiB of memory)
+            partition: Partition name (see `man salloc` on --partition option for more information)
+            account: Account name (see `man salloc` on --account option for more information)
+            job_name: Slurm job name displayed in `squeue`
+
+        Returns a SubNode object if the node was acquired successfully and None otherwise.
+        """
+        cmd = [
+            "timeout",
+            str(timeout),
+            "salloc",
+            "-J",
+            job_name,
+            "--no-shell",
+            "-p",
+            partition,
+            "-A",
+            account,
+            "-t",
+            f"{res_time}:00:00",
+            "--mem=" + mem,
+            "--gpus=" + gpus,
+            "-c",
+            str(cpus),
+        ]
+        proc = self.run_command(cmd)
+
+        alloc_stat = False
+        subnode_job_id = None
+        subnode_name = None
+
+        def __reserve_node_irq_handler__(signalNumber, frame):
+            """
+            Pass SIGINT to the subprocess and exit the program.
+            """
+            if self.debug:
+                msg = f"reserve_node: Caught signal: {signalNumber}"
+                print(msg)
+                logging.info(msg)
+            proc.send_signal(signal.SIGINT)
+            print("Cancelled node allocation. Exiting...")
+            exit(1)
+
+        # Stop allocation when SIGINT (CTRL+C) and SIGTSTP (CTRL+Z) signals
+        # are detected.
+        signal.signal(signal.SIGINT, __reserve_node_irq_handler__)
+        signal.signal(signal.SIGTSTP, __reserve_node_irq_handler__)
+
+        print(
+            f"Allocating node with {cpus} CPU(s), {gpus.split(':').pop()} GPU(s), and {mem} RAM for {res_time} hours..."
+        )
+        while proc.poll() is None and not alloc_stat:
+            print("...")
+            line = str(proc.stdout.readline(), "utf-8").strip()
+            if self.debug:
+                msg = f"reserve_node: {line}"
+                logging.debug(msg)
+            if "Pending" in line or "Granted" in line:
+                # match against pattern:
+                # salloc: Pending job allocation 864875
+                # salloc: Granted job allocation 864875
+                pattern = re.compile(
+                    """
+                    (salloc:\s)
+                    ((Granted)|(Pending))
+                    (\sjob\sallocation\s)
+                    (?P<job_id>[0-9]+)
+                    """,
+                    re.VERBOSE,
+                )
+                match = pattern.match(line)
+                if match is not None:
+                    subnode_job_id = match.group("job_id")
+            elif "are ready for job" in line:
+                # match against pattern:
+                # salloc: Nodes n3000 are ready for job
+                pattern = re.compile(
+                    """
+                    (salloc:\sNodes\s)
+                    (?P<node_name>[ngz][0-9]{4})
+                    (\sare\sready\sfor\sjob)
+                    """,
+                    re.VERBOSE,
+                )
+                match = pattern.match(line)
+                if match is not None:
+                    subnode_name = match.group("node_name")
+                    alloc_stat = True
+                    break
+            elif self.debug:
+                msg = f"Skipping line: {line}"
+                print(msg)
+                logging.debug(msg)
+        if proc.stdout is not None:
+            proc.stdout.close()
+        if proc.stderr is not None:
+            proc.stderr.close()
+
+        if not alloc_stat:
+            # check if the node actually got reserved
+            # Background: Sometimes salloc does not print allocated node names
+            # at the end, so we have to check with squeue
+            if subnode_job_id is not None:
+                tmp_nodes = self.find_nodes(job_name)
+                for tmp_node in tmp_nodes:
+                    if self.debug:
+                        logging.debug(
+                            f"reserve_node: fallback: Checking {tmp_node.name} with Job ID {tmp_node.job_id}"
+                        )
+                    if tmp_node.job_id == subnode_job_id:
+                        if self.debug:
+                            logging.debug("reserve_node: fallback: Match found")
+                        # get subnode name
+                        subnode_name = tmp_node.name
+                        break
+            else:
+                return None
+            if subnode_name is None:
+                msg = "Error: node allocation timed out."
+                print(msg)
+                if self.debug:
+                    logging.error(msg)
+                return None
+
+        assert subnode_job_id is not None
+        assert subnode_name is not None
+        self.subnode = SubNode(subnode_name, subnode_job_id, self.sing_container, self.xstartup)
+        return self.subnode
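+
+    # A minimal usage sketch (hypothetical container path, xstartup path, and
+    # Slurm account; not part of the original module):
+    #
+    #   ln = LoginNode("klone-login01", "/path/to/container.sif", "~/.vnc/xstartup")
+    #   subnode = ln.reserve_node(res_time=2, cpus=4, mem="8G",
+    #                             partition="compute-hugemem", account="ece")
+    #   if subnode is not None:
+    #       print(f"Allocated {subnode.name} under job {subnode.job_id}")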
+
+    def cancel_job(self, job_id: int):
+        """
+        Cancel the specified job ID.
+
+        Reference:
+            See `man scancel` for more information on usage.
+        """
+        msg = f"Canceling job ID {job_id}"
+        print(f"\t{msg}")
+        if self.debug:
+            logging.debug(msg)
+        proc = self.run_command(["scancel", str(job_id)])
+        print(str(proc.communicate()[0], "utf-8"))
+
+    def check_port(self, port: int):
+        """
+        Returns True if the port is unused and False if it is in use.
+        """
+        if self.debug:
+            logging.debug(f"Checking if port {port} is used...")
+        cmd = f"netstat -ant | grep LISTEN | grep {port}"
+        proc = self.run_command(cmd)
+        while proc.poll() is None:
+            line = str(proc.stdout.readline(), "utf-8").strip()
+            if self.debug:
+                logging.debug(f"netstat line: {line}")
+            if str(port) in line:
+                return False
+        return True
+
+    def get_port(self):
+        """
+        Returns an unused port number if found and None otherwise.
+        """
+        # 300 is an arbitrary upper bound on the ports to scan
+        for i in range(0, 300):
+            port = BASE_VNC_PORT + i
+            if self.check_port(port):
+                return port
+        return None
+
+    def create_port_forward(self, login_port: int, subnode_port: int):
+        """
+        Create a port forward between the login node and the subnode.
+
+        Args:
+            login_port:int : Login node port number
+            subnode_port:int : Subnode port number
+
+        Returns True if the port forward succeeds and False otherwise.
+        """
+        assert self.subnode is not None
+        assert self.subnode.name is not None
+        msg = f"Creating port forward: Login node({login_port})<->Subnode({subnode_port})"
+        if self.debug:
+            logging.debug(msg)
+        else:
+            print(f"{msg}...", end="", flush=True)
+        cmd = f"ssh -N -f -L {login_port}:127.0.0.1:{subnode_port} {self.subnode.hostname} &> /dev/null"
+        status = self.call_command(cmd)
+
+        if status == 0:
+            # wait (at most ~20 seconds) until the port forward succeeds
+            count = 0
+            port_used = not self.check_port(login_port)
+            while count < 20 and not port_used:
+                if self.debug:
+                    msg = f"create_port_forward: attempt #{count + 1}: port used? {port_used}"
+                    logging.debug(msg)
+                port_used = not self.check_port(login_port)
+                count += 1
+                time.sleep(1)
+
+            if port_used:
+                if self.debug:
+                    logging.info("Successfully created port forward")
+                else:
+                    print("\x1b[1;32m" + "Success" + "\x1b[0m")
+                return True
+        if self.debug:
+            logging.error("Error: Failed to create port forward")
+        else:
+            print("\x1b[1;31m" + "Failed" + "\x1b[0m")
+        return False
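+
+    # Sketch of pairing get_port() with create_port_forward(), assuming `ln` is
+    # a LoginNode whose reserved subnode serves VNC on port 5901 (illustrative
+    # values only):
+    #
+    #   login_port = ln.get_port()
+    #   if login_port is not None and ln.create_port_forward(login_port, 5901):
+    #       print(f"Forwarding login:{login_port} -> subnode:5901")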
+
+    def get_port_forwards(self, nodes=None):
+        """
+        For each node in the SubNodes set `nodes`, get a port map between login
+        node port and subnode port, and then fill the `vnc_port` and
+        `vnc_display_number` subnode attributes if None.
+
+        Example:
+            Suppose we have the following VNC sessions (on a single user account):
+                n3000 with a login<->subnode port forward from 5900 to 5901,
+                n3000 with a login<->subnode port forward from 5901 to 5902,
+                n3042 with a login<->subnode port forward from 5903 to 5901.
+
+            This function returns the following:
+                { "n3000" : {5901:5900, 5902:5901}, "n3042" : {5901:5903} }
+
+        Args:
+            nodes : A set of SubNode objects with names to inspect
+
+        Returns a dictionary with node names as keys and, as each value, a
+        dictionary mapping SubNodePort (key) to LoginNodePort (value).
+        """
+        node_port_map = dict()
+        if nodes is not None:
+            for node in nodes:
+                if "(" not in node.name:
+                    port_map = dict()
+                    cmd = f"ps x | grep ssh | grep {node.name}"
+                    proc = self.run_command(cmd)
+                    while proc.poll() is None:
+                        line = str(proc.stdout.readline(), "utf-8").strip()
+                        if cmd not in line:
+                            # Match against pattern:
+                            # 1974577 ?  Ss  0:20 ssh -N -f -L 5902:127.0.0.1:5902 n3065.hyak.local
+                            pattern = re.compile(
+                                """
+                                ([^\s]+(\s)+){4}
+                                (ssh\s-N\s-f\s-L\s(?P<ln_port>[0-9]+):127.0.0.1:(?P<sn_port>[0-9]+))
+                                """,
+                                re.VERBOSE,
+                            )
+                            match = re.match(pattern, line)
+                            if match is not None:
+                                ln_port = int(match.group("ln_port"))
+                                sn_port = int(match.group("sn_port"))
+                                port_map.update({sn_port: ln_port})
+                    node_port_map.update({node.name: port_map})
+        return node_port_map
+
+    def get_job_port_forward(self, job_id: int, node_name: str, node_port_map: dict):
+        """
+        Returns a tuple containing the SubNode VNC port and the LoginNode port
+        for the given job ID and node name. Returns None on failure.
+        """
+        if self.get_time_left(job_id) is not None:
+            port_map = node_port_map[node_name]
+            if port_map is not None:
+                subnode = SubNode(node_name, job_id, self.sing_container, "", self.debug)
+                for vnc_port in port_map.keys():
+                    display_number = vnc_port - BASE_VNC_PORT
+                    if self.debug:
+                        logging.debug(
+                            f"get_job_port_forward: Checking job {job_id} vnc_port {vnc_port}"
+                        )
+                    # get PID from the VNC pid file
+                    pid = subnode.get_vnc_pid(subnode.name, display_number)
+                    if pid is None:
+                        # try long hostname
+                        pid = subnode.get_vnc_pid(subnode.hostname, display_number)
+                    # if the PID is active, then we have a hit for a specific job
+                    if pid is not None and subnode.check_pid(pid):
+                        if self.debug:
+                            logging.debug(
+                                f"get_job_port_forward: {job_id} has vnc_port {vnc_port} and login node port {port_map[vnc_port]}"
+                            )
+                        return (vnc_port, port_map[vnc_port])
+        return None
+
+    def get_time_left(self, job_id: int, job_name="vnc"):
+        """
+        Returns the time remaining for the given job ID or None if the job is
+        not present.
+        """
+        cmd = f'squeue -o "%L %.18i %.8j %.8u %R" | grep {os.getlogin()} | grep {job_name} | grep {job_id}'
+        proc = self.run_command(cmd)
+        if proc.poll() is None:
+            line = str(proc.stdout.readline(), "utf-8")
+            return line.split(" ", 1)[0]
+        return None
+
+    def print_props(self):
+        """
+        Print all properties (including subnode properties).
+        """
+        print("Login node properties:")
+        props = vars(self)
+        for item in props:
+            msg = f"{item} : {props[item]}"
+            print(f"\t{msg}")
+            if self.debug:
+                logging.debug(msg)
+            if item == "subnode" and props[item] is not None:
+                props[item].print_props()
+
+    def print_status(self, job_name: str, node_set=None, node_port_map=None):
+        """
+        Print details of each active VNC job in node_set. The VNC port and
+        display number should be in node_port_map.
+        """
+        print(f"Active {job_name} jobs:")
+        if node_set is not None:
+            for node in node_set:
+                mapped_port = None
+                if node_port_map and node_port_map[node.name]:
+                    port_forward = self.get_job_port_forward(node.job_id, node.name, node_port_map)
+                    if port_forward:
+                        vnc_port = port_forward[0]
+                        mapped_port = port_forward[1]
+                        node.vnc_display_number = vnc_port - BASE_VNC_PORT
+                        node_port_map.get(node.name).pop(vnc_port)
+                time_left = self.get_time_left(node.job_id, job_name)
+                vnc_active = mapped_port is not None
+                print(f"\tJob ID: {node.job_id}")
+                print(f"\t\tSubNode: {node.name}")
+                print(f"\t\tTime left: {time_left}")
+                print(f"\t\tVNC active: {vnc_active}")
+                if vnc_active:
+                    ssh_cmd = f"ssh -N -f -L {mapped_port}:127.0.0.1:{mapped_port} {os.getlogin()}@klone.hyak.uw.edu"
+                    print(f"\t\tVNC display number: {vnc_port - BASE_VNC_PORT}")
+                    print(f"\t\tVNC port: {vnc_port}")
+                    print(f"\t\tMapped LoginNode port: {mapped_port}")
+                    print(f"\t\tRun command: {ssh_cmd}")
+
+    def repair_ln_sn_port_forwards(self, node_set=None, node_port_map=None):
+        """
+        Re-creates port forwards missing from VNC jobs.
+        Useful when the LoginNode restarts but VNC jobs remain alive.
+        """
+        if node_set is not None:
+            for node in node_set:
+                if node_port_map and node_port_map[node.name]:
+                    print(f"{node.name} with job ID {node.job_id} already has a valid port forward")
+                else:
+                    subnode = SubNode(
+                        node.name, node.job_id, self.sing_container, self.xstartup, self.debug
+                    )
+                    subnode_pids = subnode.list_pids()
+                    # search for a vnc process
+                    proc = subnode.run_command("ps x | grep vnc")
+                    while proc.poll() is None:
+                        line = str(proc.stdout.readline(), "utf-8").strip()
+                        pid = int(line.split(" ", 1)[0])
+                        # match found
+                        if pid in subnode_pids:
+                            if self.debug:
+                                logging.debug(
+                                    f"repair_ln_sn_port_forwards: VNC PID {pid} found for job ID {node.job_id}"
+                                )
+                            pattern = re.compile(
+                                """
+                                (vnc\s+:)
+                                (?P<display_number>\d+)
+                                """,
+                                re.VERBOSE,
+                            )
+                            match = re.search(pattern, line)
+                            assert match is not None
+                            vnc_port = BASE_VNC_PORT + int(match.group("display_number"))
+                            u2h_port = self.get_port()
+                            if self.debug:
+                                logging.debug(
+                                    f"repair_ln_sn_port_forwards: LoginNode({u2h_port})<->JobID({vnc_port})"
+                                )
+                            if u2h_port is None:
+                                print("Error: cannot find an available/unused port")
+                                continue
+                            else:
+                                self.subnode = subnode
+                                self.create_port_forward(u2h_port, vnc_port)
diff --git a/hyakvnc/Node.py b/hyakvnc/Node.py
new file mode 100644
index 0000000..e888e6b
--- /dev/null
+++ b/hyakvnc/Node.py
@@ -0,0 +1,24 @@
+from .config import APPTAINER_BIN, APPTAINER_BINDPATH
+import os
+
+
+class Node:
+    """
+    The Node class has the following initial data: bool: debug, string: name,
+    string: sing_container, string: xstartup.
+
+    debug: Print and log debug messages if True.
+    name: Shortened hostname of node.
+    sing_container: Absolute path to the Apptainer (Singularity) container image.
+    xstartup: Path to the xstartup script used for VNC sessions.
+    """
+
+    def __init__(self, name, sing_container, xstartup="", debug=False):
+        self.debug = debug
+        self.name = name
+        self.sing_container = os.path.abspath(sing_container)
+        self.xstartup = xstartup
+
+    def get_sing_exec(self, args=""):
+        """
+        Prepended to a command to execute it inside the Apptainer (Singularity)
+        container.
+
+        Arg:
+            args: Optional arguments passed to `apptainer exec`
+
+        Returns the `apptainer exec` string.
+        """
+        return f"{APPTAINER_BIN} exec {args} -B {APPTAINER_BINDPATH} {self.sing_container}"
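+
+    # For example (hypothetical config values): with APPTAINER_BIN set to
+    # "/usr/bin/apptainer" and APPTAINER_BINDPATH to "/gscratch", a container
+    # at /im/ubuntu.sif yields (note the double space from empty args):
+    #
+    #   >>> Node("n3000", "/im/ubuntu.sif").get_sing_exec()
+    #   '/usr/bin/apptainer exec  -B /gscratch /im/ubuntu.sif'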
diff --git a/hyakvnc/SubNode.py b/hyakvnc/SubNode.py
new file mode 100644
index 0000000..6defbf3
--- /dev/null
+++ b/hyakvnc/SubNode.py
@@ -0,0 +1,307 @@
+from .Node import Node
+from .config import BASE_VNC_PORT
+import glob
+import logging
+import os
+import re
+import subprocess
+
+
+class SubNode(Node):
+    """
+    The SubNode class specifies a node requested via Slurm (also known as a
+    work or interactive node). The SubNode class is initialized with the
+    following: bool: debug, string: name, string: hostname, int: job_id.
+
+    A SubNode instance with an active VNC session also carries
+    vnc_display_number and vnc_port.
+
+    debug: Print and log debug messages if True.
+    name: Shortened subnode hostname (e.g. n3000) described inside `/etc/hosts`.
+    hostname: Full subnode hostname (e.g. n3000.hyak.local).
+    job_id: Slurm Job ID that allocated the node.
+    vnc_display_number: X display number used for the VNC session.
+    vnc_port: vnc_display_number + BASE_VNC_PORT.
+    """
+
+    def __init__(self, name, job_id, sing_container, xstartup, debug=False):
+        super().__init__(name, sing_container, xstartup, debug)
+        self.hostname = f"{name}.hyak.local"
+        self.job_id = job_id
+        self.vnc_display_number = None
+        self.vnc_port = None
+
+    def print_props(self):
+        """
+        Print properties of the SubNode object.
+        """
+        print("SubNode properties:")
+        props = vars(self)
+        for item in props:
+            msg = f"{item} : {props[item]}"
+            print(f"\t{msg}")
+            if self.debug:
+                logging.debug(msg)
+
+    def run_command(self, command: str, timeout=None):
+        """
+        Run command (with arguments) on the subnode over SSH.
+
+        Args:
+            command:str : command and its arguments to run on the subnode
+            timeout : [Default: None] timeout length in seconds
+
+        Returns ssh subprocess with stderr->stdout and stdout->PIPE.
+        """
+        assert self.name is not None
+        cmd = ["ssh", self.hostname, command]
+        if timeout is not None:
+            cmd.insert(0, "timeout")
+            cmd.insert(1, str(timeout))
+        if self.debug:
+            msg = f"Running on {self.name}: {cmd}"
+            print(msg)
+            logging.info(msg)
+        return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+    def list_pids(self):
+        """
+        Returns a list of PIDs belonging to the current job_id using
+        `scontrol listpids <job_id>`.
+        """
+        ret = list()
+        cmd = f"scontrol listpids {self.job_id}"
+        proc = self.run_command(cmd)
+        while proc.poll() is None:
+            line = str(proc.stdout.readline(), "utf-8").strip()
+            if self.debug:
+                msg = f"list_pids: {line}"
+                logging.debug(msg)
+            if "PID" in line:
+                pass
+            elif re.match("[0-9]+", line):
+                pid = int(line.split(" ", 1)[0])
+                ret.append(pid)
+        return ret
+
+    def check_pid(self, pid: int):
+        """
+        Returns True if the given pid is active in job_id and False otherwise.
+        """
+        return pid in self.list_pids()
+
+    def get_vnc_pid(self, hostname, display_number):
+        """
+        Returns the pid from the file <hostname>:<display_number>.pid or None
+        if the file does not exist.
+        """
+        if hostname is None:
+            hostname = self.hostname
+        if display_number is None:
+            display_number = self.vnc_display_number
+        assert hostname is not None
+        if display_number is not None:
+            filepaths = glob.glob(os.path.expanduser(f"~/.vnc/{hostname}*:{display_number}.pid"))
+            for path in filepaths:
+                try:
+                    with open(path, "r") as f:
+                        return int(f.readline())
+                except OSError:
+                    continue
+        return None
+
+    def check_vnc(self):
+        """
+        Returns True if the VNC session is active and False otherwise.
+        """
+        assert self.name is not None
+        assert self.job_id is not None
+        pid = self.get_vnc_pid(self.hostname, self.vnc_display_number)
+        if pid is None:
+            pid = self.get_vnc_pid(self.name, self.vnc_display_number)
+            if pid is None:
+                return False
+        if self.debug:
+            logging.debug(f"check_vnc: Checking VNC PID {pid}")
+        return self.check_pid(pid)
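+
+    # Lookup sketch, assuming `subnode` is a SubNode for n3000 with an active
+    # session on display :1 (which writes ~/.vnc/n3000.hyak.local:1.pid):
+    #
+    #   pid = subnode.get_vnc_pid("n3000.hyak.local", 1)
+    #   alive = pid is not None and subnode.check_pid(pid)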
+
+    def start_vnc(self, display_number=None, extra_args="", timeout=20):
+        """
+        Starts a VNC session.
+
+        Args:
+            display_number: Attempt to acquire the specified display number if
+                            set. If None, then let vncserver determine the
+                            display number.
+            extra_args: Optional arguments passed to `apptainer exec`
+            timeout: timeout length in seconds
+
+        Returns True if the VNC session was started successfully and False
+        otherwise.
+        """
+        target = ""
+        if display_number is not None:
+            target = f":{display_number}"
+        vnc_cmd = f"{self.get_sing_exec(extra_args)} vncserver {target} -xstartup {self.xstartup} &"
+        if not self.debug:
+            print("Starting VNC server...", end="", flush=True)
+        proc = self.run_command(vnc_cmd, timeout=timeout)
+
+        # get the display number and port number
+        while proc.poll() is None:
+            line = str(proc.stdout.readline(), "utf-8").strip()
+
+            if line is not None:
+                if self.debug:
+                    logging.debug(f"start_vnc: {line}")
+                if "desktop" in line:
+                    # match against the following patterns:
+                    # New 'n3000.hyak.local:1 (hansem7)' desktop at :1 on machine n3000.hyak.local
+                    # New 'n3000.hyak.local:6 (hansem7)' desktop is n3000.hyak.local:6
+                    pattern = re.compile(
+                        """
+                        (New\s)
+                        (\'([^:]+:(?P<display_number>[0-9]+))\s([^\s]+)\s)
+                        """,
+                        re.VERBOSE,
+                    )
+                    match = re.match(pattern, line)
+                    assert match is not None
+                    self.vnc_display_number = int(match.group("display_number"))
+                    self.vnc_port = self.vnc_display_number + BASE_VNC_PORT
+                    if self.debug:
+                        logging.debug(f"Obtained display number: {self.vnc_display_number}")
+                        logging.debug(f"Obtained VNC port: {self.vnc_port}")
+                    else:
+                        print("\x1b[1;32m" + "Success" + "\x1b[0m")
+                    return True
+        if self.debug:
+            logging.error("Failed to start vnc session (Timeout/?)")
+        else:
+            print("\x1b[1;31m" + "Timed out" + "\x1b[0m")
+        return False
+
+    def list_vnc(self):
+        """
+        Returns a tuple of lists of active and stale VNC sessions on the subnode.
+        """
+        active = list()
+        stale = list()
+        cmd = f"{self.get_sing_exec()} vncserver -list"
+        # Example `vncserver -list` output:
+        #
+        # TigerVNC server sessions:
+        #
+        # X DISPLAY #	PROCESS ID
+        # :1		7280 (stale)
+        # :12		29 (stale)
+        # :2		83704 (stale)
+        # :20		30
+        # :3		84266 (stale)
+        # :4		90576 (stale)
+        pattern = re.compile(r":(?P<display_number>\d+)\s+\d+(?P<stale>\s\(stale\))?")
+        proc = self.run_command(cmd)
+        while proc.poll() is None:
+            line = str(proc.stdout.readline(), "utf-8").strip()
+            match = re.search(pattern, line)
+            if match is not None:
+                display_number = match.group("display_number")
+                if match.group("stale") is not None:
+                    stale.append(display_number)
+                else:
+                    active.append(display_number)
+        return (active, stale)
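+
+    # Given the sample listing in list_vnc() above, it would return
+    # (["20"], ["1", "12", "2", "3", "4"]) -- active then stale display numbers.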
+
+    def __remove_files__(self, filepaths: list):
+        """
+        Removes files on the subnode and returns True on success and False
+        otherwise.
+
+        Arg:
+            filepaths: list of file paths to remove. Each entry must be a file
+                       and not a directory.
+        """
+        cmd = "rm -f"
+        for path in filepaths:
+            cmd = f"{cmd} {path}"
+        cmd = f"{cmd} &> /dev/null"
+        if self.debug:
+            logging.debug(f"Calling ssh {self.hostname} {cmd}")
+        return subprocess.call(["ssh", self.hostname, cmd]) == 0
+
+    def __listdir__(self, dirpath):
+        """
+        Returns a list of the contents of a directory on the subnode.
+        """
+        ret = list()
+        cmd = f"test -d {dirpath} && ls -al {dirpath} | tail -n+4"
+        pattern = re.compile(
+            """
+            ([^\s]+\s+){8}
+            (?P<name>.*)
+            """,
+            re.VERBOSE,
+        )
+        proc = self.run_command(cmd)
+        while proc.poll() is None:
+            line = str(proc.stdout.readline(), "utf-8").strip()
+            match = re.match(pattern, line)
+            if match is not None:
+                name = match.group("name")
+                ret.append(name)
+        return ret
+
+    def kill_vnc(self, display_number=None):
+        """
+        Kill the VNC session with the given display number, or all VNC
+        sessions if display_number is None.
+        """
+        if display_number is None:
+            active, stale = self.list_vnc()
+            for entry in active:
+                if self.debug:
+                    logging.debug(f"kill_vnc: active entry: {entry}")
+                self.kill_vnc(entry)
+            for entry in stale:
+                if self.debug:
+                    logging.debug(f"kill_vnc: stale entry: {entry}")
+                self.kill_vnc(entry)
+            # Remove all remaining pid files
+            pid_list = glob.glob(os.path.expanduser("~/.vnc/*.pid"))
+            for pid_file in pid_list:
+                try:
+                    os.remove(pid_file)
+                except OSError:
+                    pass
+            # Remove all owned socket files on the subnode
+            # Note: the subnode maintains its own /tmp/ directory
+            x11_unix = "/tmp/.X11-unix"
+            ice_unix = "/tmp/.ICE-unix"
+            file_targets = list()
+            for entry in self.__listdir__(x11_unix):
+                file_targets.append(f"{x11_unix}/{entry}")
+            for entry in self.__listdir__(ice_unix):
+                file_targets.append(f"{ice_unix}/{entry}")
+            self.__remove_files__(file_targets)
+        else:
+            target = f":{display_number}"
+            if self.debug:
+                print(f"Attempting to kill VNC session {target}")
+                logging.debug(f"Attempting to kill VNC session {target}")
+            cmd = f"{self.get_sing_exec()} vncserver -kill {target}"
+            proc = self.run_command(cmd)
+            killed = False
+            while proc.poll() is None:
+                line = str(proc.stdout.readline(), "utf-8").strip()
+                # Failed attempt:
+                #   Can't kill '29': Operation not permitted
+                #   Killing Xtigervnc process ID 29...
+                # Successful attempt:
+                #   Killing Xtigervnc process ID 29... success!
+                if self.debug:
+                    logging.debug(f"kill_vnc: {line}")
+                if "success" in line:
+                    killed = True
+            if self.debug:
+                logging.debug(f"kill_vnc: killed? {killed}")
+            # Remove the target's pid file if present
+            try:
+                os.remove(os.path.expanduser(f"~/.vnc/{self.hostname}{target}.pid"))
+            except OSError:
+                pass
+            try:
+                os.remove(os.path.expanduser(f"~/.vnc/{self.name}{target}.pid"))
+            except OSError:
+                pass
+            # Remove the associated /tmp/.X11-unix/ socket
+            # (X display sockets are named X<display_number>)
+            socket_file = f"/tmp/.X11-unix/X{display_number}"
+            self.__remove_files__([socket_file])
diff --git a/hyakvnc/__init__.py b/hyakvnc/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/hyakvnc/__main__.py b/hyakvnc/__main__.py
new file mode 100644
index 0000000..926dfa7
--- /dev/null
+++ b/hyakvnc/__main__.py
@@ -0,0 +1,581 @@
+
+# Quick start guide
+#
+# 1. Build and install the hyakvnc package
+#      $ python3 -m pip install --upgrade --user pip
+#      $ cd hyakvnc
+#      $ python3 -m pip install --user .
+#
+# 2. To start a VNC session (on `ece` computing resources) for 5 hours on a
+#    node with 16 cores and 32GB of memory, run the following
+#      $ hyakvnc create -A ece -p compute-hugemem \
+#                -t 5 -c 16 --mem 32G \
+#                --container /path/to/container.sif \
+#                --xstartup /path/to/xstartup
+#      ...
+#      =====================
+#      Run the following in a new terminal window:
+#              ssh -N -f -L 5901:127.0.0.1:5901 hansem7@klone.hyak.uw.edu
+#      then connect to VNC session at localhost:5901
+#      =====================
+#
+#    This should print a command to set up port forwarding. Copy it for the
+#    following step.
+#
+# 3. Set up a port forward between your computer and the HYAK login node.
+#    On your machine, in a new terminal window, run the copied command.
+#
+#    In this example, run
+#      $ ssh -N -f -L 5901:127.0.0.1:5901 hansem7@klone.hyak.uw.edu
+#
+# 4. Connect to the VNC session at the instructed address (in this example:
+#    localhost:5901)
+#
+# 5. To close the VNC session, run the following
+#      $ hyakvnc kill-all
+#
+# 6. Kill the port forward process from step 3
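+
+#    e.g., on your machine (one possibility; any method of finding the ssh
+#    PID works):
+#      $ pkill -f "ssh -N -f -L 5901:127.0.0.1:5901"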
+
+
+# Usage: hyakvnc [-h/--help] [OPTIONS] [COMMAND] [COMMAND_ARGS]
+#
+# Optional arguments:
+#
+#   -h, --help : print help message and exit
+#
+#   -v, --version : print program version and exit
+#
+#   -d, --debug : [default: disabled] show debug messages and enable debug
+#                 logging at ~/hyakvnc.log
+#
+#   -J <job_name> : [default: hyakvnc] Override Slurm job name
+#
+# Commands:
+#
+#   create : Create VNC session
+#
+#     Optional arguments for create:
+#
+#       --timeout <seconds> : [default: 120] Slurm node allocation and VNC
+#                             startup timeout length (in seconds)
+#
+#       -p <port>, --port <port> : [default: automatically found] override
+#                                  User<->LoginNode port
+#
+#       -G [type:]<count>, --gpus [type:]<count> : [default: '0'] GPU count
+#                                                  with optional type specifier
+#
+#     Required arguments for create:
+#
+#       -p <partition>, --partition <partition> : Slurm partition
+#
+#       -A <account>, --account <account> : Slurm account
+#
+#       -t