task_pool: fix compute_runahead calculation #5833

Closed
wants to merge 3 commits into from
49 changes: 32 additions & 17 deletions cylc/flow/task_pool.py
@@ -51,13 +51,13 @@
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_FINAL,
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED,
TASK_STATUS_FAILED,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUS_WAITING,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUCCEEDED,
@@ -362,20 +362,35 @@ def compute_runahead(self, force=False) -> bool:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
if (
points # got the limit already so this point too
Member Author

This could never be true as points is populated on a different branch.

Member

@hjoliver hjoliver Nov 28, 2023

No, it's populated on this branch too, within the same loop. So the logic says to automatically append the next points once we have added the first one based on the state checks. (However, I'm not sure why it wasn't just stopping at the first one, since then we just take the min anyway).

Member Author

Since we're iterating in sorted order the earlier cycles should come first, so a break should make sense?

Member Author

@oliver-sanders oliver-sanders Nov 28, 2023

It's using points later in ways which don't cause test failures, so I'll restore the original to avoid breakages.

Actually, no, adding this line back breaks the new tests. I'm not sure why we would add cycle points with no active tasks to the points list?
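
For reference, a minimal sketch of why a `break` should give the same result as collecting every later point and then taking the minimum. This assumes a plain dict stands in for `get_tasks_by_point()` and that `is_active` is a hypothetical predicate; neither function below is from cylc-flow, and the sketch says nothing about any other later use of `points`.

```python
def earliest_by_collecting(tasks_by_point, is_active):
    """Collect the first matching point and every later one, then take min."""
    points = []
    for point, tasks in sorted(tasks_by_point.items()):
        # once `points` is non-empty, every later point is appended too
        if points or any(is_active(task) for task in tasks):
            points.append(point)
    return min(points) if points else None


def earliest_by_breaking(tasks_by_point, is_active):
    """Stop at the first matching point; sorted order makes it the minimum."""
    for point, tasks in sorted(tasks_by_point.items()):
        if any(is_active(task) for task in tasks):
            return point
    return None


# hypothetical pool: task statuses keyed by integer cycle point
pool = {3: ["succeeded"], 1: ["succeeded"], 2: ["waiting"], 4: ["succeeded"]}


def is_waiting(status):
    return status == "waiting"
```

Both helpers return the same point for this pool, because the iteration is sorted and the first match is therefore also the smallest.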

or any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
Comment on lines -367 to -371
Member Author

This counted waiting runahead tasks.

It should be superfluous if not cylc.flow.flags.cylc7_back_compat because is_incomplete should cover it anyway.
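
As a rough illustration of the condition this PR introduces in place of those lines, here is a sketch using a hypothetical `FakeTask` dataclass and plain strings instead of the real `TaskState` API and `TASK_STATUS_*` constants. It mirrors the structure of the new `any(...)` expression only and is not the cylc-flow implementation.

```python
from dataclasses import dataclass

# stand-ins for TASK_STATUS_FAILED / SUCCEEDED / EXPIRED (assumed values)
FINISHED = {"failed", "succeeded", "expired"}


@dataclass
class FakeTask:
    """Hypothetical stand-in for a task proxy."""
    status: str
    is_runahead: bool = False
    is_incomplete: bool = False


def holds_back_limit(task, back_compat):
    """Return True if this task makes its cycle count as active."""
    if task.is_runahead:
        return False  # runahead tasks are filtered out first
    if task.status == "waiting":
        return True  # partially satisfied, waiting on xtriggers, or held
    if back_compat:
        return task.status not in FINISHED  # Cylc 7 "unfinished" rule
    return task.is_incomplete  # required outputs not complete
```

Under this reading, a waiting runahead task never counts, which is the behaviour the old state check got wrong.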

or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failedtasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
any(
# filter out runahead tasks
itask.state(is_runahead=False)
and (
# waiting tasks
# * tasks with partially satisfied prereqs
# * tasks with incomplete xtriggers
# * held tasks
itask.state(TASK_STATUS_WAITING)

# unfinished tasks (back-compat mode)
# * Cylc 7 runahead logic considered a cycle to be
# active if it had "unfinished" tasks
or (
cylc.flow.flags.cylc7_back_compat
and not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED,
)
)

# incomplete tasks (optional outputs)
# * tasks with one or more required outputs
# incomplete
or (
not cylc.flow.flags.cylc7_back_compat
and itask.state.outputs.is_incomplete()
)
)
for itask in itasks
)
5 changes: 3 additions & 2 deletions pytest.ini
@@ -34,11 +34,12 @@ testpaths =
tests/integration/
env =
# a weird timezone to check that tests aren't assuming the local timezone
TZ=XXX-09:35
# Note: this doesn't work properly with pytest-xdist
# TZ=XXX-09:35
Comment on lines 36 to +38
Member Author

This prevents the new test from failing.

Setting the TZ variable in this way doesn't work as intended.

A solution will arrive in #5826

doctest_optionflags =
NORMALIZE_WHITESPACE
IGNORE_EXCEPTION_DETAIL
ELLIPSIS
asyncio_mode = auto
markers=
linkcheck: Test links
linkcheck: Test links
18 changes: 18 additions & 0 deletions tests/integration/README.md
@@ -64,3 +64,21 @@ These methods both shut down the workflow / clean up after themselves.

It is necessary to shut down workflows correctly to clean up resources and
running tasks.

## Module Scoped Fixtures

There's a reasonable overhead to some test fixtures, especially the ones which
involve writing files to disk or starting Cylc schedulers.

To make tests run faster you can use module-scoped fixtures. These are test
fixtures which are created once, then reused for all tests in the module.

You'll find a bunch of module-scoped fixtures prefixed with `mod_`, e.g.
`mod_start` is the module-scoped version of `start`. When using module-scoped
fixtures, ensure that tests do not modify the fixture object, as this would
allow tests to interfere with one another.

In order to get speedup from module-scoped fixtures when running with
pytest-xdist, we configure pytest-xdist to run all of the tests in a module in
series using the same pytest runner. This incentivises breaking up larger test
modules.
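
As an illustration of the warning about modifying shared fixtures, here is a minimal sketch in plain Python rather than pytest. The dict `shared_config` stands in for a module-scoped fixture object, and all names are hypothetical:

```python
import copy

# Stand-in for a module-scoped fixture: built once, shared by every case.
shared_config = {"runahead limit": "P3", "tasks": ["a"]}


def mutating_case(config):
    """Modifies the shared object in place, which leaks into later cases."""
    config["tasks"].append("b")
    return len(config["tasks"])


def reading_case(config):
    """Only reads, but its result now depends on what ran before it."""
    return len(config["tasks"])


def copying_case(config):
    """Works on a private deep copy, leaving the shared object untouched."""
    local = copy.deepcopy(config)
    local["tasks"].append("c")
    return len(local["tasks"])
```

Running `reading_case` before and after `mutating_case` gives different answers, whereas `copying_case` leaves the shared object as it found it.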
Comment on lines +83 to +84
Member

@wxtim wxtim Nov 24, 2023

Possible alternative

Suggested change
series using the same pytest runner. This incentivises breaking up larger test
modules.
series using the same pytest runner. Consider breaking up
large test modules to allow tests to be run in parallel.

89 changes: 89 additions & 0 deletions tests/integration/test_task_pool.py
@@ -1246,3 +1246,92 @@ async def test_detect_incomplete_tasks(
assert log_filter(log, contains=f"[{itask}] did not complete required outputs:")
# the task should not have been removed
assert itask in schd.pool.get_tasks()


@pytest.mark.parametrize('compat_mode', ['compat-mode', 'normal-mode'])
@pytest.mark.parametrize('cycling_mode', ['integer', 'datetime'])
async def test_compute_runahead(
cycling_mode,
compat_mode,
flow,
scheduler,
start,
monkeypatch,
):
"""Test the calculation of the runahead limit.

This test ensures that:
* Runahead tasks are excluded from computations
see https://github.com/cylc/cylc-flow/issues/5825
* Tasks are initiated with the correct is_runahead status on startup.
* Behaviour is the same in compat/regular modes.
* Behaviour is the same for integer/datetime cycling modes.

"""
if cycling_mode == 'integer':
config = {
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': '1',
'cycling mode': 'integer',
'runahead limit': 'P3',
'graph': {
'P1': 'a'
},
}
}
point = lambda point: IntegerPoint(str(int(point)))
else:
config = {
'scheduler': {
'allow implicit tasks': 'True',
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '0001',
'runahead limit': 'P3Y',
'graph': {
'P1Y': 'a'
},
}
}
point = ISO8601Point

monkeypatch.setattr(
'cylc.flow.flags.cylc7_back_compat',
compat_mode == 'compat-mode',
)

id_ = flow(config)
schd = scheduler(id_)
async with start(schd):
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 4

# ensure task states are initiated with is_runahead status
assert schd.pool.get_task(point('0001'), 'a').state(is_runahead=False)
assert schd.pool.get_task(point('0005'), 'a').state(is_runahead=True)

# mark the first three cycles as running
for cycle in range(1, 4):
schd.pool.get_task(point(f'{cycle:04}'), 'a').state.reset(
TASK_STATUS_RUNNING
)
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 4 # no change

# mark cycle 1 as incomplete (but finished)
schd.pool.get_task(point('0001'), 'a').state.reset(
TASK_STATUS_SUBMIT_FAILED
)
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 4 # no change

# mark cycle 1 as complete
schd.pool.get_task(point('0001'), 'a').state.reset(
TASK_STATUS_SUCCEEDED
)
schd.pool.compute_runahead(force=True)
assert int(str(schd.pool.runahead_limit_point)) == 5 # +1
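
The arithmetic these assertions rely on can be sketched as follows, assuming integer cycling and a hypothetical helper `runahead_limit_point`. This is a simplification for illustration, not the real `compute_runahead`, which handles more cases:

```python
def runahead_limit_point(active_points, limit):
    """Earliest active cycle plus the runahead limit (integer cycling only)."""
    return min(active_points) + limit


# 'runahead limit': 'P3' with cycle 1 still active gives a limit point of 4
before = runahead_limit_point([1, 2, 3, 4], 3)

# once cycle 1 has succeeded, the earliest active cycle is 2, so the limit is 5
after = runahead_limit_point([2, 3, 4, 5], 3)
```

This matches the `== 4` and `== 5` checks in the test: the limit point only moves forward once the earliest cycle no longer holds it back.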