Skip to content

Commit

Permalink
Fix infinite recursion, misleading warning "SSH command timeout"
Browse files Browse the repository at this point in the history
  • Loading branch information
albertz committed Nov 27, 2023
1 parent 450ed58 commit eeae612
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 38 deletions.
38 changes: 24 additions & 14 deletions sisyphus/load_sharing_facility_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Author: Wilfried Michel <[email protected]>

from typing import Any
import os
import subprocess

Expand Down Expand Up @@ -42,6 +43,11 @@ def __init__(self, default_rqmt, gateway=None, auto_clean_eqw=True):
self.default_rqmt = default_rqmt
self.auto_clean_eqw = auto_clean_eqw

def _system_call_timeout_warn_msg(self, command: Any) -> str:
if self.gateway:
return f"SSH command timeout: {command!s}"
return f"Command timeout: {command!s}"

def system_call(self, command, send_to_stdin=None):
if self.gateway:
system_command = ["ssh", "-x", self.gateway] + [" ".join(["cd", os.getcwd(), "&&"] + command)]
Expand Down Expand Up @@ -177,14 +183,16 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, rangestring):
+ "\n"
)

try:
logging.info("bsub_call: %s" % bsub_call)
logging.info("command: %s" % command)
out, err, retval = self.system_call(bsub_call, command)
except subprocess.TimeoutExpired:
logging.warning("SSH command timeout %s" % str(command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
return self.submit_helper(call, logpath, rqmt, name, task_name, rangestring)
while True:
try:
logging.info("bsub_call: %s" % bsub_call)
logging.info("command: %s" % command)
out, err, retval = self.system_call(bsub_call, command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break

ref_output = ["Job", "is", "submitted", "to", "queue"]
ref_output = [i.encode() for i in ref_output]
Expand Down Expand Up @@ -238,12 +246,14 @@ def queue_state(self):

# get bjobs output
system_command = ["bjobs", "-w"]
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning("SSH command timeout %s" % str(system_command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
return self.queue_state()
while True:
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(system_command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break

task_infos = defaultdict(list)
for line in out[1:]:
Expand Down
34 changes: 22 additions & 12 deletions sisyphus/simple_linux_utility_for_resource_management_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Author: Wilfried Michel <[email protected]>

from typing import Any
from collections import defaultdict, namedtuple
from enum import Enum
import getpass # used to get username
Expand Down Expand Up @@ -70,6 +71,11 @@ def __init__(
self.memory_allocation_type = memory_allocation_type
self.job_name_mapping = job_name_mapping

def _system_call_timeout_warn_msg(self, command: Any) -> str:
if self.gateway:
return f"SSH command timeout: {command!s}"
return f"Command timeout: {command!s}"

def system_call(self, command, send_to_stdin=None):
"""
:param list[str] command: qsub command
Expand Down Expand Up @@ -223,12 +229,14 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id,
sbatch_call += ["-a", "%i-%i:%i" % (start_id, end_id, step_size)]
command = '"' + " ".join(call) + '"'
sbatch_call += ["--wrap=%s" % " ".join(call)]
try:
out, err, retval = self.system_call(sbatch_call)
except subprocess.TimeoutExpired:
logging.warning("SSH command timeout %s" % str(command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
return self.submit_helper(call, logpath, rqmt, name, task_name, start_id, end_id, step_size)
while True:
try:
out, err, retval = self.system_call(sbatch_call)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break

ref_output = ["Submitted", "batch", "job"]
ref_output = [i.encode() for i in ref_output]
Expand Down Expand Up @@ -285,12 +293,14 @@ def queue_state(self):
"-O",
"arrayjobid,arraytaskid,state,name:1000",
]
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning("SSH command timeout %s" % str(system_command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
return self.queue_state()
while True:
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(system_command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break

task_infos = defaultdict(list)
for line in out:
Expand Down
34 changes: 22 additions & 12 deletions sisyphus/son_of_grid_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Author: Jan-Thorsten Peter <[email protected]>

from typing import Any
import os
import subprocess

Expand Down Expand Up @@ -73,6 +74,11 @@ def __init__(self, default_rqmt, gateway=None, auto_clean_eqw=True, ignore_jobs=
self.ignore_jobs = ignore_jobs
self.pe_name = pe_name

def _system_call_timeout_warn_msg(self, command: Any) -> str:
if self.gateway:
return f"SSH command timeout: {command!s}"
return f"Command timeout: {command!s}"

def system_call(self, command, send_to_stdin=None):
"""
:param list[str] command: qsub command
Expand Down Expand Up @@ -240,12 +246,14 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id,

qsub_call += ["-t", "%i-%i:%i" % (start_id, end_id, step_size)]
command = " ".join(call) + "\n"
try:
out, err, retval = self.system_call(qsub_call, command)
except subprocess.TimeoutExpired:
logging.warning("SSH command timeout %s" % str(command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
return self.submit_helper(call, logpath, rqmt, name, task_name, start_id, end_id, step_size)
while True:
try:
out, err, retval = self.system_call(qsub_call, command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break

ref_output = ["Your", "job-array", '("%s")' % name, "has", "been", "submitted"]
ref_output = [i.encode() for i in ref_output]
Expand Down Expand Up @@ -305,12 +313,14 @@ def queue_state(self):

# get qstat output
system_command = ["qstat", "-xml", "-u", getpass.getuser()]
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning("SSH command timeout %s" % str(system_command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
return self.queue_state()
while True:
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(system_command))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break

xml_data = "".join(i.decode("utf8") for i in out)

Expand Down

0 comments on commit eeae612

Please sign in to comment.