From 49af4060989de575586d662992cdd5c51351c5ca Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 14 Oct 2024 16:43:26 +0100 Subject: [PATCH] pool: update DB after removing a task * Closes https://github.com/cylc/cylc-flow/issues/6315 * If the DB is not updated after a task is removed, then it can be respawned in its previous state as the result of upstream output completion. --- cylc/flow/task_pool.py | 5 +++ tests/integration/test_task_pool.py | 55 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 082745e43e9..ff75eb67d7d 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -870,6 +870,11 @@ def remove(self, itask, reason=None): msg += " - active job orphaned" LOG.log(level, f"[{itask}] {msg}") + + # ensure this task is written to the DB before moving on + # https://github.com/cylc/cylc-flow/issues/6315 + self.workflow_db_mgr.process_queued_ops() + del itask def get_tasks(self) -> List[TaskProxy]: diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 1064edb874a..35ead99264f 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -2188,3 +2188,58 @@ async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type): # the task should also have been removed from the queue assert not schd.pool.task_queue_mgr.remove_task(itask) + + +async def test_downstream_complete_before_upstream( + flow, scheduler, start, db_select +): + """It should handle an upstream task completing before a downstream task. + + See https://github.com/cylc/cylc-flow/issues/6315 + """ + id_ = flow( + { + 'scheduling': { + 'graph': { + 'R1': 'a => b', + }, + }, + } + ) + schd = scheduler(id_) + async with start(schd): + # 1/a should be pre-spawned (parentless) + a_1 = schd.pool.get_task(IntegerPoint('1'), 'a') + assert a_1 + + # spawn 1/b (this can happens as the result of request e.g. trigger) + b_1 = schd.pool.spawn_task('b', IntegerPoint('1'), {1}) + schd.pool.add_to_pool(b_1) + assert b_1 + + # mark 1/b as succeeded + schd.task_events_mgr.process_message(b_1, 'INFO', 'succeeded') + + # 1/b should be removed from the pool (completed) + assert schd.pool.get_tasks() == [a_1] + + # as a side effect the DB should have been updated + assert ( + TASK_OUTPUT_SUCCEEDED + in db_select( + schd, + # "False" means "do not run the DB update before checking it" + False, # do not change this to "True" + 'task_outputs', + 'outputs', + name='b', + cycle='1', + )[0][0] + ) + + # mark 1/a as succeeded + schd.task_events_mgr.process_message(a_1, 'INFO', 'succeeded') + + # 1/a should be removed from the pool (completed) + # 1/b should not be re-spawned by the success of 1/a + assert schd.pool.get_tasks() == []