Skip to content

Commit

Permalink
pool: update DB after removing a task
Browse files Browse the repository at this point in the history
* Closes #6315
* If the DB is not updated after a task is removed, then it can be
  respawned in its previous state as the result of upstream output
  completion.
  • Loading branch information
oliver-sanders committed Oct 14, 2024
1 parent 6c10575 commit 49af406
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,11 @@ def remove(self, itask, reason=None):
msg += " - active job orphaned"

LOG.log(level, f"[{itask}] {msg}")

# ensure this task is written to the DB before moving on
# https://github.com/cylc/cylc-flow/issues/6315
self.workflow_db_mgr.process_queued_ops()

del itask

def get_tasks(self) -> List[TaskProxy]:
Expand Down
55 changes: 55 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2188,3 +2188,58 @@ async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type):

# the task should also have been removed from the queue
assert not schd.pool.task_queue_mgr.remove_task(itask)


async def test_downstream_complete_before_upstream(
    flow, scheduler, start, db_select
):
    """It should not respawn a downstream task that completed first.

    In the graph "a => b", 1/b (downstream) is spawned and marked as
    succeeded before 1/a (upstream) succeeds. The later success of 1/a
    must not bring 1/b back into the pool.

    See https://github.com/cylc/cylc-flow/issues/6315
    """
    graph = {'R1': 'a => b'}
    workflow_id = flow({'scheduling': {'graph': graph}})
    schd = scheduler(workflow_id)

    async with start(schd):
        pool = schd.pool
        process_message = schd.task_events_mgr.process_message

        # 1/a has no parents so it should already be in the pool
        upstream = pool.get_task(IntegerPoint('1'), 'a')
        assert upstream

        # put 1/b into the pool by hand (this can happen as the result of
        # a request, e.g. trigger)
        downstream = pool.spawn_task('b', IntegerPoint('1'), {1})
        pool.add_to_pool(downstream)
        assert downstream

        # 1/b succeeds first ...
        process_message(downstream, 'INFO', 'succeeded')

        # ... so it should be removed from the pool (completed), leaving
        # only 1/a behind
        assert pool.get_tasks() == [upstream]

        # as a side effect the DB should have been updated
        recorded_outputs = db_select(
            schd,
            # "False" means "do not run the DB update before checking it"
            False,  # do not change this to "True"
            'task_outputs',
            'outputs',
            name='b',
            cycle='1',
        )[0][0]
        assert TASK_OUTPUT_SUCCEEDED in recorded_outputs

        # now 1/a succeeds
        process_message(upstream, 'INFO', 'succeeded')

        # 1/a should be removed from the pool (completed)
        # 1/b should not be re-spawned by the success of 1/a
        assert pool.get_tasks() == []

0 comments on commit 49af406

Please sign in to comment.