Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/i: add reflog fixture #5826

Merged
merged 5 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@ jobs:
fail-fast: false # don't stop on first failure
matrix:
os: ['ubuntu-latest']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3']
python-version: ['3.7', '3.8', '3.10', '3.11', '3']
include:
# mac os test
- os: 'macos-latest'
python-version: '3.7'
python-version: '3.7' # oldest supported version

# non-utc timezone test
- os: 'ubuntu-latest'
python-version: '3.9' # not the oldest, not the most recent version
time-zone: 'XXX-09:35'

env:
# Use non-UTC time zone
TZ: ${{ matrix.time-zone }}
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes

steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1400,9 +1400,11 @@ def remove_if_complete(self, itask):
- retain and recompute runahead
(C7 failed tasks don't count toward runahead limit)
"""
ret = False
if cylc.flow.flags.cylc7_back_compat:
if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED):
self.remove(itask, 'finished')
ret = True
if self.compute_runahead():
self.release_runahead_tasks()
else:
Expand All @@ -1416,11 +1418,14 @@ def remove_if_complete(self, itask):
else:
# Remove as completed.
self.remove(itask, 'finished')
ret = True
if itask.identity == self.stop_task_id:
self.stop_task_finished = True
if self.compute_runahead():
self.release_runahead_tasks()

return ret

def spawn_on_all_outputs(
self, itask: TaskProxy, completed_only: bool = False
) -> None:
Expand Down
5 changes: 1 addition & 4 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@ testpaths =
cylc/flow/
tests/unit/
tests/integration/
env =
# a weird timezone to check that tests aren't assuming the local timezone
TZ=XXX-09:35
doctest_optionflags =
NORMALIZE_WHITESPACE
IGNORE_EXCEPTION_DETAIL
ELLIPSIS
asyncio_mode = auto
markers=
linkcheck: Test links
linkcheck: Test links
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ tests =
pytest-asyncio>=0.17,!=0.23.*
pytest-cov>=2.8.0
pytest-xdist>=2
pytest-env>=0.6.2
pytest>=6
testfixtures>=6.11.0
towncrier>=23
Expand Down
140 changes: 140 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pathlib import Path
import pytest
from shutil import rmtree
from time import time
from typing import List, TYPE_CHECKING, Set, Tuple, Union

from cylc.flow.config import WorkflowConfig
Expand All @@ -32,8 +33,10 @@
install as cylc_install,
get_option_parser as install_gop
)
from cylc.flow.util import serialise
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.workflow_files import infer_latest_run_from_id
from cylc.flow.workflow_status import StopMode

from .utils import _rm_if_empty
from .utils.flow_tools import (
Expand Down Expand Up @@ -473,3 +476,140 @@ def _inner(source, **kwargs):
workflow_id = infer_latest_run_from_id(workflow_id)
return workflow_id
yield _inner


@pytest.fixture
def reflog():
"""Integration test version of the --reflog CLI option.

This returns a set which captures task triggers.

Note, you'll need to call this on the scheduler *after* you have started
it.

Args:
schd:
The scheduler to capture triggering information for.
flow_nums:
If True, the flow numbers of the task being triggered will be added
to the end of each entry.

Returns:
tuple

(task, triggers):
If flow_nums == False
(task, flow_nums, triggers):
If flow_nums == True

task:
The [relative] task ID e.g. "1/a".
flow_nums:
The serialised flow nums e.g. ["1"].
triggers:
Sorted tuple of the trigger IDs, e.g. ("1/a", "2/b").

"""

def _reflog(schd, flow_nums=False):
submit_task_jobs = schd.task_job_mgr.submit_task_jobs
triggers = set()

def _submit_task_jobs(*args, **kwargs):
nonlocal submit_task_jobs, triggers, flow_nums
itasks = submit_task_jobs(*args, **kwargs)
for itask in itasks:
deps = tuple(sorted(itask.state.get_resolved_dependencies()))
if flow_nums:
triggers.add(
(itask.identity, serialise(itask.flow_nums), deps or None)
)
else:
triggers.add((itask.identity, deps or None))
return itasks

schd.task_job_mgr.submit_task_jobs = _submit_task_jobs
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved

return triggers

return _reflog


@pytest.fixture
def complete():
"""Wait for the workflow, or tasks within it to complete.

Args:
schd:
The scheduler to await.
tokens_list:
If specified, this will wait for the tasks represented by these
tokens to be marked as completed by the task pool.
stop_mode:
If tokens_list is not provided, this will wait for the scheduler
to be shutdown with the specified mode (default = AUTO, i.e.
workflow completed normally).
timeout:
Max time to wait for the condition to be met.

Note, if you need to increase this, you might want to rethink your
test.

Note, use this timeout rather than wrapping the complete call with
async_timeout (handles shutdown logic more cleanly).

"""
async def _complete(
schd,
*tokens_list,
stop_mode=StopMode.AUTO,
timeout=60,
):
start_time = time()
tokens_list = [tokens.task for tokens in tokens_list]

# capture task completion
remove_if_complete = schd.pool.remove_if_complete

def _remove_if_complete(itask):
ret = remove_if_complete(itask)
if ret and itask.tokens.task in tokens_list:
tokens_list.remove(itask.tokens.task)
return ret

schd.pool.remove_if_complete = _remove_if_complete

# capture workflow shutdown
set_stop = schd._set_stop
has_shutdown = False

def _set_stop(mode=None):
nonlocal has_shutdown, stop_mode
if mode == stop_mode:
has_shutdown = True
return set_stop(mode)
else:
set_stop(mode)
raise Exception(f'Workflow bailed with stop mode = {mode}')

schd._set_stop = _set_stop

# determine the completion condition
if tokens_list:
condition = lambda: bool(tokens_list)
else:
condition = lambda: bool(not has_shutdown)

# wait for the condition to be met
while condition():
# allow the main loop to advance
await asyncio.sleep(0)
if time() - start_time > timeout:
raise Exception(
f'Timeout waiting for {", ".join(map(str, tokens_list))}'
)

# restore regular shutdown logic
schd._set_stop = set_stop

return _complete
42 changes: 42 additions & 0 deletions tests/integration/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,45 @@ async def test_db_select(one, start, db_select):
results = db_select(
schd, False, 'task_states', name='one', status='waiting')
assert len(results) == 1


async def test_reflog(flow, scheduler, run, reflog, complete):
"""Test the triggering of tasks.

This is the integration test version of "reftest" in the funtional tests.

It works by capturing the triggers which caused each submission so that
they can be compared with the expected outcome.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': '1',
'final cycle point': '1',
'cycling mode': 'integer',
'graph': {
'P1': '''
a => b => c
x => b => z
b[-P1] => b
'''
}
}
})
schd = scheduler(id_, paused_start=False)

async with run(schd):
triggers = reflog(schd) # Note: add flow_nums=True to capture flows
await complete(schd)

assert triggers == {
# 1/a was triggered by nothing (i.e. it's parentless)
('1/a', None),
# 1/b was triggered by three tasks (note the pre-initial dependency)
('1/b', ('0/b', '1/a', '1/x')),
('1/c', ('1/b',)),
('1/x', None),
('1/z', ('1/b',)),
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">Path: mypath </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">&lt;</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#e5e5e5;background:#000000">S</span><span style="color:#000000;background:#e5e5e5">elect File </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">&gt;</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">[runtime] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of the [scheduling] and [runtime] sections of the config changed as a result of prepending the default simulation runtime.

<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> [[a]] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">[scheduling] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> [[graph]] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> R1 = a </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">[runtime] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> [[a]] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
<span style="color:#000000;background:#e5e5e5">│</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">│</span>
Expand Down
11 changes: 10 additions & 1 deletion tests/integration/tui/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,16 @@ async def test_filters(one_conf, flow, scheduler, run, updater):
'graph': {
'R1': 'a & b & c',
}
}
},
'runtime': {
# TODO: remove this runtime section in
# https://github.com/cylc/cylc-flow/pull/5721
'root': {
'simulation': {
'default run length': 'PT1M',
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The patch, short runtimes could cause tasks to go through running & succeeded even whilst the workflow is paused.

},
},
},
}, name='one'), paused_start=True)
two = scheduler(flow(one_conf, name='two'))
tre = scheduler(flow(one_conf, name='tre'))
Expand Down
17 changes: 14 additions & 3 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _make_src_flow(src_path, conf):
def _make_flow(
cylc_run_dir: Union[Path, str],
test_dir: Path,
conf: Union[dict, str],
conf: dict,
name: Optional[str] = None,
id_: Optional[str] = None,
) -> str:
Expand All @@ -66,8 +66,19 @@ def _make_flow(
flow_run_dir = (test_dir / name)
flow_run_dir.mkdir(parents=True, exist_ok=True)
id_ = str(flow_run_dir.relative_to(cylc_run_dir))
if isinstance(conf, dict):
conf = flow_config_str(conf)
conf = flow_config_str({
# override the default simulation runtime logic to make
# tasks execute instantly
# NOTE: this is prepended so it can be overwritten
'runtime': {
'root': {
'simulation': {
'default run length': 'PT0S',
},
},
},
**conf,
})
with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
flow_file.write(conf)
return id_
Expand Down
Loading