Merge pull request #3358 from matthewrmshin/unittest-batch_sys_handlers-slurm-and-lsf

Unit test batch sys handlers slurm and lsf
hjoliver authored Sep 17, 2019
2 parents 0d8a299 + 8e20d45 commit 287ead8
Showing 9 changed files with 194 additions and 92 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -178,6 +178,10 @@ Changed the `suite.rc` schema:
restarts, and reloads). Impact of the speedup is most noticeable when dealing
with suite configurations that contain tasks with many task outputs.

[#3358](https://github.com/cylc/cylc-flow/pull/3358) - on submitting jobs to
SLURM or LSF, the job names will now follow the pattern `task.cycle.suite`
(instead of `suite.task.cycle`), for consistency with jobs on PBS.

[#3356](https://github.com/cylc/cylc-flow/pull/3356) - default job name length
maximum for PBS is now 236 characters (i.e. assuming PBS 13 or newer). If you
are still using PBS 12 or older, you should add a site configuration to
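To make the job-name change in the #3358 changelog entry above concrete, here is a minimal sketch (not part of the diff). The task, cycle point and suite name are illustrative values borrowed from the unit test added further down.

```python
# Minimal sketch of the renaming described above; "axe", cycle point "1" and
# suite "chop" are illustrative values taken from the new LSF unit test.
task_id = "axe.1"     # task.cycle-point, as passed to the batch system handler
suite_name = "chop"

new_job_name = f"{task_id}.{suite_name}"   # "axe.1.chop"  (task.cycle.suite)
old_job_name = f"{suite_name}.{task_id}"   # "chop.axe.1"  (suite.task.cycle)

# The new order matches what the PBS handler already produced, so SLURM
# (--job-name) and LSF (-J) job names are now consistent with PBS.
print(new_job_name)
print(old_job_name)
```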
15 changes: 1 addition & 14 deletions cylc/flow/batch_sys_handlers/at.py
@@ -23,7 +23,7 @@
from subprocess import PIPE


class AtCommandHandler(object):
class AtCommandHandler():
"""Logic to submit jobs to the "at" batch system.
Submit the task job script to the simple 'at' scheduler. Note that
@@ -98,19 +98,6 @@ def filter_submit_output(self, out, err):
new_err += line
return out, new_err

@classmethod
def filter_poll_output(cls, out, job_id):
"""Return True if job_id is in the queueing system."""
# "atq" returns something like this:
# 5347 2013-11-22 10:24 a daisy
# 499 2013-12-22 16:26 a daisy
# "jid" is in queue if it matches column 1 of a row.
for line in out.splitlines():
items = line.strip().split(None, 1)
if items and items[0] == job_id:
return True
return False

@classmethod
def get_submit_stdin(cls, job_file_path, submit_opts):
"""Return proc_stdin_arg, proc_stdin_value."""
23 changes: 1 addition & 22 deletions cylc/flow/batch_sys_handlers/loadleveler.py
@@ -20,7 +20,7 @@
import re


class LoadlevelerHandler(object):
class LoadlevelerHandler():

"""Loadleveler job submission"""

@@ -68,27 +68,6 @@ def filter_submit_output(self, out, err):
new_err += line + "\n"
return out, new_err

@classmethod
def filter_poll_output(cls, out, job_id):
"""Return True if job_id is in the queueing system."""
# "llq -f%id JOB_ID" returns 0 whether JOB_ID is in the system or not.
# Therefore, we need to parse its output.
# "llq -f%id JOB_ID" returns EITHER something like:
# Step Id
# ------------------------
# a001.3274552.0
#
# 1 job step(s) in query, ...
# OR:
# llq: There is currently no job status to report.
# "jid" is in queue if it matches a stripped row.
for line in out.splitlines():
items = line.strip().split(None, 1)
if (items and
(items[0] == job_id or items[0].startswith(job_id + "."))):
return True
return False

@classmethod
def filter_poll_many_output(cls, out):
"""Return a list of job IDs still in the batch system.
24 changes: 7 additions & 17 deletions cylc/flow/batch_sys_handlers/lsf.py
@@ -17,36 +17,31 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""IBM Platform LSF bsub job submission"""

import math
import re


class LSFHandler(object):
class LSFHandler():
"""IBM Platform LSF bsub job submission"""

DIRECTIVE_PREFIX = "#BSUB "
FAIL_SIGNALS = ("EXIT", "ERR", "XCPU", "TERM", "INT", "SIGUSR2")
KILL_CMD_TMPL = "bkill '%(job_id)s'"
POLL_CMD = "bjobs"
REC_ID_FROM_SUBMIT_OUT = re.compile(r"^Job <(?P<id>\d+)>")
SUBMIT_CMD_TMPL = "bsub"

@classmethod
def filter_poll_output(cls, out, job_id):
"""Return True if job_id is in the queueing system."""
entries = out.strip().split()
return (len(entries) >= 3 and entries[0] == job_id and
entries[2] not in ["DONE", "EXIT"])

@classmethod
def format_directives(cls, job_conf):
"""Format the job directives for a job file."""
job_file_path = re.sub(r"\$HOME/", "", job_conf["job_file_path"])
directives = job_conf["directives"].__class__()
directives["-J"] = job_conf["suite_name"] + "." + job_conf["task_id"]
directives["-J"] = job_conf["task_id"] + "." + job_conf["suite_name"]
directives["-o"] = job_file_path + ".out"
directives["-e"] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
directives.get("-W") is None):
directives["-W"] = "%d" % (job_conf["execution_time_limit"] / 60)
if job_conf["execution_time_limit"] and directives.get("-W") is None:
directives["-W"] = str(math.ceil(
job_conf["execution_time_limit"] / 60))
for key, value in list(job_conf["directives"].items()):
directives[key] = value
lines = []
@@ -57,11 +52,6 @@ def format_directives(cls, job_conf):
lines.append("%s%s" % (cls.DIRECTIVE_PREFIX, key))
return lines

@classmethod
def get_fail_signals(cls, _):
"""Return a list of failure signal names to trap."""
return ["EXIT", "ERR", "XCPU", "TERM", "INT", "SIGUSR2"]

@classmethod
def get_submit_stdin(cls, job_file_path, _):
"""Return proc_stdin_arg, proc_stdin_value."""
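The `-W` (wall-clock limit, in minutes) change in `lsf.py` above also swaps truncating `"%d"` formatting for `math.ceil`. A small sketch with the 200-second limit used in the new test file shows the difference; the old code could request less time than the configured limit.

```python
import math

# Old vs new "-W" calculation in LSFHandler.format_directives, using the
# 200-second execution time limit from the second unit-test case below.
execution_time_limit = 200  # seconds

old_w = "%d" % (execution_time_limit / 60)           # "3": truncates downwards
new_w = str(math.ceil(execution_time_limit / 60))    # "4": rounds upwards

# Rounding up guarantees the requested wall-clock limit covers the whole
# configured execution time limit instead of cutting the job off early.
print(old_w, new_w)
```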
39 changes: 12 additions & 27 deletions cylc/flow/batch_sys_handlers/slurm.py
@@ -21,10 +21,20 @@
import shlex


class SLURMHandler(object):
class SLURMHandler():
"""SLURM job submission and manipulation."""

DIRECTIVE_PREFIX = "#SBATCH "
# Do not include SIGTERM trapping, as SLURM tries to kill the
# parent script directly with SIGTERM rather than the process
# group as a whole. In these circumstances, this signal does
# not get handled. Bash waits for the (unsignalled) child to
# complete. This does not apply to jobs with proper 'steps'
# (e.g. using srun within an sbatch script), which are properly
# signalled.
# XCPU isn't used by SLURM at the moment, but it's a valid way
# to manually signal jobs using scancel or sbatch --signal.
FAIL_SIGNALS = ("EXIT", "ERR", "XCPU")
KILL_CMD_TMPL = "scancel '%(job_id)s'"
# N.B. The "squeue -j JOB_ID" command returns 1 if JOB_ID is no longer in
# the system, so there is no need to filter its output.
@@ -33,20 +43,13 @@ class SLURMHandler(object):
r"\ASubmitted\sbatch\sjob\s(?P<id>\d+)")
SUBMIT_CMD_TMPL = "sbatch '%(job)s'"

@classmethod
def filter_poll_output(cls, out, _):
"""Return True if job_id is in the queueing system."""
# squeue -h -j JOB_ID when JOB_ID has stopped can either exit with
# non-zero exit code or return blank text.
return out.strip()

@classmethod
def format_directives(cls, job_conf):
"""Format the job directives for a job file."""
job_file_path = re.sub(r'\$HOME/', '', job_conf['job_file_path'])
directives = job_conf['directives'].__class__()
directives['--job-name'] = (
job_conf['suite_name'] + '.' + job_conf['task_id'])
job_conf['task_id'] + '.' + job_conf['suite_name'])
directives['--output'] = job_file_path + ".out"
directives['--error'] = job_file_path + ".err"
if (job_conf["execution_time_limit"] and
@@ -64,24 +67,6 @@
lines.append("%s%s" % (cls.DIRECTIVE_PREFIX, key))
return lines

@staticmethod
def get_fail_signals(_):
"""Return a list of failure signal names to trap.
Do not include SIGTERM trapping, as SLURM tries to kill the
parent script directly with SIGTERM rather than the process
group as a whole. In these circumstances, this signal does
not get handled. Bash waits for the (unsignalled) child to
complete. This does not apply to jobs with proper 'steps'
(e.g. using srun within an sbatch script), which are properly
signalled.
XCPU isn't used by SLURM at the moment, but it's a valid way
to manually signal jobs using scancel or sbatch --signal.
"""
return ["EXIT", "ERR", "XCPU"]

@classmethod
def get_poll_many_cmd(cls, job_ids):
"""Return the poll command for a list of job IDs."""
21 changes: 10 additions & 11 deletions cylc/flow/batch_sys_manager.py
@@ -43,12 +43,6 @@
job file directives are relevant for the batch system. The argument
"job_conf" is a dict containing the job configuration.
batch_sys.get_fail_signals(job_conf) => list of strings
* Return a list of names of signals to trap for reporting errors. Default
is ["EXIT", "ERR", "TERM", "XCPU"]. ERR and EXIT are always recommended.
EXIT is used to report premature stopping of the job script, and its trap
is unset at the end of the script.
batch_sys.get_poll_many_cmd(job-id-list) => list
* Return a list containing the shell command to poll the jobs in the
argument list.
@@ -73,6 +67,12 @@
batch_sys.manip_job_id(job_id) => job_id
* Modify the job ID that is returned by the job submit command.
batch_sys.FAIL_SIGNALS => tuple<str>
* A tuple containing the names of signals to trap for reporting errors.
Default is ("EXIT", "ERR", "TERM", "XCPU"). ERR and EXIT are always
recommended. EXIT is used to report premature stopping of the job
script, and its trap is unset at the end of the script.
batch_sys.KILL_CMD_TMPL
* A Python string template for getting the batch system command to remove
and terminate a job ID. The command is formed using the logic:
@@ -135,7 +135,7 @@
from cylc.flow.parsec.OrderedDict import OrderedDict


class JobPollContext(object):
class JobPollContext():
"""Context object for a job poll."""
CONTEXT_ATTRIBUTES = (
'job_log_dir', # cycle/task/submit_num
@@ -187,7 +187,7 @@ def get_summary_str(self):
return '%s|%s' % (self.job_log_dir, json.dumps(ret))


class BatchSysManager(object):
class BatchSysManager():
"""Job submission, poll and kill.
Manage the importing of job submission method modules.
@@ -198,6 +198,7 @@ class BatchSysManager(object):
CYLC_BATCH_SYS_JOB_ID = "CYLC_BATCH_SYS_JOB_ID"
CYLC_BATCH_SYS_JOB_SUBMIT_TIME = "CYLC_BATCH_SYS_JOB_SUBMIT_TIME"
CYLC_BATCH_SYS_EXIT_POLLED = "CYLC_BATCH_SYS_EXIT_POLLED"
FAIL_SIGNALS = ("EXIT", "ERR", "TERM", "XCPU")
LINE_PREFIX_BATCH_SYS_NAME = "# Job submit method: "
LINE_PREFIX_BATCH_SUBMIT_CMD_TMPL = "# Job submit command template: "
LINE_PREFIX_EXECUTION_TIME_LIMIT = "# Execution time limit: "
@@ -243,9 +244,7 @@ def format_directives(self, job_conf):
def get_fail_signals(self, job_conf):
"""Return a list of failure signal names to trap in the job file."""
batch_sys = self._get_sys(job_conf['batch_system_name'])
if hasattr(batch_sys, "get_fail_signals"):
return batch_sys.get_fail_signals(job_conf)
return ["EXIT", "ERR", "TERM", "XCPU"]
return getattr(batch_sys, "FAIL_SIGNALS", self.FAIL_SIGNALS)

def get_vacation_signal(self, job_conf):
"""Return the vacation signal name for a job file."""
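The `get_fail_signals` change above replaces a `hasattr` check for a per-handler method with a class-level `FAIL_SIGNALS` tuple and a `getattr` fallback. A short sketch of that lookup pattern follows; the handler classes here are simplified stand-ins, not the real Cylc handlers.

```python
# Sketch of the new FAIL_SIGNALS lookup; these classes are stand-ins only.
DEFAULT_FAIL_SIGNALS = ("EXIT", "ERR", "TERM", "XCPU")


class SlurmLikeHandler:
    # Overrides the default: SIGTERM is deliberately not trapped (see the
    # comment added to slurm.py above).
    FAIL_SIGNALS = ("EXIT", "ERR", "XCPU")


class BackgroundLikeHandler:
    """No FAIL_SIGNALS attribute, so the manager default applies."""


def get_fail_signals(batch_sys):
    """Return the signals to trap, falling back to the manager default."""
    return getattr(batch_sys, "FAIL_SIGNALS", DEFAULT_FAIL_SIGNALS)


assert get_fail_signals(SlurmLikeHandler()) == ("EXIT", "ERR", "XCPU")
assert get_fail_signals(BackgroundLikeHandler()) == DEFAULT_FAIL_SIGNALS
```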
75 changes: 75 additions & 0 deletions cylc/flow/tests/batch_sys_handlers/test_lsf.py
@@ -0,0 +1,75 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import pytest

from cylc.flow.batch_sys_handlers.lsf import BATCH_SYS_HANDLER


@pytest.mark.parametrize(
'job_conf,lines',
[
( # basic
{
'batch_system_conf': {},
'directives': {},
'execution_time_limit': 180,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
'suite_name': 'chop',
'task_id': 'axe.1',
},
[
'#BSUB -J axe.1.chop',
'#BSUB -o cylc-run/chop/log/job/1/axe/01/job.out',
'#BSUB -e cylc-run/chop/log/job/1/axe/01/job.err',
'#BSUB -W 3',
],
),
( # some useful directives
{
'batch_system_conf': {},
'directives': {
'-q': 'forever',
'-B': '',
'-ar': '',
},
'execution_time_limit': 200,
'job_file_path': '$HOME/cylc-run/chop/log/job/1/axe/01/job',
'suite_name': 'chop',
'task_id': 'axe.1',
},
[
'#BSUB -J axe.1.chop',
'#BSUB -o cylc-run/chop/log/job/1/axe/01/job.out',
'#BSUB -e cylc-run/chop/log/job/1/axe/01/job.err',
'#BSUB -W 4',
'#BSUB -q forever',
'#BSUB -B',
'#BSUB -ar',
],
),
],
)
def test_format_directives(job_conf: dict, lines: list):
assert BATCH_SYS_HANDLER.format_directives(job_conf) == lines


def test_get_submit_stdin():
outs = BATCH_SYS_HANDLER.get_submit_stdin(__file__, None)
assert outs[0].name == __file__
assert outs[1] is None
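The new module is a standard pytest file, so it can be run on its own, for example with `pytest cylc/flow/tests/batch_sys_handlers/test_lsf.py` from a checkout of the repository.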
(Diffs for the remaining changed files are not shown here.)
