Skip to content

Commit

Permalink
Merge pull request cylc#5826 from oliver-sanders/reflog
Browse files Browse the repository at this point in the history
tests/i: add reflog fixture
  • Loading branch information
MetRonnie authored Dec 14, 2023
2 parents fe43c7c + fa340fa commit 6f62691
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 13 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@ jobs:
fail-fast: false # don't stop on first failure
matrix:
os: ['ubuntu-latest']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3']
python-version: ['3.7', '3.8', '3.10', '3.11', '3']
include:
# mac os test
- os: 'macos-latest'
python-version: '3.7'
python-version: '3.7' # oldest supported version

# non-utc timezone test
- os: 'ubuntu-latest'
python-version: '3.9' # not the oldest, not the most recent version
time-zone: 'XXX-09:35'

env:
# Use non-UTC time zone
TZ: ${{ matrix.time-zone }}
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes

steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1400,9 +1400,11 @@ def remove_if_complete(self, itask):
- retain and recompute runahead
(C7 failed tasks don't count toward runahead limit)
"""
ret = False
if cylc.flow.flags.cylc7_back_compat:
if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED):
self.remove(itask, 'finished')
ret = True
if self.compute_runahead():
self.release_runahead_tasks()
else:
Expand All @@ -1416,11 +1418,14 @@ def remove_if_complete(self, itask):
else:
# Remove as completed.
self.remove(itask, 'finished')
ret = True
if itask.identity == self.stop_task_id:
self.stop_task_finished = True
if self.compute_runahead():
self.release_runahead_tasks()

return ret

def spawn_on_all_outputs(
self, itask: TaskProxy, completed_only: bool = False
) -> None:
Expand Down
5 changes: 1 addition & 4 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@ testpaths =
cylc/flow/
tests/unit/
tests/integration/
env =
# a weird timezone to check that tests aren't assuming the local timezone
TZ=XXX-09:35
doctest_optionflags =
NORMALIZE_WHITESPACE
IGNORE_EXCEPTION_DETAIL
ELLIPSIS
asyncio_mode = auto
markers=
linkcheck: Test links
linkcheck: Test links
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ tests =
pytest-asyncio>=0.17,!=0.23.*
pytest-cov>=2.8.0
pytest-xdist>=2
pytest-env>=0.6.2
pytest>=6
testfixtures>=6.11.0
towncrier>=23
Expand Down
140 changes: 140 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pathlib import Path
import pytest
from shutil import rmtree
from time import time
from typing import List, TYPE_CHECKING, Set, Tuple, Union

from cylc.flow.config import WorkflowConfig
Expand All @@ -32,8 +33,10 @@
install as cylc_install,
get_option_parser as install_gop
)
from cylc.flow.util import serialise
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.workflow_files import infer_latest_run_from_id
from cylc.flow.workflow_status import StopMode

from .utils import _rm_if_empty
from .utils.flow_tools import (
Expand Down Expand Up @@ -473,3 +476,140 @@ def _inner(source, **kwargs):
workflow_id = infer_latest_run_from_id(workflow_id)
return workflow_id
yield _inner


@pytest.fixture
def reflog():
"""Integration test version of the --reflog CLI option.
This returns a set which captures task triggers.
Note, you'll need to call this on the scheduler *after* you have started
it.
Args:
schd:
The scheduler to capture triggering information for.
flow_nums:
If True, the flow numbers of the task being triggered will be added
to the end of each entry.
Returns:
tuple
(task, triggers):
If flow_nums == False
(task, flow_nums, triggers):
If flow_nums == True
task:
The [relative] task ID e.g. "1/a".
flow_nums:
The serialised flow nums e.g. ["1"].
triggers:
Sorted tuple of the trigger IDs, e.g. ("1/a", "2/b").
"""

def _reflog(schd, flow_nums=False):
submit_task_jobs = schd.task_job_mgr.submit_task_jobs
triggers = set()

def _submit_task_jobs(*args, **kwargs):
nonlocal submit_task_jobs, triggers, flow_nums
itasks = submit_task_jobs(*args, **kwargs)
for itask in itasks:
deps = tuple(sorted(itask.state.get_resolved_dependencies()))
if flow_nums:
triggers.add(
(itask.identity, serialise(itask.flow_nums), deps or None)
)
else:
triggers.add((itask.identity, deps or None))
return itasks

schd.task_job_mgr.submit_task_jobs = _submit_task_jobs

return triggers

return _reflog


@pytest.fixture
def complete():
"""Wait for the workflow, or tasks within it to complete.
Args:
schd:
The scheduler to await.
tokens_list:
If specified, this will wait for the tasks represented by these
tokens to be marked as completed by the task pool.
stop_mode:
If tokens_list is not provided, this will wait for the scheduler
to be shutdown with the specified mode (default = AUTO, i.e.
workflow completed normally).
timeout:
Max time to wait for the condition to be met.
Note, if you need to increase this, you might want to rethink your
test.
Note, use this timeout rather than wrapping the complete call with
async_timeout (handles shutdown logic more cleanly).
"""
async def _complete(
schd,
*tokens_list,
stop_mode=StopMode.AUTO,
timeout=60,
):
start_time = time()
tokens_list = [tokens.task for tokens in tokens_list]

# capture task completion
remove_if_complete = schd.pool.remove_if_complete

def _remove_if_complete(itask):
ret = remove_if_complete(itask)
if ret and itask.tokens.task in tokens_list:
tokens_list.remove(itask.tokens.task)
return ret

schd.pool.remove_if_complete = _remove_if_complete

# capture workflow shutdown
set_stop = schd._set_stop
has_shutdown = False

def _set_stop(mode=None):
nonlocal has_shutdown, stop_mode
if mode == stop_mode:
has_shutdown = True
return set_stop(mode)
else:
set_stop(mode)
raise Exception(f'Workflow bailed with stop mode = {mode}')

schd._set_stop = _set_stop

# determine the completion condition
if tokens_list:
condition = lambda: bool(tokens_list)
else:
condition = lambda: bool(not has_shutdown)

# wait for the condition to be met
while condition():
# allow the main loop to advance
await asyncio.sleep(0)
if time() - start_time > timeout:
raise Exception(
f'Timeout waiting for {", ".join(map(str, tokens_list))}'
)

# restore regular shutdown logic
schd._set_stop = set_stop

return _complete
42 changes: 42 additions & 0 deletions tests/integration/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,45 @@ async def test_db_select(one, start, db_select):
results = db_select(
schd, False, 'task_states', name='one', status='waiting')
assert len(results) == 1


async def test_reflog(flow, scheduler, run, reflog, complete):
"""Test the triggering of tasks.
This is the integration test version of "reftest" in the funtional tests.
It works by capturing the triggers which caused each submission so that
they can be compared with the expected outcome.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': '1',
'final cycle point': '1',
'cycling mode': 'integer',
'graph': {
'P1': '''
a => b => c
x => b => z
b[-P1] => b
'''
}
}
})
schd = scheduler(id_, paused_start=False)

async with run(schd):
triggers = reflog(schd) # Note: add flow_nums=True to capture flows
await complete(schd)

assert triggers == {
# 1/a was triggered by nothing (i.e. it's parentless)
('1/a', None),
# 1/b was triggered by three tasks (note the pre-initial dependency)
('1/b', ('0/b', '1/a', '1/x')),
('1/c', ('1/b',)),
('1/x', None),
('1/z', ('1/b',)),
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">Path: mypath </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">&lt;</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#e5e5e5;background:#000000">S</span><span style="color:#000000;background:#e5e5e5">elect File </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">&gt;</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">[runtime] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> [[a]] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">[scheduling] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> [[graph]] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> R1 = a </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">[runtime] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> [[a]] </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"></span>
Expand Down
11 changes: 10 additions & 1 deletion tests/integration/tui/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,16 @@ async def test_filters(one_conf, flow, scheduler, run, updater):
'graph': {
'R1': 'a & b & c',
}
}
},
'runtime': {
# TODO: remove this runtime section in
# https://github.com/cylc/cylc-flow/pull/5721
'root': {
'simulation': {
'default run length': 'PT1M',
},
},
},
}, name='one'), paused_start=True)
two = scheduler(flow(one_conf, name='two'))
tre = scheduler(flow(one_conf, name='tre'))
Expand Down
17 changes: 14 additions & 3 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _make_src_flow(src_path, conf):
def _make_flow(
cylc_run_dir: Union[Path, str],
test_dir: Path,
conf: Union[dict, str],
conf: dict,
name: Optional[str] = None,
id_: Optional[str] = None,
) -> str:
Expand All @@ -66,8 +66,19 @@ def _make_flow(
flow_run_dir = (test_dir / name)
flow_run_dir.mkdir(parents=True, exist_ok=True)
id_ = str(flow_run_dir.relative_to(cylc_run_dir))
if isinstance(conf, dict):
conf = flow_config_str(conf)
conf = flow_config_str({
# override the default simulation runtime logic to make
# tasks execute instantly
# NOTE: this is prepended so it can be overwritten
'runtime': {
'root': {
'simulation': {
'default run length': 'PT0S',
},
},
},
**conf,
})
with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
flow_file.write(conf)
return id_
Expand Down

0 comments on commit 6f62691

Please sign in to comment.