diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index b4d2d503799..c116e6968a5 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -13,10 +13,13 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Tests for the behaviour of xtrigger manager. -""" +"""Tests for the behaviour of xtrigger manager.""" -from pytest_mock import mocker +import asyncio +from pathlib import Path +from textwrap import dedent + +from cylc.flow.pathutil import get_workflow_run_dir async def test_2_xtriggers(flow, start, scheduler, monkeypatch): """Test that if an itask has 2 wall_clock triggers with different @@ -118,4 +121,69 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker): # resulting in two calls to put_xtriggers. This test fails # on master, but with call count 0 (not 2) because the main # loop doesn't run in this test. - + + +async def test_xtriggers_restart(flow, start, scheduler, db_select): + """It should write xtrigger results to the DB and load them on restart.""" + # define a workflow which uses a custom xtrigger + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'xtriggers': { + 'mytrig': 'mytrig()' + }, + 'graph': { + 'R1': '@mytrig => foo' + }, + } + }) + + # add a custom xtrigger to the workflow + run_dir = Path(get_workflow_run_dir(id_)) + xtrig_dir = run_dir / 'lib/python' + xtrig_dir.mkdir(parents=True) + (xtrig_dir / 'mytrig.py').write_text(dedent(''' + from random import random + + def mytrig(*args, **kwargs): + # return a different random number each time + return True, {"x": str(random())} + ''')) + + # start the workflow & run the xtrigger + schd = scheduler(id_) + async with start(schd): + # run all xtriggers + for task in schd.pool.get_tasks(): + schd.xtrigger_mgr.call_xtriggers_async(task) + # one xtrigger should have been scheduled to run + assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 1 + # wait for it to return + for _ in range(50): + await asyncio.sleep(0.1) + schd.proc_pool.process() + if len(schd.proc_pool.runnings) == 0: + break + else: + raise Exception('Process pool did not clear') + + # the xtrigger should be written to the DB + db_xtriggers = db_select(schd, True, 'xtriggers') + assert len(db_xtriggers) == 1 + assert db_xtriggers[0][0] == 'mytrig()' + assert db_xtriggers[0][1].startswith('{"x":') + + # restart the workflow, the xtrigger should *not* run again + schd = scheduler(id_) + async with start(schd): + # run all xtriggers + for task in schd.pool.get_tasks(): + schd.xtrigger_mgr.call_xtriggers_async(task) + # the xtrigger should have been loaded from the DB + # (so no xtriggers should be scheduled to run) + assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 0 + + # check the DB to ensure no additional entries have been created + assert db_select(schd, True, 'xtriggers') == db_xtriggers