Skip to content

Commit

Permalink
tests/i: add reflog utility
Browse files Browse the repository at this point in the history
* Add an integration test pattern for implementing reference tests.
  • Loading branch information
oliver-sanders committed Nov 17, 2023
1 parent 1f9c03a commit 0de2bfd
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 3 deletions.
5 changes: 5 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1400,9 +1400,11 @@ def remove_if_complete(self, itask):
- retain and recompute runahead
(C7 failed tasks don't count toward runahead limit)
"""
ret = False
if cylc.flow.flags.cylc7_back_compat:
if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED):
self.remove(itask, 'finished')
ret = True
if self.compute_runahead():
self.release_runahead_tasks()
else:
Expand All @@ -1416,11 +1418,14 @@ def remove_if_complete(self, itask):
else:
# Remove as completed.
self.remove(itask, 'finished')
ret = True
if itask.identity == self.stop_task_id:
self.stop_task_finished = True
if self.compute_runahead():
self.release_runahead_tasks()

return ret

def spawn_on_all_outputs(
self, itask: TaskProxy, completed_only: bool = False
) -> None:
Expand Down
142 changes: 142 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pathlib import Path
import pytest
from shutil import rmtree
from time import time
from typing import List, TYPE_CHECKING, Set, Tuple, Union

from cylc.flow.config import WorkflowConfig
Expand All @@ -32,8 +33,10 @@
install as cylc_install,
get_option_parser as install_gop
)
from cylc.flow.util import serialise
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.workflow_files import infer_latest_run_from_id
from cylc.flow.workflow_status import StopMode

from .utils import _rm_if_empty
from .utils.flow_tools import (
Expand Down Expand Up @@ -473,3 +476,142 @@ def _inner(source, **kwargs):
workflow_id = infer_latest_run_from_id(workflow_id)
return workflow_id
yield _inner


@pytest.fixture
def reflog(monkeypatch):
    """Integration test version of the --reflog CLI option.

    This returns a set which captures task triggers.

    Note, you'll need to call this on the scheduler *after* you have started
    it.

    Args:
        schd:
            The scheduler to capture triggering information for.
        flow_nums:
            If True, the flow numbers of the task being triggered will be
            added to the end of each entry.

    Returns:
        tuple

        (task, triggers):
            If flow_nums == False
        (task, flow_nums, triggers):
            If flow_nums == True
        task:
            The [relative] task ID e.g. "1/a".
        flow_nums:
            The serialised flow nums e.g. ["1"].
        triggers:
            Sorted tuple of the trigger IDs, e.g. ("1/a", "2/b").

    """

    def _reflog(schd, flow_nums=False):
        # the real submission method (the wrapper below delegates to it)
        submit_task_jobs = schd.task_job_mgr.submit_task_jobs
        triggers = set()

        def _submit_task_jobs(*args, **kwargs):
            # perform the real submission, then record the dependencies
            # which triggered each submitted task
            itasks = submit_task_jobs(*args, **kwargs)
            for itask in itasks:
                deps = tuple(sorted(itask.state.get_resolved_dependencies()))
                if flow_nums:
                    triggers.add(
                        (itask.identity, serialise(itask.flow_nums), deps or None)
                    )
                else:
                    triggers.add((itask.identity, deps or None))
            return itasks

        # patch via monkeypatch so the original method is automatically
        # restored at test teardown (plain assignment would leak the patch)
        monkeypatch.setattr(
            schd.task_job_mgr, 'submit_task_jobs', _submit_task_jobs
        )

        return triggers

    return _reflog


@pytest.fixture
def complete():
    """Wait for the workflow, or tasks within it to complete.

    Args:
        schd:
            The scheduler to await.
        tokens_list:
            If specified, this will wait for the tasks represented by these
            tokens to be marked as completed by the task pool.
        stop_mode:
            If tokens_list is not provided, this will wait for the scheduler
            to be shutdown with the specified mode (default = AUTO, i.e.
            workflow completed normally).
        timeout:
            Max time to wait for the condition to be met.

            Note, if you need to increase this, you might want to rethink
            your test.

            Note, use this timeout rather than wrapping the complete call
            with async_timeout (handles shutdown logic more cleanly).

    """
    async def _complete(
        schd,
        *tokens_list,
        stop_mode=StopMode.AUTO,
        timeout=60,
    ):
        start_time = time()
        # reduce the tokens to relative task IDs for easy comparison
        tokens_list = [tokens.task for tokens in tokens_list]

        # capture task completion: remove awaited tasks from tokens_list
        # as the pool removes them
        remove_if_complete = schd.pool.remove_if_complete

        def _remove_if_complete(itask):
            ret = remove_if_complete(itask)
            if ret and itask.tokens.task in tokens_list:
                tokens_list.remove(itask.tokens.task)
            return ret

        # capture workflow shutdown
        set_stop = schd._set_stop
        has_shutdown = False

        def _set_stop(mode=None):
            nonlocal has_shutdown
            if mode == stop_mode:
                # the expected stop mode => record it and stop normally
                has_shutdown = True
                return set_stop(mode)
            # any other stop mode means the workflow bailed unexpectedly
            set_stop(mode)
            raise Exception(f'Workflow bailed with stop mode = {mode}')

        schd.pool.remove_if_complete = _remove_if_complete
        schd._set_stop = _set_stop

        try:
            # determine the completion condition
            if tokens_list:
                # wait until all the specified tasks have been removed
                def condition():
                    return bool(tokens_list)
            else:
                # wait until the workflow has shut down with stop_mode
                def condition():
                    return not has_shutdown

            # wait for the condition to be met
            while condition():
                # allow the main loop to advance
                await asyncio.sleep(0)
                if time() - start_time > timeout:
                    raise Exception(
                        f'Timeout waiting for {", ".join(map(str, tokens_list))}'
                    )
        finally:
            # restore the regular scheduler logic, even if we timed out or
            # the workflow bailed (the original code leaked these patches
            # on the failure path)
            schd._set_stop = set_stop
            schd.pool.remove_if_complete = remove_if_complete

    return _complete
42 changes: 42 additions & 0 deletions tests/integration/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,45 @@ async def test_db_select(one, start, db_select):
results = db_select(
schd, False, 'task_states', name='one', status='waiting')
assert len(results) == 1


async def test_reflog(flow, scheduler, run, reflog, complete):
    """Test the triggering of tasks.

    This is the integration test version of "reftest" in the functional
    tests.

    It works by capturing the triggers which caused each submission so that
    they can be compared with the expected outcome.
    """
    # a small integer-cycling workflow with inter- and intra-cycle
    # dependencies (including a pre-initial dependency: b[-P1] => b)
    id_ = flow({
        'scheduler': {
            'allow implicit tasks': 'True',
        },
        'scheduling': {
            'initial cycle point': '1',
            'final cycle point': '1',
            'cycling mode': 'integer',
            'graph': {
                'P1': '''
                    a => b => c
                    x => b => z
                    b[-P1] => b
                '''
            }
        }
    })
    schd = scheduler(id_, paused_start=False)

    async with run(schd):
        triggers = reflog(schd)  # Note: add flow_nums=True to capture flows
        await complete(schd)

    assert triggers == {
        # 1/a was triggered by nothing (i.e. it's parentless)
        ('1/a', None),
        # 1/b was triggered by three tasks (note the pre-initial dependency)
        ('1/b', ('0/b', '1/a', '1/x')),
        ('1/c', ('1/b',)),
        ('1/x', None),
        ('1/z', ('1/b',)),
    }
17 changes: 14 additions & 3 deletions tests/integration/utils/flow_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _make_src_flow(src_path, conf):
def _make_flow(
cylc_run_dir: Union[Path, str],
test_dir: Path,
conf: Union[dict, str],
conf: dict,
name: Optional[str] = None,
id_: Optional[str] = None,
) -> str:
Expand All @@ -66,8 +66,19 @@ def _make_flow(
flow_run_dir = (test_dir / name)
flow_run_dir.mkdir(parents=True, exist_ok=True)
id_ = str(flow_run_dir.relative_to(cylc_run_dir))
if isinstance(conf, dict):
conf = flow_config_str(conf)
conf = flow_config_str({
# override the default simulation runtime logic to make
# tasks execute instantly
# NOTE: this is prepended so it can be overwritten
'runtime': {
'root': {
'simulation': {
'default run length': 'PT0S',
},
},
},
**conf,
})
with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
flow_file.write(conf)
return id_
Expand Down

0 comments on commit 0de2bfd

Please sign in to comment.