diff --git a/changes.d/6200.fix.md b/changes.d/6200.fix.md new file mode 100644 index 00000000000..3b4cf8012cf --- /dev/null +++ b/changes.d/6200.fix.md @@ -0,0 +1 @@ +Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused \ No newline at end of file diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index a4ea43df5cf..134681bdfd5 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -189,16 +189,19 @@ async def stop( schd.workflow_db_mgr.put_workflow_stop_cycle_point( schd.options.stopcp ) + schd._update_workflow_state() elif clock_time is not None: # schedule shutdown after wallclock time passes provided time parser = TimePointParser() schd.set_stop_clock( int(parser.parse(clock_time).seconds_since_unix_epoch) ) + schd._update_workflow_state() elif task is not None: # schedule shutdown after task succeeds task_id = TaskID.get_standardised_taskid(task) schd.pool.set_stop_task(task_id) + schd._update_workflow_state() else: # immediate shutdown with suppress(KeyError): @@ -229,6 +232,7 @@ async def release_hold_point(schd: 'Scheduler'): yield LOG.info("Releasing all tasks and removing hold cycle point.") schd.pool.release_hold_point() + schd._update_workflow_state() @_command('resume') @@ -287,6 +291,7 @@ async def set_hold_point(schd: 'Scheduler', point: str): "All tasks after this point will be held." ) schd.pool.set_hold_point(cycle_point) + schd._update_workflow_state() @_command('pause') diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index c59fa7b6c62..0befc1b4dad 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -85,7 +85,10 @@ pdeepcopy, poverride ) -from cylc.flow.workflow_status import get_workflow_status +from cylc.flow.workflow_status import ( + get_workflow_status, + get_workflow_status_msg, +) from cylc.flow.task_job_logs import JOB_LOG_OPTS, get_task_job_log from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_state import ( @@ -2174,8 +2177,8 @@ def update_workflow(self, reloaded=False): w_delta.latest_state_tasks[state].task_proxies[:] = tp_queue # Set status & msg if changed. - status, status_msg = map( - str, get_workflow_status(self.schd)) + status = get_workflow_status(self.schd).value + status_msg = get_workflow_status_msg(self.schd) if w_data.status != status or w_data.status_msg != status_msg: w_delta.status = status w_delta.status_msg = status_msg diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index ff593648e7b..b3bbbd23d7d 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -2000,7 +2000,7 @@ def update_data_store(self): Call this method whenever the Scheduler's state has changed in a way that requires a data store update. - See cylc.flow.workflow_status.get_workflow_status() for a + See cylc.flow.workflow_status.get_workflow_status_msg() for a (non-exhaustive?) list of properties that if changed will require this update. diff --git a/cylc/flow/tui/util.py b/cylc/flow/tui/util.py index 88e960e249a..33494f9fb06 100644 --- a/cylc/flow/tui/util.py +++ b/cylc/flow/tui/util.py @@ -384,19 +384,6 @@ def get_task_status_summary(flow): ] -def get_workflow_status_str(flow): - """Return a workflow status string for the header. - - Arguments: - flow (dict): - GraphQL JSON response for this workflow. - - Returns: - list - Text list for the urwid.Text widget. - - """ - - def _render_user(node, data): return f'~{ME}' diff --git a/cylc/flow/wallclock.py b/cylc/flow/wallclock.py index 46c5d2487ce..5c3479a07db 100644 --- a/cylc/flow/wallclock.py +++ b/cylc/flow/wallclock.py @@ -16,7 +16,7 @@ """Wall clock related utilities.""" from calendar import timegm -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from metomi.isodatetime.timezone import ( get_local_time_zone_format, get_local_time_zone, TimeZoneFormatMode) @@ -209,7 +209,7 @@ def get_time_string_from_unix_time(unix_time, display_sub_seconds=False, to use as the time zone designator. """ - date_time = datetime.utcfromtimestamp(unix_time) + date_time = datetime.fromtimestamp(unix_time, timezone.utc) return get_time_string(date_time, display_sub_seconds=display_sub_seconds, use_basic_format=use_basic_format, diff --git a/cylc/flow/workflow_status.py b/cylc/flow/workflow_status.py index 02f42717ed3..d6d6fb587dc 100644 --- a/cylc/flow/workflow_status.py +++ b/cylc/flow/workflow_status.py @@ -16,13 +16,18 @@ """Workflow status constants.""" from enum import Enum -from typing import Tuple, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional, Union +from cylc.flow.cycling.loader import get_point +from cylc.flow.id import tokenise from cylc.flow.wallclock import get_time_string_from_unix_time as time2str if TYPE_CHECKING: from optparse import Values + + from cylc.flow.cycling import PointBase from cylc.flow.scheduler import Scheduler + from cylc.flow.task_pool import TaskPool # Keys for identify API call KEY_GROUP = "group" @@ -143,62 +148,60 @@ class AutoRestartMode(Enum): """Workflow will stop immediately but *not* attempt to restart.""" -def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]: - """Return the status of the provided workflow. - - This should be a short, concise description of the workflow state. - - Args: - schd: The running workflow - - Returns: - tuple - (state, state_msg) - - state: - The WorkflowState. - state_msg: - Text describing the current state (may be an empty string). +def get_workflow_status(schd: 'Scheduler') -> WorkflowStatus: + """Return the status of the provided workflow.""" + if schd.stop_mode is not None: + return WorkflowStatus.STOPPING + if schd.is_paused or schd.reload_pending: + return WorkflowStatus.PAUSED + return WorkflowStatus.RUNNING - """ - status = WorkflowStatus.RUNNING - status_msg = '' +def get_workflow_status_msg(schd: 'Scheduler') -> str: + """Return a short, concise status message for the provided workflow.""" if schd.stop_mode is not None: - status = WorkflowStatus.STOPPING - status_msg = f'stopping: {schd.stop_mode.explain()}' - elif schd.reload_pending: - status = WorkflowStatus.PAUSED - status_msg = f'reloading: {schd.reload_pending}' - elif schd.is_stalled: - status_msg = 'stalled' - elif schd.is_paused: - status = WorkflowStatus.PAUSED - status_msg = 'paused' - elif schd.pool.hold_point: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_HOLD % - schd.pool.hold_point) - elif schd.pool.stop_point: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - schd.pool.stop_point) - elif schd.stop_clock_time is not None: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - time2str(schd.stop_clock_time)) - elif schd.pool.stop_task_id: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - schd.pool.stop_task_id) - elif schd.config and schd.config.final_point: - status_msg = ( - WORKFLOW_STATUS_RUNNING_TO_STOP % - schd.config.final_point) - else: - # fallback - running indefinitely - status_msg = 'running' - - return (status.value, status_msg) + return f'stopping: {schd.stop_mode.explain()}' + if schd.reload_pending: + return f'reloading: {schd.reload_pending}' + if schd.is_stalled: + if schd.is_paused: + return 'stalled and paused' + return 'stalled' + if schd.is_paused: + return 'paused' + if schd.stop_clock_time is not None: + return WORKFLOW_STATUS_RUNNING_TO_STOP % time2str( + schd.stop_clock_time + ) + stop_point_msg = _get_earliest_stop_point_status_msg(schd.pool) + if stop_point_msg is not None: + return stop_point_msg + if schd.config and schd.config.final_point: + return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.config.final_point + # fallback - running indefinitely + return 'running' + + +def _get_earliest_stop_point_status_msg(pool: 'TaskPool') -> Optional[str]: + """Return the status message for the earliest stop point in the pool, + if any.""" + template = WORKFLOW_STATUS_RUNNING_TO_STOP + prop: Union[PointBase, str, None] = pool.stop_task_id + min_point: Optional[PointBase] = get_point( + tokenise(pool.stop_task_id, relative=True)['cycle'] + if pool.stop_task_id else None + ) + for point, tmpl in ( + (pool.stop_point, WORKFLOW_STATUS_RUNNING_TO_STOP), + (pool.hold_point, WORKFLOW_STATUS_RUNNING_TO_HOLD) + ): + if point is not None and (min_point is None or point < min_point): + template = tmpl + min_point = point + prop = point + if prop is None: + return None + return template % prop class RunMode: diff --git a/tests/unit/test_workflow_status.py b/tests/unit/test_workflow_status.py index af88de3daab..f0c92fd1530 100644 --- a/tests/unit/test_workflow_status.py +++ b/tests/unit/test_workflow_status.py @@ -17,15 +17,20 @@ from types import SimpleNamespace import pytest +from metomi.isodatetime.data import TimePoint +from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.workflow_status import ( - StopMode, - WorkflowStatus, WORKFLOW_STATUS_RUNNING_TO_HOLD, WORKFLOW_STATUS_RUNNING_TO_STOP, + StopMode, + WorkflowStatus, get_workflow_status, + get_workflow_status_msg, ) +STOP_TIME = TimePoint(year=2006).to_local_time_zone() + def schd( final_point=None, @@ -50,6 +55,7 @@ def schd( stop_task_id=stop_task_id, ), config=SimpleNamespace(final_point=final_point), + options=SimpleNamespace(utc_mode=True), ) @@ -73,29 +79,29 @@ def schd( 'stopping: waiting for active jobs to complete' ), ( - {'hold_point': 'point'}, + {'hold_point': 2}, WorkflowStatus.RUNNING, - WORKFLOW_STATUS_RUNNING_TO_HOLD % 'point' + WORKFLOW_STATUS_RUNNING_TO_HOLD % 2 ), ( - {'stop_point': 'point'}, + {'stop_point': 4}, WorkflowStatus.RUNNING, - WORKFLOW_STATUS_RUNNING_TO_STOP % 'point' + WORKFLOW_STATUS_RUNNING_TO_STOP % 4 ), ( - {'stop_clock_time': 1234}, + {'stop_clock_time': int(STOP_TIME.seconds_since_unix_epoch)}, WorkflowStatus.RUNNING, - WORKFLOW_STATUS_RUNNING_TO_STOP % '' + WORKFLOW_STATUS_RUNNING_TO_STOP % str(STOP_TIME) ), ( - {'stop_task_id': 'foo'}, + {'stop_task_id': '6/foo'}, WorkflowStatus.RUNNING, - WORKFLOW_STATUS_RUNNING_TO_STOP % 'foo' + WORKFLOW_STATUS_RUNNING_TO_STOP % '6/foo' ), ( - {'final_point': 'point'}, + {'final_point': 8}, WorkflowStatus.RUNNING, - WORKFLOW_STATUS_RUNNING_TO_STOP % 'point' + WORKFLOW_STATUS_RUNNING_TO_STOP % 8 ), ( {'is_stalled': True}, @@ -112,22 +118,58 @@ def schd( ( # stopping should trump stalled, paused & running { - 'stop_mode': StopMode.AUTO, + 'stop_mode': StopMode.REQUEST_NOW, 'is_stalled': True, 'is_paused': True }, WorkflowStatus.STOPPING, - 'stopping' + 'stopping: shutting down' ), ( - # stalled should trump paused & running {'is_stalled': True, 'is_paused': True}, + WorkflowStatus.PAUSED, + 'stalled and paused', + ), + ( + # earliest of stop point, hold point and stop task id + { + 'stop_point': IntegerPoint(4), + 'hold_point': IntegerPoint(2), + 'stop_task_id': '6/foo', + }, WorkflowStatus.RUNNING, - 'stalled' + WORKFLOW_STATUS_RUNNING_TO_HOLD % 2, + ), + ( + { + 'stop_point': IntegerPoint(11), + 'hold_point': IntegerPoint(15), + 'stop_task_id': '9/bar', + }, + WorkflowStatus.RUNNING, + WORKFLOW_STATUS_RUNNING_TO_STOP % '9/bar', + ), + ( + { + 'stop_point': IntegerPoint(3), + 'hold_point': IntegerPoint(3), + }, + WorkflowStatus.RUNNING, + WORKFLOW_STATUS_RUNNING_TO_STOP % 3, + ), + ( + # stop point trumps final point + { + 'stop_point': IntegerPoint(1), + 'final_point': IntegerPoint(2), + }, + WorkflowStatus.RUNNING, + WORKFLOW_STATUS_RUNNING_TO_STOP % 1, ), ] ) -def test_get_workflow_status(kwargs, state, message): - state_, message_ = get_workflow_status(schd(**kwargs)) - assert state_ == state.value - assert message in message_ +def test_get_workflow_status(kwargs, state, message, set_cycling_type): + set_cycling_type() + scheduler = schd(**kwargs) + assert get_workflow_status(scheduler) == state + assert get_workflow_status_msg(scheduler) == message