diff --git a/.mailmap b/.mailmap
index 71a84fdefc6..7bbf47d2375 100644
--- a/.mailmap
+++ b/.mailmap
@@ -57,3 +57,4 @@ Utheri Wagura <36386988+uwagura@users.noreply.github.com>
 github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> github-actions[bot]
 GitHub Action
 Diquan Jabbour <165976689+Diquan-BOM@users.noreply.github.com>
+Maxime Rio
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index a4e77fd82a7..fb0b362e4a5 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -96,6 +96,7 @@ requests_).
  - Diquan Jabbour
  - Shixian Sheng
  - Utheri Wagura
+ - Maxime Rio

 (All contributors are identifiable with email addresses in the git version
diff --git a/changes.d/6081.fix.md b/changes.d/6081.fix.md
new file mode 100644
index 00000000000..714321a9311
--- /dev/null
+++ b/changes.d/6081.fix.md
@@ -0,0 +1,2 @@
+Fix job submission when a batch of jobs is submitted to a runner that does
+not return a newline with the job ID (did not affect built-in job runners).
diff --git a/cylc/flow/job_runner_handlers/background.py b/cylc/flow/job_runner_handlers/background.py
index 2866a839dfb..729b9b74d28 100644
--- a/cylc/flow/job_runner_handlers/background.py
+++ b/cylc/flow/job_runner_handlers/background.py
@@ -89,7 +89,7 @@ def submit(cls, job_file_path, submit_opts):
                 exc.filename = "nohup"
             return (1, None, str(exc))
         else:
-            return (0, "%d\n" % (proc.pid), None)
+            return (0, str(proc.pid), None)


 JOB_RUNNER_HANDLER = BgCommandHandler()
diff --git a/cylc/flow/job_runner_handlers/documentation.py b/cylc/flow/job_runner_handlers/documentation.py
index 3b546ab4209..415f031684e 100644
--- a/cylc/flow/job_runner_handlers/documentation.py
+++ b/cylc/flow/job_runner_handlers/documentation.py
@@ -424,8 +424,7 @@ def submit(
             ret_code:
                 Subprocess return code.
             out:
-                Subprocess standard output, note this should be newline
-                terminated.
+                Subprocess standard output.
             err:
                 Subprocess standard error.

diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py
index f23c6c70104..b8ddaaa0538 100644
--- a/cylc/flow/job_runner_mgr.py
+++ b/cylc/flow/job_runner_mgr.py
@@ -210,12 +210,10 @@ def jobs_kill(self, job_log_root, job_log_dirs):
                 self.OUT_PREFIX_SUMMARY, now, job_log_dir, ret_code))
             # Note: Print STDERR to STDOUT may look a bit strange, but it
             # requires less logic for the workflow to parse the output.
-            if err.strip():
-                for line in err.splitlines(True):
-                    if not line.endswith("\n"):
-                        line += "\n"
-                    sys.stdout.write("%s%s|%s|%s" % (
-                        self.OUT_PREFIX_CMD_ERR, now, job_log_dir, line))
+            for line in err.strip().splitlines():
+                sys.stdout.write(
+                    f"{self.OUT_PREFIX_CMD_ERR}{now}|{job_log_dir}|{line}\n"
+                )

     def jobs_poll(self, job_log_root, job_log_dirs):
         """Poll multiple jobs.
@@ -303,13 +301,13 @@ def jobs_submit(self, job_log_root, job_log_dirs, remote_mode=False,
             sys.stdout.write("%s%s|%s|%d|%s\n" % (
                 self.OUT_PREFIX_SUMMARY, now, job_log_dir, ret_code, job_id))
             for key, value in [("STDERR", err), ("STDOUT", out)]:
-                if value is None or not value.strip():
+                if value is None:
                     continue
-                for line in value.splitlines(True):
-                    if not value.endswith("\n"):
-                        value += "\n"
-                    sys.stdout.write("%s%s|%s|[%s] %s" % (
-                        self.OUT_PREFIX_COMMAND, now, job_log_dir, key, line))
+                for line in value.strip().splitlines():
+                    sys.stdout.write(
+                        f"{self.OUT_PREFIX_COMMAND}{now}"
+                        f"|{job_log_dir}|[{key}] {line}\n"
+                    )

     def job_kill(self, st_file_path):
         """Ask job runner to terminate the job specified in "st_file_path".
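(Not part of the patch.) A standalone sketch of the reworked output loop in jobs_submit() above, showing why strip() followed by splitlines() copes with runner output whether or not it is newline terminated; the prefix, timestamp and job log dir values below are hypothetical stand-ins for the real self.OUT_PREFIX_COMMAND, now and job_log_dir:

import sys

OUT_PREFIX_COMMAND = "[TASK JOB COMMAND] "  # hypothetical stand-in value
now = "2024-01-01T00:00:00Z"                # hypothetical timestamp
job_log_dir = "1/foo/01"                    # hypothetical job log dir

# a job ID with no trailing newline (the case this patch fixes) and a
# newline-terminated stderr both come out as one well-formed record per line
out = "12345"
err = "some warning\n"

for key, value in [("STDERR", err), ("STDOUT", out)]:
    if value is None:
        continue
    for line in value.strip().splitlines():
        sys.stdout.write(
            f"{OUT_PREFIX_COMMAND}{now}|{job_log_dir}|[{key}] {line}\n"
        )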
diff --git a/tests/integration/test_job_runner_mgr.py b/tests/integration/test_job_runner_mgr.py
new file mode 100644
index 00000000000..93663aec892
--- /dev/null
+++ b/tests/integration/test_job_runner_mgr.py
@@ -0,0 +1,85 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import errno
+import logging
+from pathlib import Path
+import re
+from textwrap import dedent
+
+from cylc.flow.job_runner_mgr import JobRunnerManager
+from cylc.flow.pathutil import get_workflow_run_job_dir
+from cylc.flow.task_state import TASK_STATUS_RUNNING
+from cylc.flow.subprocctx import SubProcContext
+
+
+async def test_kill_error(one, start, test_dir, capsys, log_filter):
+    """It should report the failure to kill a job."""
+    async with start(one) as log:
+        # make it look like the task is running
+        itask = one.pool.get_tasks()[0]
+        itask.submit_num += 1
+        itask.state_reset(TASK_STATUS_RUNNING)
+
+        # fake job details
+        workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow))
+        job_id = itask.tokens.duplicate(job='01').relative_id
+        job_log_dir = Path(workflow_job_log_dir, job_id)
+
+        # create job status file (give it a fake pid)
+        job_log_dir.mkdir(parents=True)
+        (job_log_dir / 'job.status').write_text(dedent('''
+            CYLC_JOB_RUNNER_NAME=background
+            CYLC_JOB_ID=99999999
+            CYLC_JOB_PID=99999999
+        '''))
+
+        # attempt to kill the job using the jobs-kill script
+        # (note this is normally run via a subprocess)
+        capsys.readouterr()
+        JobRunnerManager().jobs_kill(str(workflow_job_log_dir), [job_id])
+
+        # the kill should fail, the failure should be written to stdout
+        # (the jobs-kill callback will read this in and handle it)
+        out, err = capsys.readouterr()
+        assert re.search(
+            # NOTE: ESRCH = no such process
+            rf'TASK JOB ERROR.*{job_id}.*Errno {errno.ESRCH}',
+            out,
+        )
+
+        # feed this jobs-kill output into the scheduler
+        # (as if we had run the jobs-kill script as a subprocess)
+        one.task_job_mgr._kill_task_jobs_callback(
+            # mock the subprocess
+            SubProcContext(
+                one.task_job_mgr.JOBS_KILL,
+                ['mock-cmd'],
+                # provide it with the out/err the script produced
+                out=out,
+                err=err,
+            ),
+            one.workflow,
+            [itask],
+        )
+
+        # a warning should be logged
+        assert log_filter(
+            log,
+            regex=r'1/one/01:running.*job kill failed',
+            level=logging.WARNING,
+        )
+        assert itask.state(TASK_STATUS_RUNNING)
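(Not part of the patch.) For context, a minimal sketch of a custom job runner handler whose submit() returns the job ID without a trailing newline, the case described in changes.d/6081.fix.md; it mirrors the built-in background handler, but the class name and the plain bash invocation are illustrative assumptions only:

from subprocess import DEVNULL, Popen


class NoNewlineHandler:
    """Hypothetical job runner handler, for illustration only."""

    def submit(self, job_file_path, submit_opts):
        """Run the job script and return (ret_code, out, err)."""
        try:
            proc = Popen(  # illustrative: run the job script directly
                ["bash", job_file_path],
                stdin=DEVNULL,
                stdout=DEVNULL,
                stderr=DEVNULL,
            )
        except OSError as exc:
            return (1, None, str(exc))
        # per the updated documentation.py docstring, the job ID no longer
        # needs to be newline terminated
        return (0, str(proc.pid), None)


JOB_RUNNER_HANDLER = NoNewlineHandler()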