Skip to content

Commit

Permalink
Merge pull request cylc#5573 from hjoliver/remove-removed-tasks-from-…
Browse files Browse the repository at this point in the history
…queues

Remove 'removed' tasks from queues as well as task pool.
  • Loading branch information
wxtim authored Jul 18, 2023
2 parents 7dc5e41 + 7683884 commit 69d5905
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ issue which could cause jobs to fail if this variable became too long.

[#5992](https://github.com/cylc/cylc-flow/pull/5992) -
Before trying to reload the workflow definition, the scheduler will
now wait for preparing tasks to submit, and pause the workflow.
now wait for preparing tasks to submit, and pause the workflow.
After successful reload the scheduler will unpause the workflow.

-[#5605](https://github.com/cylc/cylc-flow/pull/5605) - A shorthand for defining
Expand Down Expand Up @@ -96,6 +96,9 @@ Fix a bug in Cylc 7 compatibility mode where tasks running in the `none` flow
Fix a possible issue where workflows started using
`cylc play --start-cycle-point` could hang during startup.

[#5573](https://github.com/cylc/cylc-flow/pull/5573) - Fix bug that ran a
queued waiting task even after removal by `cylc remove`.

[#5524](https://github.com/cylc/cylc-flow/pull/5524) - Logging includes timestamps
for `cylc play` when called by `cylc vip` or `cylc vr`.

Expand Down
7 changes: 4 additions & 3 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ def remove(self, itask, reason=""):
if not self.hidden_pool[itask.point]:
del self.hidden_pool[itask.point]
LOG.debug(f"[{itask}] {msg}")
self.task_queue_mgr.remove_task(itask)
return

try:
Expand All @@ -759,9 +760,9 @@ def remove(self, itask, reason=""):
self.main_pool_changed = True
if not self.main_pool[itask.point]:
del self.main_pool[itask.point]
self.task_queue_mgr.remove_task(itask)
if itask.tdef.max_future_prereq_offset is not None:
self.set_max_future_offset()
self.task_queue_mgr.remove_task(itask)
if itask.tdef.max_future_prereq_offset is not None:
self.set_max_future_offset()

# Notify the data-store manager of their removal
# (the manager uses window boundary tracking for pruning).
Expand Down
40 changes: 40 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1117,3 +1117,43 @@ async def test_no_flow_tasks_dont_spawn(
]
for itask in pool
] == pool

async def test_task_proxy_remove_from_queues(
flow, one_conf, scheduler, start,
):
"""TaskPool.remove should delete task proxies from queues.
See https://github.com/cylc/cylc-flow/pull/5573
"""
# Set up a scheduler with a non-default queue:
one_conf['scheduling'] = {
'queues': {'queue_two': {'members': 'one, control'}},
'graph': {'R1': 'two & one & hidden & control & hidden_control'},
}
schd = scheduler(flow(one_conf))
async with start(schd):
# Get a list of itasks:
itasks = schd.pool.get_tasks()
point = itasks[0].point

for itask in itasks:
id_ = itask.identity

# Move some tasks to the hidden_pool to ensure that these are
# removed too:
if 'hidden' in itask.identity:
schd.pool.hidden_pool.setdefault(point, {id_: itask})
del schd.pool.main_pool[point][id_]

# The meat of the test - remove itask from pool if it
# doesn't have "control" in the name:
if 'control' not in id_:
schd.pool.remove(itask)

# Look at the queues afterwards:
queues_after = {
name: [itask.identity for itask in queue.deque]
for name, queue in schd.pool.task_queue_mgr.queues.items()}

assert queues_after['default'] == ['1/hidden_control']
assert queues_after['queue_two'] == ['1/control']

0 comments on commit 69d5905

Please sign in to comment.