Skip to content

Commit

Permalink
Ensure workflow status shows the earliest of the stop point, hold poi…
Browse files Browse the repository at this point in the history
…nt or stop task
  • Loading branch information
MetRonnie committed Jul 5, 2024
1 parent 47b74ae commit a0ee209
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 21 deletions.
40 changes: 32 additions & 8 deletions cylc/flow/workflow_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
"""Workflow status constants."""

from enum import Enum
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional, Union

from cylc.flow.cycling.loader import get_point
from cylc.flow.id import tokenise
from cylc.flow.wallclock import get_time_string_from_unix_time as time2str

if TYPE_CHECKING:
from optparse import Values

from cylc.flow.cycling import PointBase
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_pool import TaskPool

# Keys for identify API call
KEY_GROUP = "group"
Expand Down Expand Up @@ -160,26 +165,45 @@ def get_workflow_status_msg(schd: 'Scheduler') -> str:
return f'reloading: {schd.reload_pending}'
if schd.is_stalled:
if schd.is_paused:
return 'stalled (paused)'
return 'stalled and paused'
return 'stalled'
if schd.is_paused:
return 'paused'
if schd.pool.hold_point:
return WORKFLOW_STATUS_RUNNING_TO_HOLD % schd.pool.hold_point
if schd.pool.stop_point:
return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.pool.stop_point
if schd.stop_clock_time is not None:
return WORKFLOW_STATUS_RUNNING_TO_STOP % time2str(
schd.stop_clock_time
)
if schd.pool.stop_task_id:
return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.pool.stop_task_id
stop_point_msg = _get_earliest_stop_point_status_msg(schd.pool)
if stop_point_msg is not None:
return stop_point_msg
if schd.config and schd.config.final_point:
return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.config.final_point
# fallback - running indefinitely
return 'running'


def _get_earliest_stop_point_status_msg(pool: 'TaskPool') -> Optional[str]:
"""Return the status message for the earliest stop point in the pool,
if any."""
template = WORKFLOW_STATUS_RUNNING_TO_STOP
prop: Union[PointBase, str, None] = pool.stop_task_id
min_point: Optional[PointBase] = get_point(
tokenise(pool.stop_task_id, relative=True)['cycle']
if pool.stop_task_id else None
)
for point, tmpl in (
(pool.stop_point, WORKFLOW_STATUS_RUNNING_TO_STOP),
(pool.hold_point, WORKFLOW_STATUS_RUNNING_TO_HOLD)
):
if point is not None and (min_point is None or point < min_point):
template = tmpl
min_point = point
prop = point
if prop is None:
return None
return template % prop


class RunMode:
"""The possible run modes of a workflow."""

Expand Down
63 changes: 50 additions & 13 deletions tests/unit/test_workflow_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
import pytest
from metomi.isodatetime.data import TimePoint

from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.workflow_status import (
StopMode,
WorkflowStatus,
WORKFLOW_STATUS_RUNNING_TO_HOLD,
WORKFLOW_STATUS_RUNNING_TO_STOP,
StopMode,
WorkflowStatus,
get_workflow_status,
get_workflow_status_msg,
)


STOP_TIME = TimePoint(year=2006).to_local_time_zone()


Expand Down Expand Up @@ -79,29 +79,29 @@ def schd(
'stopping: waiting for active jobs to complete'
),
(
{'hold_point': 'point'},
{'hold_point': 2},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_HOLD % 'point'
WORKFLOW_STATUS_RUNNING_TO_HOLD % 2
),
(
{'stop_point': 'point'},
{'stop_point': 4},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_STOP % 'point'
WORKFLOW_STATUS_RUNNING_TO_STOP % 4
),
(
{'stop_clock_time': int(STOP_TIME.seconds_since_unix_epoch)},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_STOP % str(STOP_TIME)
),
(
{'stop_task_id': 'foo'},
{'stop_task_id': '6/foo'},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_STOP % 'foo'
WORKFLOW_STATUS_RUNNING_TO_STOP % '6/foo'
),
(
{'final_point': 'point'},
{'final_point': 8},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_STOP % 'point'
WORKFLOW_STATUS_RUNNING_TO_STOP % 8
),
(
{'is_stalled': True},
Expand All @@ -128,11 +128,48 @@ def schd(
(
{'is_stalled': True, 'is_paused': True},
WorkflowStatus.PAUSED,
'stalled (paused)'
'stalled and paused',
),
(
# earliest of stop point, hold point and stop task id
{
'stop_point': IntegerPoint(4),
'hold_point': IntegerPoint(2),
'stop_task_id': '6/foo',
},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_HOLD % 2,
),
(
{
'stop_point': IntegerPoint(11),
'hold_point': IntegerPoint(15),
'stop_task_id': '9/bar',
},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_STOP % '9/bar',
),
(
{
'stop_point': IntegerPoint(3),
'hold_point': IntegerPoint(3),
},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_STOP % 3,
),
(
# stop point trumps final point
{
'stop_point': IntegerPoint(1),
'final_point': IntegerPoint(2),
},
WorkflowStatus.RUNNING,
WORKFLOW_STATUS_RUNNING_TO_STOP % 1,
),
]
)
def test_get_workflow_status(kwargs, state, message):
def test_get_workflow_status(kwargs, state, message, set_cycling_type):
set_cycling_type()
scheduler = schd(**kwargs)
assert get_workflow_status(scheduler) == state
assert get_workflow_status_msg(scheduler) == message

0 comments on commit a0ee209

Please sign in to comment.