diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 6ddc62d39f2..e964d2bfedb 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -51,13 +51,13 @@ from cylc.flow.task_state import ( TASK_STATUSES_ACTIVE, TASK_STATUSES_FINAL, - TASK_STATUS_WAITING, TASK_STATUS_EXPIRED, + TASK_STATUS_FAILED, TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING, + TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED, - TASK_STATUS_FAILED, + TASK_STATUS_WAITING, TASK_OUTPUT_EXPIRED, TASK_OUTPUT_FAILED, TASK_OUTPUT_SUCCEEDED, @@ -362,20 +362,35 @@ def compute_runahead(self, force=False) -> bool: # Find the earliest point with unfinished tasks. for point, itasks in sorted(self.get_tasks_by_point().items()): if ( - points # got the limit already so this point too - or any( - not itask.state( - TASK_STATUS_FAILED, - TASK_STATUS_SUCCEEDED, - TASK_STATUS_EXPIRED - ) - or ( - # For Cylc 7 back-compat, ignore incomplete tasks. - # (Success is required in back-compat mode, so - # failedtasks end up as incomplete; and Cylc 7 - # ignores failed tasks in computing the limit). - itask.state.outputs.is_incomplete() - and not cylc.flow.flags.cylc7_back_compat + any( + # filter out runahead tasks + itask.state(is_runahead=False) + and ( + # waiting tasks + # * tasks with partially satisfied prereqs + # * tasks with incomplete xtriggers + # * held tasks + itask.state(TASK_STATUS_WAITING) + + # unfinished tasks (back-compat mode) + # * Cylc 7 runahead logic considered a cycle to be + # active if it had "unfinished" tasks + or ( + cylc.flow.flags.cylc7_back_compat + and not itask.state( + TASK_STATUS_FAILED, + TASK_STATUS_SUCCEEDED, + TASK_STATUS_EXPIRED, + ) + ) + + # incomplete tasks (optional outputs) + # * tasks with one or more required outputs + # incomplete + or ( + not cylc.flow.flags.cylc7_back_compat + and itask.state.outputs.is_incomplete() + ) ) for itask in itasks ) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 97526088660..1efc151a9b2 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1246,3 +1246,91 @@ async def test_detect_incomplete_tasks( assert log_filter(log, contains=f"[{itask}] did not complete required outputs:") # the task should not have been removed assert itask in schd.pool.get_tasks() + + +@pytest.mark.parametrize('back_compat', [True, False]) +@pytest.mark.parametrize('integer', [True, False]) +async def test_compute_runahead( + integer, + back_compat, + flow, + scheduler, + start, + monkeypatch, +): + """Test the calculation of the runahead limit. + + This test ensures that: + * Runahead tasks are excluded from computations + see https://github.com/cylc/cylc-flow/issues/5825 + * Tasks are initiated with the correct is_runahead status on statup. + * Behaviour is the same in compat/regular modes. + * Behaviour is the same for integer/datetime cycling modes. + + """ + if integer: + # cycling mode = integer + config = { + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'initial cycle point': '1', + 'cycling mode': 'integer', + 'runahead limit': 'P3', + 'graph': { + 'P1': 'a' + }, + } + } + point = lambda point: IntegerPoint(str(int(point))) + else: + # cycling mode = gregorian + config = { + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': '0001', + 'runahead limit': 'P3Y', + 'graph': { + 'P1Y': 'a' + }, + } + } + point = ISO8601Point + + monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', back_compat) + + id_ = flow(config) + schd = scheduler(id_) + async with start(schd): + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 4 + + # ensure task states are initiated with is_runahead status + assert schd.pool.get_task(point('0001'), 'a').state(is_runahead=False) + assert schd.pool.get_task(point('0005'), 'a').state(is_runahead=True) + + # mark the first three cycles as running + for cycle in range(1, 4): + schd.pool.get_task(point(f'{cycle:04}'), 'a').state.reset( + TASK_STATUS_RUNNING + ) + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 4 # no change + + # mark cycle 1 as incomplete (but finished) + schd.pool.get_task(point('0001'), 'a').state.reset( + TASK_STATUS_SUBMIT_FAILED + ) + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 4 # no change + + # mark cycle 1 as complete + schd.pool.get_task(point('0001'), 'a').state.reset( + TASK_STATUS_SUCCEEDED + ) + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 5 # +1