From 9d56622d3ceb990ffd1f1ba295378bc34f6e9648 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Thu, 4 Jul 2024 19:15:05 +0100 Subject: [PATCH] Fix bug where a stalled paused workflow would have a status of running --- changes.d/6200.fix.md | 1 + cylc/flow/data_store_mgr.py | 9 ++-- cylc/flow/scheduler.py | 2 +- cylc/flow/workflow_status.py | 87 ++++++++++++------------------ tests/unit/test_workflow_status.py | 25 +++++---- 5 files changed, 56 insertions(+), 68 deletions(-) create mode 100644 changes.d/6200.fix.md diff --git a/changes.d/6200.fix.md b/changes.d/6200.fix.md new file mode 100644 index 00000000000..3b4cf8012cf --- /dev/null +++ b/changes.d/6200.fix.md @@ -0,0 +1 @@ +Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused \ No newline at end of file diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index c59fa7b6c62..0befc1b4dad 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -85,7 +85,10 @@ pdeepcopy, poverride ) -from cylc.flow.workflow_status import get_workflow_status +from cylc.flow.workflow_status import ( + get_workflow_status, + get_workflow_status_msg, +) from cylc.flow.task_job_logs import JOB_LOG_OPTS, get_task_job_log from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_state import ( @@ -2174,8 +2177,8 @@ def update_workflow(self, reloaded=False): w_delta.latest_state_tasks[state].task_proxies[:] = tp_queue # Set status & msg if changed. - status, status_msg = map( - str, get_workflow_status(self.schd)) + status = get_workflow_status(self.schd).value + status_msg = get_workflow_status_msg(self.schd) if w_data.status != status or w_data.status_msg != status_msg: w_delta.status = status w_delta.status_msg = status_msg diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index ff593648e7b..b3bbbd23d7d 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -2000,7 +2000,7 @@ def update_data_store(self): Call this method whenever the Scheduler's state has changed in a way that requires a data store update. - See cylc.flow.workflow_status.get_workflow_status() for a + See cylc.flow.workflow_status.get_workflow_status_msg() for a (non-exhaustive?) list of properties that if changed will require this update. diff --git a/cylc/flow/workflow_status.py b/cylc/flow/workflow_status.py index 02f42717ed3..6ed1a8f3071 100644 --- a/cylc/flow/workflow_status.py +++ b/cylc/flow/workflow_status.py @@ -16,7 +16,7 @@ """Workflow status constants.""" from enum import Enum -from typing import Tuple, TYPE_CHECKING +from typing import TYPE_CHECKING from cylc.flow.wallclock import get_time_string_from_unix_time as time2str @@ -143,62 +143,41 @@ class AutoRestartMode(Enum): """Workflow will stop immediately but *not* attempt to restart.""" -def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]: - """Return the status of the provided workflow. - - This should be a short, concise description of the workflow state. - - Args: - schd: The running workflow - - Returns: - tuple - (state, state_msg) - - state: - The WorkflowState. - state_msg: - Text describing the current state (may be an empty string). +def get_workflow_status(schd: 'Scheduler') -> WorkflowStatus: + """Return the status of the provided workflow.""" + if schd.stop_mode is not None: + return WorkflowStatus.STOPPING + if schd.is_paused or schd.reload_pending: + return WorkflowStatus.PAUSED + return WorkflowStatus.RUNNING - """ - status = WorkflowStatus.RUNNING - status_msg = '' +def get_workflow_status_msg(schd: 'Scheduler') -> str: + """Return a short, concise status message for the provided workflow.""" if schd.stop_mode is not None: - status = WorkflowStatus.STOPPING - status_msg = f'stopping: {schd.stop_mode.explain()}' - elif schd.reload_pending: - status = WorkflowStatus.PAUSED - status_msg = f'reloading: {schd.reload_pending}' - elif schd.is_stalled: - status_msg = 'stalled' - elif schd.is_paused: - status = WorkflowStatus.PAUSED - status_msg = 'paused' - elif schd.pool.hold_point: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_HOLD % - schd.pool.hold_point) - elif schd.pool.stop_point: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - schd.pool.stop_point) - elif schd.stop_clock_time is not None: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - time2str(schd.stop_clock_time)) - elif schd.pool.stop_task_id: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - schd.pool.stop_task_id) - elif schd.config and schd.config.final_point: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - schd.config.final_point) - else: - # fallback - running indefinitely - status_msg = 'running' - - return (status.value, status_msg) + return f'stopping: {schd.stop_mode.explain()}' + if schd.reload_pending: + return f'reloading: {schd.reload_pending}' + if schd.is_stalled: + if schd.is_paused: + return 'stalled (paused)' + return 'stalled' + if schd.is_paused: + return 'paused' + if schd.pool.hold_point: + return WORKFLOW_STATUS_RUNNING_TO_HOLD % schd.pool.hold_point + if schd.pool.stop_point: + return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.pool.stop_point + if schd.stop_clock_time is not None: + return WORKFLOW_STATUS_RUNNING_TO_STOP % time2str( + schd.stop_clock_time + ) + if schd.pool.stop_task_id: + return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.pool.stop_task_id + if schd.config and schd.config.final_point: + return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.config.final_point + # fallback - running indefinitely + return 'running' class RunMode: diff --git a/tests/unit/test_workflow_status.py b/tests/unit/test_workflow_status.py index af88de3daab..0dc074a2f4b 100644 --- a/tests/unit/test_workflow_status.py +++ b/tests/unit/test_workflow_status.py @@ -17,6 +17,7 @@ from types import SimpleNamespace import pytest +from metomi.isodatetime.data import TimePoint from cylc.flow.workflow_status import ( StopMode, @@ -24,9 +25,13 @@ WORKFLOW_STATUS_RUNNING_TO_HOLD, WORKFLOW_STATUS_RUNNING_TO_STOP, get_workflow_status, + get_workflow_status_msg, ) +STOP_TIME = TimePoint(year=2006).to_local_time_zone() + + def schd( final_point=None, hold_point=None, @@ -50,6 +55,7 @@ def schd( stop_task_id=stop_task_id, ), config=SimpleNamespace(final_point=final_point), + options=SimpleNamespace(utc_mode=True), ) @@ -83,9 +89,9 @@ def schd( WORKFLOW_STATUS_RUNNING_TO_STOP % 'point' ), ( - {'stop_clock_time': 1234}, + {'stop_clock_time': int(STOP_TIME.seconds_since_unix_epoch)}, WorkflowStatus.RUNNING, - WORKFLOW_STATUS_RUNNING_TO_STOP % '' + WORKFLOW_STATUS_RUNNING_TO_STOP % str(STOP_TIME) ), ( {'stop_task_id': 'foo'}, @@ -112,22 +118,21 @@ def schd( ( # stopping should trump stalled, paused & running { - 'stop_mode': StopMode.AUTO, + 'stop_mode': StopMode.REQUEST_NOW, 'is_stalled': True, 'is_paused': True }, WorkflowStatus.STOPPING, - 'stopping' + 'stopping: shutting down' ), ( - # stalled should trump paused & running {'is_stalled': True, 'is_paused': True}, - WorkflowStatus.RUNNING, - 'stalled' + WorkflowStatus.PAUSED, + 'stalled (paused)' ), ] ) def test_get_workflow_status(kwargs, state, message): - state_, message_ = get_workflow_status(schd(**kwargs)) - assert state_ == state.value - assert message in message_ + scheduler = schd(**kwargs) + assert get_workflow_status(scheduler) == state + assert get_workflow_status_msg(scheduler) == message