diff --git a/CHANGES.md b/CHANGES.md index e582a0ff647..0c6e11997c1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -52,6 +52,10 @@ with warning, for scan errors where workflow is stopped. [#5199](https://github.com/cylc/cylc-flow/pull/5199) - Fix a problem with the consolidation tutorial. +[#5195](https://github.com/cylc/cylc-flow/pull/5195) - +Fix issue where workflows can fail to shutdown due to unavailable remote +platforms and make job log retrieval more robust. + ------------------------------------------------------------------------------- ## __cylc-8.0.3 (Released 2022-10-17)__ diff --git a/cylc/flow/host_select.py b/cylc/flow/host_select.py index f36bb8be55d..5418c47e2cf 100644 --- a/cylc/flow/host_select.py +++ b/cylc/flow/host_select.py @@ -27,7 +27,11 @@ from cylc.flow import LOG from cylc.flow.cfgspec.glbl_cfg import glbl_cfg -from cylc.flow.exceptions import GlobalConfigError, HostSelectException +from cylc.flow.exceptions import ( + GlobalConfigError, + HostSelectException, + NoHostsError, +) from cylc.flow.hostuserutil import get_fqdn_by_host, is_remote_host from cylc.flow.remote import run_cmd, cylc_server_cmd from cylc.flow.terminal import parse_dirty_json @@ -553,7 +557,11 @@ def _get_metrics(hosts, metrics, data=None): } for host in hosts: if is_remote_host(host): - proc_map[host] = cylc_server_cmd(cmd, host=host, **kwargs) + try: + proc_map[host] = cylc_server_cmd(cmd, host=host, **kwargs) + except NoHostsError: + LOG.warning(f'Could not contact {host}') + continue else: proc_map[host] = run_cmd(['cylc'] + cmd, **kwargs) diff --git a/cylc/flow/network/ssh_client.py b/cylc/flow/network/ssh_client.py index 58f560b258d..abfddc75ca6 100644 --- a/cylc/flow/network/ssh_client.py +++ b/cylc/flow/network/ssh_client.py @@ -67,6 +67,8 @@ async def async_request(self, command, args=None, timeout=None): 'cylc path': cylc_path, 'use login shell': login_sh, } + # NOTE: this can not raise NoHostsError + # because we have provided the host proc = remote_cylc_cmd( cmd, platform, diff --git a/cylc/flow/remote.py b/cylc/flow/remote.py index 5a4dda77817..ec839a8f2e8 100644 --- a/cylc/flow/remote.py +++ b/cylc/flow/remote.py @@ -199,6 +199,10 @@ def construct_rsync_over_ssh_cmd( platform: contains info relating to platform rsync_includes: files and directories to be included in the rsync + Raises: + NoHostsError: + If there are no hosts available for the requested platform. + Developer Warning: The Cylc Subprocess Pool method ``rsync_255_fail`` relies on ``rsync_cmd[0] == 'rsync'``. Please check that changes to this function @@ -369,6 +373,10 @@ def remote_cylc_cmd( Uses the provided platform configuration to construct the command. For arguments and returns see construct_ssh_cmd and run_cmd. + + Raises: + NoHostsError: If the platform is not contactable. + """ if not host: # no host selected => perform host selection from platform config @@ -405,6 +413,10 @@ def cylc_server_cmd(cmd, host=None, **kwargs): with the localhost platform. For arguments and returns see construct_ssh_cmd and run_cmd. + + Raises: + NoHostsError: If the platform is not contactable. + """ return remote_cylc_cmd( cmd, diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index 58d15d00090..6155aa1f714 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -393,6 +393,8 @@ def _distribute(host, workflow_id_raw, workflow_id): cmd.append("--host=localhost") # Re-invoke the command + # NOTE: has the potential to raise NoHostsError, however, this will + # most likely have been raised during host-selection cylc_server_cmd(cmd, host=host) sys.exit(0) diff --git a/cylc/flow/scripts/cat_log.py b/cylc/flow/scripts/cat_log.py index 1a1c9bd05ab..ee8ade943b5 100755 --- a/cylc/flow/scripts/cat_log.py +++ b/cylc/flow/scripts/cat_log.py @@ -405,6 +405,8 @@ def main( # TODO: Add Intelligent Host selection to this with suppress(KeyboardInterrupt): # (Ctrl-C while tailing) + # NOTE: This will raise NoHostsError if the platform is not + # contactable remote_cylc_cmd( cmd, platform, diff --git a/cylc/flow/scripts/check_versions.py b/cylc/flow/scripts/check_versions.py index 7312b5303b3..2aefca4be3e 100755 --- a/cylc/flow/scripts/check_versions.py +++ b/cylc/flow/scripts/check_versions.py @@ -44,6 +44,7 @@ from cylc.flow.cylc_subproc import procopen, PIPE, DEVNULL from cylc.flow import __version__ as CYLC_VERSION from cylc.flow.config import WorkflowConfig +from cylc.flow.exceptions import NoHostsError from cylc.flow.id_cli import parse_id from cylc.flow.platforms import get_platform, get_host_from_platform from cylc.flow.remote import construct_ssh_cmd @@ -101,15 +102,26 @@ def main(_, options: 'Values', *ids) -> None: sys.exit(0) verbose = cylc.flow.flags.verbosity > 0 + versions = check_versions(platforms, verbose) + report_results(platforms, versions, options.error) + +def check_versions(platforms, verbose): # get the cylc version on each platform versions = {} for platform_name in sorted(platforms): platform = get_platform(platform_name) - host = get_host_from_platform( - platform, - bad_hosts=None - ) + try: + host = get_host_from_platform( + platform, + bad_hosts=None + ) + except NoHostsError: + print( + f'Could not connect to {platform["name"]}', + file=sys.stderr + ) + continue cmd = construct_ssh_cmd( ['version'], platform, @@ -127,7 +139,10 @@ def main(_, options: 'Values', *ids) -> None: versions[platform_name] = out.strip() else: versions[platform_name] = f'ERROR: {err.strip()}' + return versions + +def report_results(platforms, versions, exit_error): # report results max_len = max((len(platform_name) for platform_name in platforms)) print(f'{"platform".rjust(max_len)}: cylc version') @@ -136,7 +151,7 @@ def main(_, options: 'Values', *ids) -> None: print(f'{platform_name.rjust(max_len)}: {result}') if all((version == CYLC_VERSION for version in versions.values())): ret_code = 0 - elif options.error: + elif exit_error: ret_code = 1 else: ret_code = 0 diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index f22b7712a77..00d1d9d17a0 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -39,6 +39,7 @@ from cylc.flow import LOG, LOG_LEVELS from cylc.flow.cfgspec.glbl_cfg import glbl_cfg +from cylc.flow.exceptions import NoHostsError from cylc.flow.hostuserutil import get_host, get_user, is_remote_platform from cylc.flow.pathutil import ( get_remote_workflow_run_job_dir, @@ -935,8 +936,29 @@ def _get_events_conf(self, itask, key, default=None): def _process_job_logs_retrieval(self, schd, ctx, id_keys): """Process retrieval of task job logs from remote user@host.""" + # get a host to run retrieval on platform = get_platform(ctx.platform_name) - host = get_host_from_platform(platform, bad_hosts=self.bad_hosts) + try: + host = get_host_from_platform(platform, bad_hosts=self.bad_hosts) + except NoHostsError: + # All of the platforms hosts have been found to be uncontactable. + # Reset the bad hosts to allow retrieval retry to take place. + self.bad_hosts -= set(platform['hosts']) + try: + # Get a new host and try again. + host = get_host_from_platform( + platform, + bad_hosts=self.bad_hosts + ) + except NoHostsError: + # We really can't get a host to try on e.g. no hosts + # configured (shouldn't happen). Nothing more we can do here, + # move onto the next submission retry. + for id_key in id_keys: + self.unset_waiting_event_timer(id_key) + return + + # construct the retrieval command ssh_str = str(platform["ssh command"]) rsync_str = str(platform["retrieve job logs command"]) cmd = shlex.split(rsync_str) + ["--rsh=" + ssh_str] @@ -962,6 +984,8 @@ def _process_job_logs_retrieval(self, schd, ctx, id_keys): ) # Local target cmd.append(get_workflow_run_job_dir(schd.workflow) + "/") + + # schedule command self.proc_pool.put_command( SubProcContext( ctx, cmd, env=dict(os.environ), id_keys=id_keys, host=host diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 1188e3cab01..52e941b5419 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -522,9 +522,6 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, cmd, [len(b) for b in itasks_batches]) if remote_mode: - host = get_host_from_platform( - platform, bad_hosts=self.task_remote_mgr.bad_hosts - ) cmd = construct_ssh_cmd( cmd, platform, host ) @@ -944,13 +941,23 @@ def _run_job_cmd( cmd.append(get_remote_workflow_run_job_dir(workflow)) job_log_dirs = [] host = 'localhost' + + ctx = SubProcContext(cmd_key, cmd, host=host) if remote_mode: - host = get_host_from_platform( - platform, bad_hosts=self.task_remote_mgr.bad_hosts - ) - cmd = construct_ssh_cmd( - cmd, platform, host - ) + try: + host = get_host_from_platform( + platform, bad_hosts=self.task_remote_mgr.bad_hosts + ) + cmd = construct_ssh_cmd( + cmd, platform, host + ) + except NoHostsError: + ctx.err = f'No available hosts for {platform["name"]}' + callback_255(ctx, workflow, itasks) + continue + else: + ctx = SubProcContext(cmd_key, cmd, host=host) + for itask in sorted(itasks, key=lambda task: task.identity): job_log_dirs.append( itask.tokens.duplicate( @@ -960,9 +967,7 @@ def _run_job_cmd( cmd += job_log_dirs LOG.debug(f'{cmd_key} for {platform["name"]} on {host}') self.proc_pool.put_command( - SubProcContext( - cmd_key, cmd, host=host - ), + ctx, bad_hosts=self.task_remote_mgr.bad_hosts, callback=callback, callback_args=[workflow, itasks], diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 5fc8ab11768..e2f53173ad2 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -245,6 +245,7 @@ def remote_init( ) self.remote_init_map[ platform['install target']] = REMOTE_INIT_FAILED + # reset the bad hosts to allow remote-init to retry self.bad_hosts -= set(platform['hosts']) self.ready = True else: @@ -266,6 +267,24 @@ def remote_init( callback_255_args=[platform] ) + def construct_remote_tidy_ssh_cmd( + self, platform: Dict[str, Any] + ) -> Tuple[List[str], str]: + """Return a remote-tidy SSH command. + + Rasies: + NoHostsError: If the platform is not contactable. + """ + cmd = ['remote-tidy'] + cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity)) + cmd.append(get_install_target_from_platform(platform)) + cmd.append(get_remote_workflow_run_dir(self.workflow)) + host = get_host_from_platform( + platform, bad_hosts=self.bad_hosts + ) + cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s') + return cmd, host + def remote_tidy(self) -> None: """Remove workflow contact files and keys from initialised remotes. @@ -274,20 +293,6 @@ def remote_tidy(self) -> None: Timeout any incomplete commands after 10 seconds. """ # Issue all SSH commands in parallel - - def construct_remote_tidy_ssh_cmd( - platform: Dict[str, Any] - ) -> Tuple[List[str], str]: - cmd = ['remote-tidy'] - cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity)) - cmd.append(get_install_target_from_platform(platform)) - cmd.append(get_remote_workflow_run_dir(self.workflow)) - host = get_host_from_platform( - platform, bad_hosts=self.bad_hosts - ) - cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s') - return cmd, host - queue: Deque[RemoteTidyQueueTuple] = deque() for install_target, message in self.remote_init_map.items(): if message != REMOTE_FILE_INSTALL_DONE: @@ -298,7 +303,7 @@ def construct_remote_tidy_ssh_cmd( platform = get_random_platform_for_install_target( install_target ) - cmd, host = construct_remote_tidy_ssh_cmd(platform) + cmd, host = self.construct_remote_tidy_ssh_cmd(platform) except (NoHostsError, PlatformLookupError) as exc: LOG.warning( PlatformError( @@ -332,7 +337,7 @@ def construct_remote_tidy_ssh_cmd( timeout = time() + 10.0 self.bad_hosts.add(item.host) try: - retry_cmd, retry_host = construct_remote_tidy_ssh_cmd( + retry_cmd, retry_host = self.construct_remote_tidy_ssh_cmd( item.platform ) except (NoHostsError, PlatformLookupError) as exc: diff --git a/cylc/flow/workflow_files.py b/cylc/flow/workflow_files.py index 58e68c8b9ee..cbb20450ee5 100644 --- a/cylc/flow/workflow_files.py +++ b/cylc/flow/workflow_files.py @@ -1077,6 +1077,10 @@ def _remote_clean_cmd( platform: Config for the platform on which to remove the workflow. rm_dirs: Sub dirs to remove instead of the whole run dir. timeout: Number of seconds to wait before cancelling the command. + + Raises: + NoHostsError: If the platform is not contactable. + """ LOG.debug( f"Cleaning {reg} on install target: {platform['install target']} " diff --git a/tests/functional/events/51-task-event-job-logs-retrieve-4.t b/tests/functional/events/51-task-event-job-logs-retrieve-4.t new file mode 100644 index 00000000000..1e09a92210f --- /dev/null +++ b/tests/functional/events/51-task-event-job-logs-retrieve-4.t @@ -0,0 +1,77 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +export REQUIRE_PLATFORM="loc:remote comms:tcp" +. "$(dirname "$0")/test_header" +set_test_number 3 +#------------------------------------------------------------------------------- +# It should retry job log retrieval even if all hosts are not contactable. +#------------------------------------------------------------------------------- + +init_workflow "${TEST_NAME_BASE}-1" <<__FLOW_CONFIG__ +[scheduling] + [[graph]] + R1 = """ + remote + """ + +[runtime] + [[remote]] + # script = sleep 1 + platform = ${CYLC_TEST_PLATFORM} +__FLOW_CONFIG__ + +# configure job retries on the test platform +create_test_global_config '' " +[platforms] + [[${CYLC_TEST_PLATFORM}]] + retrieve job logs = True + retrieve job logs retry delays = 3*PT1S + retrieve job logs command = fido +" + +# * redirect retrieval attempts to a file where we can inspect them later +# * make it look like retrieval failed due to network issues (255 ret code) +JOB_LOG_RETR_CMD="${WORKFLOW_RUN_DIR}/bin/fido" +RETRIEVAL_ATTEMPT_LOG="${WORKFLOW_RUN_DIR}/retrieval-attempt-log" +mkdir "${WORKFLOW_RUN_DIR}/bin" +cat > "${WORKFLOW_RUN_DIR}/bin/fido" <<__HERE__ +#!/usr/bin/env bash +echo "$@" >> "${RETRIEVAL_ATTEMPT_LOG}" +exit 255 +__HERE__ +chmod +x "${JOB_LOG_RETR_CMD}" + +workflow_run_ok "${TEST_NAME_BASE}-play" \ + cylc play --debug --no-detach "${WORKFLOW_NAME}" + +# it should try retrieval three times +# Note: it should reset bad_hosts to allow retries to happen +TEST_NAME="${TEST_NAME_BASE}-retrieve-attempts" +# shellcheck disable=SC2002 +# (cat'ting into pipe to avoid having to sed out the filename) +if [[ $(cat "${RETRIEVAL_ATTEMPT_LOG}" | wc -l) -eq 3 ]]; then + ok "${TEST_NAME}" +else + fail "${TEST_NAME}" +fi + +# then fail once the retries have been exhausted +grep_workflow_log_ok "${TEST_NAME_BASE}-retrieve-fail" \ + 'job-logs-retrieve for task event:succeeded failed' + +purge diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py new file mode 100644 index 00000000000..a8413caca44 --- /dev/null +++ b/tests/integration/test_task_job_mgr.py @@ -0,0 +1,106 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging + +from cylc.flow import CYLC_LOG +from cylc.flow.task_state import TASK_STATUS_RUNNING + + +async def test_run_job_cmd_no_hosts_error( + flow, + scheduler, + start, + mock_glbl_cfg, + log_filter, +): + """It should catch NoHostsError. + + NoHostsError's should be caught and handled rather than raised because + they will cause problems (i.e. trigger shutdown) if they make it to the + Scheduler. + + NoHostError's can occur in the poll & kill logic, this test ensures that + these methods catch the NoHostsError and handle the event as a regular + SSH failure by pushing the issue down the 255 callback. + + See https://github.com/cylc/cylc-flow/pull/5195 + """ + # define a platform + mock_glbl_cfg( + 'cylc.flow.platforms.glbl_cfg', + ''' + [platforms] + [[no-host-platform]] + ''', + ) + + # define a workflow with a task which runs on that platform + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'foo' + } + }, + 'runtime': { + 'foo': { + 'platform': 'no-host-platform' + } + } + }) + + # start the workflow + schd = scheduler(id_) + async with start(schd) as log: + # set logging to debug level + log.set_level(logging.DEBUG, CYLC_LOG) + + # tell Cylc the task is running on that platform + schd.pool.get_tasks()[0].state_reset(TASK_STATUS_RUNNING) + schd.pool.get_tasks()[0].platform = { + 'name': 'no-host-platform', + 'hosts': ['no-host-platform'], + } + + # tell Cylc that that platform is not contactable + # (i.e. all hosts are in bad_hosts) + # (this casuses the NoHostsError to be raised) + schd.task_job_mgr.bad_hosts.add('no-host-platform') + + # polling the task should not result in an error... + schd.task_job_mgr.poll_task_jobs( + schd.workflow, + schd.pool.get_tasks() + ) + + # ...but the failure should be logged + assert log_filter( + log, + contains='No available hosts for no-host-platform', + ) + log.clear() + + # killing the task should not result in an error... + schd.task_job_mgr.kill_task_jobs( + schd.workflow, + schd.pool.get_tasks() + ) + + # ...but the failure should be logged + assert log_filter( + log, + contains='No available hosts for no-host-platform', + ) diff --git a/tests/unit/scripts/test_check_versions.py b/tests/unit/scripts/test_check_versions.py new file mode 100644 index 00000000000..3c52c2d33c4 --- /dev/null +++ b/tests/unit/scripts/test_check_versions.py @@ -0,0 +1,50 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest + +from cylc.flow.exceptions import NoHostsError +from cylc.flow.scripts.check_versions import check_versions + + +@pytest.fixture +def break_host_selection(monkeypatch): + """Make host selection for any platform fail with NoHostsError.""" + def _get_host_from_platform(platform, *args, **kwargs): + raise NoHostsError(platform) + + monkeypatch.setattr( + 'cylc.flow.scripts.check_versions.get_host_from_platform', + _get_host_from_platform, + ) + + def _get_platform(platform_name, *args, **kwargs): + return {'name': platform_name} + + monkeypatch.setattr( + 'cylc.flow.scripts.check_versions.get_platform', + _get_platform, + ) + + +def test_no_hosts_error(break_host_selection, capsys): + """It should handle NoHostsError events.""" + versions = check_versions(['buggered'], True) + # the broken platform should be skipped (so no returned versions) + assert not versions + # a warning should have been logged to stderr + out, err = capsys.readouterr() + assert 'Could not connect to buggered' in err diff --git a/tests/unit/test_host_select.py b/tests/unit/test_host_select.py index 8587927983f..02e31b5d56b 100644 --- a/tests/unit/test_host_select.py +++ b/tests/unit/test_host_select.py @@ -20,14 +20,17 @@ the host_select module. """ +import logging import socket import pytest +from cylc.flow import CYLC_LOG from cylc.flow.exceptions import HostSelectException from cylc.flow.host_select import ( + _get_metrics, select_host, - select_workflow_host + select_workflow_host, ) from cylc.flow.hostuserutil import get_fqdn_by_host from cylc.flow.parsec.exceptions import ListValueError @@ -200,3 +203,18 @@ def test_condemned_host_ambiguous(mock_glbl_cfg): ''' ) assert 'ambiguous host' in excinfo.value.msg + + +def test_get_metrics_no_hosts_error(caplog): + """It should handle SSH errors. + + If a host is not contactable then it should be shipped. + """ + caplog.set_level(logging.WARN, CYLC_LOG) + host_stats, data = _get_metrics(['not-a-host'], None) + # a warning should be logged + assert len(caplog.records) == 1 + # no data for the host should be returned + assert not host_stats + # the return code should be recorded + assert data == {'not-a-host': {'returncode': 255}}