Skip to content

Commit

Permalink
Merge pull request cylc#5195 from oliver-sanders/no-hosts-error
Browse files Browse the repository at this point in the history
ihs: catch NoHostsError
  • Loading branch information
hjoliver authored Dec 7, 2022
2 parents ae21c76 + 818887a commit d8d4946
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 37 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ with warning, for scan errors where workflow is stopped.
[#5199](https://github.com/cylc/cylc-flow/pull/5199) - Fix a problem with
the consolidation tutorial.

[#5195](https://github.com/cylc/cylc-flow/pull/5195) -
Fix issue where workflows can fail to shutdown due to unavailable remote
platforms and make job log retrieval more robust.

-------------------------------------------------------------------------------
## __cylc-8.0.3 (<span actions:bind='release-date'>Released 2022-10-17</span>)__

Expand Down
12 changes: 10 additions & 2 deletions cylc/flow/host_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@

from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import GlobalConfigError, HostSelectException
from cylc.flow.exceptions import (
GlobalConfigError,
HostSelectException,
NoHostsError,
)
from cylc.flow.hostuserutil import get_fqdn_by_host, is_remote_host
from cylc.flow.remote import run_cmd, cylc_server_cmd
from cylc.flow.terminal import parse_dirty_json
Expand Down Expand Up @@ -553,7 +557,11 @@ def _get_metrics(hosts, metrics, data=None):
}
for host in hosts:
if is_remote_host(host):
proc_map[host] = cylc_server_cmd(cmd, host=host, **kwargs)
try:
proc_map[host] = cylc_server_cmd(cmd, host=host, **kwargs)
except NoHostsError:
LOG.warning(f'Could not contact {host}')
continue
else:
proc_map[host] = run_cmd(['cylc'] + cmd, **kwargs)

Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/network/ssh_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ async def async_request(self, command, args=None, timeout=None):
'cylc path': cylc_path,
'use login shell': login_sh,
}
# NOTE: this can not raise NoHostsError
# because we have provided the host
proc = remote_cylc_cmd(
cmd,
platform,
Expand Down
12 changes: 12 additions & 0 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ def construct_rsync_over_ssh_cmd(
platform: contains info relating to platform
rsync_includes: files and directories to be included in the rsync
Raises:
NoHostsError:
If there are no hosts available for the requested platform.
Developer Warning:
The Cylc Subprocess Pool method ``rsync_255_fail`` relies on
``rsync_cmd[0] == 'rsync'``. Please check that changes to this function
Expand Down Expand Up @@ -369,6 +373,10 @@ def remote_cylc_cmd(
Uses the provided platform configuration to construct the command.
For arguments and returns see construct_ssh_cmd and run_cmd.
Raises:
NoHostsError: If the platform is not contactable.
"""
if not host:
# no host selected => perform host selection from platform config
Expand Down Expand Up @@ -405,6 +413,10 @@ def cylc_server_cmd(cmd, host=None, **kwargs):
with the localhost platform.
For arguments and returns see construct_ssh_cmd and run_cmd.
Raises:
NoHostsError: If the platform is not contactable.
"""
return remote_cylc_cmd(
cmd,
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ def _distribute(host, workflow_id_raw, workflow_id):
cmd.append("--host=localhost")

# Re-invoke the command
# NOTE: has the potential to raise NoHostsError, however, this will
# most likely have been raised during host-selection
cylc_server_cmd(cmd, host=host)
sys.exit(0)

Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/scripts/cat_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ def main(
# TODO: Add Intelligent Host selection to this
with suppress(KeyboardInterrupt):
# (Ctrl-C while tailing)
# NOTE: This will raise NoHostsError if the platform is not
# contactable
remote_cylc_cmd(
cmd,
platform,
Expand Down
25 changes: 20 additions & 5 deletions cylc/flow/scripts/check_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from cylc.flow.cylc_subproc import procopen, PIPE, DEVNULL
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.config import WorkflowConfig
from cylc.flow.exceptions import NoHostsError
from cylc.flow.id_cli import parse_id
from cylc.flow.platforms import get_platform, get_host_from_platform
from cylc.flow.remote import construct_ssh_cmd
Expand Down Expand Up @@ -101,15 +102,26 @@ def main(_, options: 'Values', *ids) -> None:
sys.exit(0)

verbose = cylc.flow.flags.verbosity > 0
versions = check_versions(platforms, verbose)
report_results(platforms, versions, options.error)


def check_versions(platforms, verbose):
# get the cylc version on each platform
versions = {}
for platform_name in sorted(platforms):
platform = get_platform(platform_name)
host = get_host_from_platform(
platform,
bad_hosts=None
)
try:
host = get_host_from_platform(
platform,
bad_hosts=None
)
except NoHostsError:
print(
f'Could not connect to {platform["name"]}',
file=sys.stderr
)
continue
cmd = construct_ssh_cmd(
['version'],
platform,
Expand All @@ -127,7 +139,10 @@ def main(_, options: 'Values', *ids) -> None:
versions[platform_name] = out.strip()
else:
versions[platform_name] = f'ERROR: {err.strip()}'
return versions


def report_results(platforms, versions, exit_error):
# report results
max_len = max((len(platform_name) for platform_name in platforms))
print(f'{"platform".rjust(max_len)}: cylc version')
Expand All @@ -136,7 +151,7 @@ def main(_, options: 'Values', *ids) -> None:
print(f'{platform_name.rjust(max_len)}: {result}')
if all((version == CYLC_VERSION for version in versions.values())):
ret_code = 0
elif options.error:
elif exit_error:
ret_code = 1
else:
ret_code = 0
Expand Down
26 changes: 25 additions & 1 deletion cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

from cylc.flow import LOG, LOG_LEVELS
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import NoHostsError
from cylc.flow.hostuserutil import get_host, get_user, is_remote_platform
from cylc.flow.pathutil import (
get_remote_workflow_run_job_dir,
Expand Down Expand Up @@ -935,8 +936,29 @@ def _get_events_conf(self, itask, key, default=None):

def _process_job_logs_retrieval(self, schd, ctx, id_keys):
"""Process retrieval of task job logs from remote user@host."""
# get a host to run retrieval on
platform = get_platform(ctx.platform_name)
host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
try:
host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
except NoHostsError:
# All of the platforms hosts have been found to be uncontactable.
# Reset the bad hosts to allow retrieval retry to take place.
self.bad_hosts -= set(platform['hosts'])
try:
# Get a new host and try again.
host = get_host_from_platform(
platform,
bad_hosts=self.bad_hosts
)
except NoHostsError:
# We really can't get a host to try on e.g. no hosts
# configured (shouldn't happen). Nothing more we can do here,
# move onto the next submission retry.
for id_key in id_keys:
self.unset_waiting_event_timer(id_key)
return

# construct the retrieval command
ssh_str = str(platform["ssh command"])
rsync_str = str(platform["retrieve job logs command"])
cmd = shlex.split(rsync_str) + ["--rsh=" + ssh_str]
Expand All @@ -962,6 +984,8 @@ def _process_job_logs_retrieval(self, schd, ctx, id_keys):
)
# Local target
cmd.append(get_workflow_run_job_dir(schd.workflow) + "/")

# schedule command
self.proc_pool.put_command(
SubProcContext(
ctx, cmd, env=dict(os.environ), id_keys=id_keys, host=host
Expand Down
29 changes: 17 additions & 12 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,6 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
cmd, [len(b) for b in itasks_batches])

if remote_mode:
host = get_host_from_platform(
platform, bad_hosts=self.task_remote_mgr.bad_hosts
)
cmd = construct_ssh_cmd(
cmd, platform, host
)
Expand Down Expand Up @@ -944,13 +941,23 @@ def _run_job_cmd(
cmd.append(get_remote_workflow_run_job_dir(workflow))
job_log_dirs = []
host = 'localhost'

ctx = SubProcContext(cmd_key, cmd, host=host)
if remote_mode:
host = get_host_from_platform(
platform, bad_hosts=self.task_remote_mgr.bad_hosts
)
cmd = construct_ssh_cmd(
cmd, platform, host
)
try:
host = get_host_from_platform(
platform, bad_hosts=self.task_remote_mgr.bad_hosts
)
cmd = construct_ssh_cmd(
cmd, platform, host
)
except NoHostsError:
ctx.err = f'No available hosts for {platform["name"]}'
callback_255(ctx, workflow, itasks)
continue
else:
ctx = SubProcContext(cmd_key, cmd, host=host)

for itask in sorted(itasks, key=lambda task: task.identity):
job_log_dirs.append(
itask.tokens.duplicate(
Expand All @@ -960,9 +967,7 @@ def _run_job_cmd(
cmd += job_log_dirs
LOG.debug(f'{cmd_key} for {platform["name"]} on {host}')
self.proc_pool.put_command(
SubProcContext(
cmd_key, cmd, host=host
),
ctx,
bad_hosts=self.task_remote_mgr.bad_hosts,
callback=callback,
callback_args=[workflow, itasks],
Expand Down
37 changes: 21 additions & 16 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ def remote_init(
)
self.remote_init_map[
platform['install target']] = REMOTE_INIT_FAILED
# reset the bad hosts to allow remote-init to retry
self.bad_hosts -= set(platform['hosts'])
self.ready = True
else:
Expand All @@ -266,6 +267,24 @@ def remote_init(
callback_255_args=[platform]
)

def construct_remote_tidy_ssh_cmd(
self, platform: Dict[str, Any]
) -> Tuple[List[str], str]:
"""Return a remote-tidy SSH command.
Rasies:
NoHostsError: If the platform is not contactable.
"""
cmd = ['remote-tidy']
cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity))
cmd.append(get_install_target_from_platform(platform))
cmd.append(get_remote_workflow_run_dir(self.workflow))
host = get_host_from_platform(
platform, bad_hosts=self.bad_hosts
)
cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s')
return cmd, host

def remote_tidy(self) -> None:
"""Remove workflow contact files and keys from initialised remotes.
Expand All @@ -274,20 +293,6 @@ def remote_tidy(self) -> None:
Timeout any incomplete commands after 10 seconds.
"""
# Issue all SSH commands in parallel

def construct_remote_tidy_ssh_cmd(
platform: Dict[str, Any]
) -> Tuple[List[str], str]:
cmd = ['remote-tidy']
cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity))
cmd.append(get_install_target_from_platform(platform))
cmd.append(get_remote_workflow_run_dir(self.workflow))
host = get_host_from_platform(
platform, bad_hosts=self.bad_hosts
)
cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s')
return cmd, host

queue: Deque[RemoteTidyQueueTuple] = deque()
for install_target, message in self.remote_init_map.items():
if message != REMOTE_FILE_INSTALL_DONE:
Expand All @@ -298,7 +303,7 @@ def construct_remote_tidy_ssh_cmd(
platform = get_random_platform_for_install_target(
install_target
)
cmd, host = construct_remote_tidy_ssh_cmd(platform)
cmd, host = self.construct_remote_tidy_ssh_cmd(platform)
except (NoHostsError, PlatformLookupError) as exc:
LOG.warning(
PlatformError(
Expand Down Expand Up @@ -332,7 +337,7 @@ def construct_remote_tidy_ssh_cmd(
timeout = time() + 10.0
self.bad_hosts.add(item.host)
try:
retry_cmd, retry_host = construct_remote_tidy_ssh_cmd(
retry_cmd, retry_host = self.construct_remote_tidy_ssh_cmd(
item.platform
)
except (NoHostsError, PlatformLookupError) as exc:
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,10 @@ def _remote_clean_cmd(
platform: Config for the platform on which to remove the workflow.
rm_dirs: Sub dirs to remove instead of the whole run dir.
timeout: Number of seconds to wait before cancelling the command.
Raises:
NoHostsError: If the platform is not contactable.
"""
LOG.debug(
f"Cleaning {reg} on install target: {platform['install target']} "
Expand Down
Loading

0 comments on commit d8d4946

Please sign in to comment.