diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index ac0e9f44e31..851ee1e39b7 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1400,9 +1400,11 @@ def remove_if_complete(self, itask): - retain and recompute runahead (C7 failed tasks don't count toward runahead limit) """ + ret = False if cylc.flow.flags.cylc7_back_compat: if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED): self.remove(itask, 'finished') + ret = True if self.compute_runahead(): self.release_runahead_tasks() else: @@ -1416,11 +1418,14 @@ def remove_if_complete(self, itask): else: # Remove as completed. self.remove(itask, 'finished') + ret = True if itask.identity == self.stop_task_id: self.stop_task_finished = True if self.compute_runahead(): self.release_runahead_tasks() + return ret + def spawn_on_all_outputs( self, itask: TaskProxy, completed_only: bool = False ) -> None: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f2f24b09ab5..120d64d24ce 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,6 +20,7 @@ from pathlib import Path import pytest from shutil import rmtree +from time import time from typing import List, TYPE_CHECKING, Set, Tuple, Union from cylc.flow.config import WorkflowConfig @@ -32,8 +33,10 @@ install as cylc_install, get_option_parser as install_gop ) +from cylc.flow.util import serialise from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id +from cylc.flow.workflow_status import StopMode from .utils import _rm_if_empty from .utils.flow_tools import ( @@ -473,3 +476,140 @@ def _inner(source, **kwargs): workflow_id = infer_latest_run_from_id(workflow_id) return workflow_id yield _inner + + +@pytest.fixture +def reflog(): + """Integration test version of the --reflog CLI option. + + This returns a set which captures task triggers. + + Note, you'll need to call this on the scheduler *after* you have started + it. + + Args: + schd: + The scheduler to capture triggering information for. + flow_nums: + If True, the flow numbers of the task being triggered will be added + to the end of each entry. + + Returns: + tuple + + (task, triggers): + If flow_nums == False + (task, flow_nums, triggers): + If flow_nums == True + + task: + The [relative] task ID e.g. "1/a". + flow_nums: + The serialised flow nums e.g. ["1"]. + triggers: + Sorted tuple of the trigger IDs, e.g. ("1/a", "2/b"). + + """ + + def _reflog(schd, flow_nums=False): + submit_task_jobs = schd.task_job_mgr.submit_task_jobs + triggers = set() + + def _submit_task_jobs(*args, **kwargs): + nonlocal submit_task_jobs, triggers, flow_nums + itasks = submit_task_jobs(*args, **kwargs) + for itask in itasks: + deps = tuple(sorted(itask.state.get_resolved_dependencies())) + if flow_nums: + triggers.add( + (itask.identity, serialise(itask.flow_nums), deps or None) + ) + else: + triggers.add((itask.identity, deps or None)) + return itasks + + schd.task_job_mgr.submit_task_jobs = _submit_task_jobs + + return triggers + + return _reflog + + +@pytest.fixture +def complete(): + """Wait for the workflow, or tasks within it to complete. + + Args: + schd: + The scheduler to await. + tokens_list: + If specified, this will wait for the tasks represented by these + tokens to be marked as completed by the task pool. + stop_mode: + If tokens_list is not provided, this will wait for the scheduler + to be shutdown with the specified mode (default = AUTO, i.e. + workflow completed normally). + timeout: + Max time to wait for the condition to be met. + + Note, if you need to increase this, you might want to rethink your + test. + + Note, use this timeout rather than wrapping the complete call with + async_timeout (handles shutdown logic more cleanly). + + """ + async def _complete( + schd, + *tokens_list, + stop_mode=StopMode.AUTO, + timeout=60, + ): + start_time = time() + tokens_list = [tokens.task for tokens in tokens_list] + + # capture task completion + remove_if_complete = schd.pool.remove_if_complete + + def _remove_if_complete(itask): + ret = remove_if_complete(itask) + if ret and itask.tokens.task in tokens_list: + tokens_list.remove(itask.tokens.task) + return ret + + schd.pool.remove_if_complete = _remove_if_complete + + # capture workflow shutdown + set_stop = schd._set_stop + has_shutdown = False + + def _set_stop(mode=None): + nonlocal has_shutdown, stop_mode + if mode == stop_mode: + has_shutdown = True + return set_stop(mode) + else: + set_stop(mode) + raise Exception(f'Workflow bailed with stop mode = {mode}') + + schd._set_stop = _set_stop + + # determine the completion condition + if tokens_list: + condition = lambda: bool(tokens_list) + else: + condition = lambda: bool(not has_shutdown) + + # wait for the condition to be met + while condition(): + # allow the main loop to advance + await asyncio.sleep(0) + if time() - start_time > timeout: + raise Exception( + f'Timeout waiting for {", ".join(map(str, tokens_list))}' + ) + + # restore regular shutdown logic + schd._set_stop = set_stop + + return _complete diff --git a/tests/integration/test_examples.py b/tests/integration/test_examples.py index 02a1bb0a497..da8b156503d 100644 --- a/tests/integration/test_examples.py +++ b/tests/integration/test_examples.py @@ -229,3 +229,45 @@ async def test_db_select(one, start, db_select): results = db_select( schd, False, 'task_states', name='one', status='waiting') assert len(results) == 1 + + +async def test_reflog(flow, scheduler, run, reflog, complete): + """Test the triggering of tasks. + + This is the integration test version of "reftest" in the funtional tests. + + It works by capturing the triggers which caused each submission so that + they can be compared with the expected outcome. + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'initial cycle point': '1', + 'final cycle point': '1', + 'cycling mode': 'integer', + 'graph': { + 'P1': ''' + a => b => c + x => b => z + b[-P1] => b + ''' + } + } + }) + schd = scheduler(id_, paused_start=False) + + async with run(schd): + triggers = reflog(schd) # Note: add flow_nums=True to capture flows + await complete(schd) + + assert triggers == { + # 1/a was triggered by nothing (i.e. it's parentless) + ('1/a', None), + # 1/b was triggered by three tasks (note the pre-initial dependency) + ('1/b', ('0/b', '1/a', '1/x')), + ('1/c', ('1/b',)), + ('1/x', None), + ('1/z', ('1/b',)), + } diff --git a/tests/integration/tui/test_updater.py b/tests/integration/tui/test_updater.py index b3daac5a328..2d9927ca5eb 100644 --- a/tests/integration/tui/test_updater.py +++ b/tests/integration/tui/test_updater.py @@ -147,7 +147,16 @@ async def test_filters(one_conf, flow, scheduler, run, updater): 'graph': { 'R1': 'a & b & c', } - } + }, + 'runtime': { + # TODO: remove this runtime section in + # https://github.com/cylc/cylc-flow/pull/5721 + 'root': { + 'simulation': { + 'default run length': 'PT1M', + }, + }, + }, }, name='one'), paused_start=True) two = scheduler(flow(one_conf, name='two')) tre = scheduler(flow(one_conf, name='tre')) diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index 69cf3e065f1..9270f12ee8d 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -53,7 +53,7 @@ def _make_src_flow(src_path, conf): def _make_flow( cylc_run_dir: Union[Path, str], test_dir: Path, - conf: Union[dict, str], + conf: dict, name: Optional[str] = None, id_: Optional[str] = None, ) -> str: @@ -66,8 +66,19 @@ def _make_flow( flow_run_dir = (test_dir / name) flow_run_dir.mkdir(parents=True, exist_ok=True) id_ = str(flow_run_dir.relative_to(cylc_run_dir)) - if isinstance(conf, dict): - conf = flow_config_str(conf) + conf = flow_config_str({ + # override the default simulation runtime logic to make + # tasks execute instantly + # NOTE: this is prepended so it can be overwritten + 'runtime': { + 'root': { + 'simulation': { + 'default run length': 'PT0S', + }, + }, + }, + **conf, + }) with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file: flow_file.write(conf) return id_