diff --git a/.github/workflows/test_fast.yml b/.github/workflows/test_fast.yml
index 5fb95def4e8..6530bb85cd6 100644
--- a/.github/workflows/test_fast.yml
+++ b/.github/workflows/test_fast.yml
@@ -20,12 +20,22 @@ jobs:
fail-fast: false # don't stop on first failure
matrix:
os: ['ubuntu-latest']
- python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3']
+ python-version: ['3.7', '3.8', '3.10', '3.11', '3']
include:
+          # macOS test
- os: 'macos-latest'
- python-version: '3.7'
+ python-version: '3.7' # oldest supported version
+
+          # non-UTC timezone test
+ - os: 'ubuntu-latest'
+ python-version: '3.9' # not the oldest, not the most recent version
+ time-zone: 'XXX-09:35'
+
env:
+ # Use non-UTC time zone
+ TZ: ${{ matrix.time-zone }}
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes
+
steps:
- name: Checkout
uses: actions/checkout@v4
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index ac0e9f44e31..851ee1e39b7 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -1400,9 +1400,11 @@ def remove_if_complete(self, itask):
- retain and recompute runahead
(C7 failed tasks don't count toward runahead limit)
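+
+        Returns:
+            True if the task was removed from the pool, else False.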
"""
+ ret = False
if cylc.flow.flags.cylc7_back_compat:
if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED):
self.remove(itask, 'finished')
+ ret = True
if self.compute_runahead():
self.release_runahead_tasks()
else:
@@ -1416,11 +1418,14 @@ def remove_if_complete(self, itask):
else:
# Remove as completed.
self.remove(itask, 'finished')
+ ret = True
if itask.identity == self.stop_task_id:
self.stop_task_finished = True
if self.compute_runahead():
self.release_runahead_tasks()
+ return ret
+
def spawn_on_all_outputs(
self, itask: TaskProxy, completed_only: bool = False
) -> None:
diff --git a/pytest.ini b/pytest.ini
index 81df3785cec..9be86cb507c 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -32,13 +32,10 @@ testpaths =
cylc/flow/
tests/unit/
tests/integration/
-env =
- # a weird timezone to check that tests aren't assuming the local timezone
- TZ=XXX-09:35
doctest_optionflags =
NORMALIZE_WHITESPACE
IGNORE_EXCEPTION_DETAIL
ELLIPSIS
asyncio_mode = auto
markers=
- linkcheck: Test links
\ No newline at end of file
+ linkcheck: Test links
diff --git a/setup.cfg b/setup.cfg
index 53b2024a9a3..9a3838de1dc 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -121,7 +121,6 @@ tests =
pytest-asyncio>=0.17,!=0.23.*
pytest-cov>=2.8.0
pytest-xdist>=2
- pytest-env>=0.6.2
pytest>=6
testfixtures>=6.11.0
towncrier>=23
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index f2f24b09ab5..120d64d24ce 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -20,6 +20,7 @@
from pathlib import Path
import pytest
from shutil import rmtree
+from time import time
from typing import List, TYPE_CHECKING, Set, Tuple, Union
from cylc.flow.config import WorkflowConfig
@@ -32,8 +33,10 @@
install as cylc_install,
get_option_parser as install_gop
)
+from cylc.flow.util import serialise
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.workflow_files import infer_latest_run_from_id
+from cylc.flow.workflow_status import StopMode
from .utils import _rm_if_empty
from .utils.flow_tools import (
@@ -473,3 +476,140 @@ def _inner(source, **kwargs):
workflow_id = infer_latest_run_from_id(workflow_id)
return workflow_id
yield _inner
+
+
+@pytest.fixture
+def reflog():
+ """Integration test version of the --reflog CLI option.
+
+    Calling this on a scheduler returns a set which captures task triggers
+    as tasks are submitted.
+
+    Note, call this on the scheduler *after* it has started.
+
+ Args:
+ schd:
+ The scheduler to capture triggering information for.
+ flow_nums:
+ If True, the flow numbers of the task being triggered will be added
+ to the end of each entry.
+
+    Returns:
+        A set of tuples, one per submitted task:
+
+        (task, triggers):
+            If flow_nums == False.
+        (task, flow_nums, triggers):
+            If flow_nums == True.
+
+ task:
+ The [relative] task ID e.g. "1/a".
+ flow_nums:
+ The serialised flow nums e.g. ["1"].
+ triggers:
+ Sorted tuple of the trigger IDs, e.g. ("1/a", "2/b").
+
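+    Example (an illustrative sketch; see test_reflog in
+    tests/integration/test_examples.py for a complete working version):
+
+        async with run(schd):
+            triggers = reflog(schd)
+            await complete(schd)
+        assert ('1/a', None) in triggers
+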
+ """
+
+ def _reflog(schd, flow_nums=False):
+ submit_task_jobs = schd.task_job_mgr.submit_task_jobs
+ triggers = set()
+
+ def _submit_task_jobs(*args, **kwargs):
+ nonlocal submit_task_jobs, triggers, flow_nums
+ itasks = submit_task_jobs(*args, **kwargs)
+ for itask in itasks:
+ deps = tuple(sorted(itask.state.get_resolved_dependencies()))
+ if flow_nums:
+ triggers.add(
+ (itask.identity, serialise(itask.flow_nums), deps or None)
+ )
+ else:
+ triggers.add((itask.identity, deps or None))
+ return itasks
+
+ schd.task_job_mgr.submit_task_jobs = _submit_task_jobs
+
+ return triggers
+
+ return _reflog
+
+
+@pytest.fixture
+def complete():
+ """Wait for the workflow, or tasks within it to complete.
+
+ Args:
+ schd:
+ The scheduler to await.
+ tokens_list:
+ If specified, this will wait for the tasks represented by these
+ tokens to be marked as completed by the task pool.
+ stop_mode:
+            If tokens_list is not provided, this will wait for the scheduler
+            to shut down with the specified mode (default = AUTO, i.e. the
+            workflow completed normally).
+ timeout:
+ Max time to wait for the condition to be met.
+
+ Note, if you need to increase this, you might want to rethink your
+ test.
+
+            Note, use this timeout rather than wrapping the complete call with
+            async_timeout (this fixture handles shutdown logic more cleanly).
+
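+    Example (an illustrative sketch; "itask" here is assumed to be a
+    TaskProxy taken from the scheduler's pool):
+
+        async with run(schd):
+            # wait for a specific task to be removed as complete
+            await complete(schd, itask.tokens)
+            # or wait for the whole workflow to shut down normally
+            await complete(schd)
+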
+ """
+ async def _complete(
+ schd,
+ *tokens_list,
+ stop_mode=StopMode.AUTO,
+ timeout=60,
+ ):
+ start_time = time()
+ tokens_list = [tokens.task for tokens in tokens_list]
+
+ # capture task completion
+ remove_if_complete = schd.pool.remove_if_complete
+
+ def _remove_if_complete(itask):
+ ret = remove_if_complete(itask)
+ if ret and itask.tokens.task in tokens_list:
+ tokens_list.remove(itask.tokens.task)
+ return ret
+
+ schd.pool.remove_if_complete = _remove_if_complete
+
+ # capture workflow shutdown
+ set_stop = schd._set_stop
+ has_shutdown = False
+
+ def _set_stop(mode=None):
+ nonlocal has_shutdown, stop_mode
+ if mode == stop_mode:
+ has_shutdown = True
+ return set_stop(mode)
+ else:
+ set_stop(mode)
+ raise Exception(f'Workflow bailed with stop mode = {mode}')
+
+ schd._set_stop = _set_stop
+
+ # determine the completion condition
+ if tokens_list:
+ condition = lambda: bool(tokens_list)
+ else:
+ condition = lambda: bool(not has_shutdown)
+
+ # wait for the condition to be met
+ while condition():
+ # allow the main loop to advance
+ await asyncio.sleep(0)
+ if time() - start_time > timeout:
+ raise Exception(
+ f'Timeout waiting for {", ".join(map(str, tokens_list))}'
+ )
+
+ # restore regular shutdown logic
+ schd._set_stop = set_stop
+
+ return _complete
diff --git a/tests/integration/test_examples.py b/tests/integration/test_examples.py
index 02a1bb0a497..da8b156503d 100644
--- a/tests/integration/test_examples.py
+++ b/tests/integration/test_examples.py
@@ -229,3 +229,45 @@ async def test_db_select(one, start, db_select):
results = db_select(
schd, False, 'task_states', name='one', status='waiting')
assert len(results) == 1
+
+
+async def test_reflog(flow, scheduler, run, reflog, complete):
+ """Test the triggering of tasks.
+
+    This is the integration test version of "reftest" in the functional tests.
+
+ It works by capturing the triggers which caused each submission so that
+ they can be compared with the expected outcome.
+ """
+ id_ = flow({
+ 'scheduler': {
+ 'allow implicit tasks': 'True',
+ },
+ 'scheduling': {
+ 'initial cycle point': '1',
+ 'final cycle point': '1',
+ 'cycling mode': 'integer',
+ 'graph': {
+ 'P1': '''
+ a => b => c
+ x => b => z
+ b[-P1] => b
+ '''
+ }
+ }
+ })
+ schd = scheduler(id_, paused_start=False)
+
+ async with run(schd):
+ triggers = reflog(schd) # Note: add flow_nums=True to capture flows
+ await complete(schd)
+
+ assert triggers == {
+ # 1/a was triggered by nothing (i.e. it's parentless)
+ ('1/a', None),
+ # 1/b was triggered by three tasks (note the pre-initial dependency)
+ ('1/b', ('0/b', '1/a', '1/x')),
+ ('1/c', ('1/b',)),
+ ('1/x', None),
+ ('1/z', ('1/b',)),
+ }
diff --git a/tests/integration/tui/screenshots/test_scheduler_logs.workflow-configuration-file.html b/tests/integration/tui/screenshots/test_scheduler_logs.workflow-configuration-file.html
index e3fcdfbab22..8cc2f9af73b 100644
--- a/tests/integration/tui/screenshots/test_scheduler_logs.workflow-configuration-file.html
+++ b/tests/integration/tui/screenshots/test_scheduler_logs.workflow-configuration-file.html
@@ -3,11 +3,11 @@
│ Path: mypath │
│ < Select File > │
│ │
+│ [runtime] │
+│ [[a]] │
│ [scheduling] │
│ [[graph]] │
│ R1 = a │
-│ [runtime] │
-│ [[a]] │
│ │
│ │
│ │
diff --git a/tests/integration/tui/test_updater.py b/tests/integration/tui/test_updater.py
index b3daac5a328..2d9927ca5eb 100644
--- a/tests/integration/tui/test_updater.py
+++ b/tests/integration/tui/test_updater.py
@@ -147,7 +147,16 @@ async def test_filters(one_conf, flow, scheduler, run, updater):
'graph': {
'R1': 'a & b & c',
}
- }
+ },
+ 'runtime': {
+ # TODO: remove this runtime section in
+ # https://github.com/cylc/cylc-flow/pull/5721
+ 'root': {
+ 'simulation': {
+ 'default run length': 'PT1M',
+ },
+ },
+ },
}, name='one'), paused_start=True)
two = scheduler(flow(one_conf, name='two'))
tre = scheduler(flow(one_conf, name='tre'))
diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py
index 69cf3e065f1..9270f12ee8d 100644
--- a/tests/integration/utils/flow_tools.py
+++ b/tests/integration/utils/flow_tools.py
@@ -53,7 +53,7 @@ def _make_src_flow(src_path, conf):
def _make_flow(
cylc_run_dir: Union[Path, str],
test_dir: Path,
- conf: Union[dict, str],
+ conf: dict,
name: Optional[str] = None,
id_: Optional[str] = None,
) -> str:
@@ -66,8 +66,19 @@ def _make_flow(
flow_run_dir = (test_dir / name)
flow_run_dir.mkdir(parents=True, exist_ok=True)
id_ = str(flow_run_dir.relative_to(cylc_run_dir))
- if isinstance(conf, dict):
- conf = flow_config_str(conf)
+ conf = flow_config_str({
+ # override the default simulation runtime logic to make
+ # tasks execute instantly
+ # NOTE: this is prepended so it can be overwritten
+ 'runtime': {
+ 'root': {
+ 'simulation': {
+ 'default run length': 'PT0S',
+ },
+ },
+ },
+ **conf,
+ })
with open((flow_run_dir / WorkflowFiles.FLOW_FILE), 'w+') as flow_file:
flow_file.write(conf)
return id_