From a6cf7d43a6c5a671e135fe8a645767eedb09ef9a Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 8 Jun 2023 22:32:40 +1200 Subject: [PATCH 1/5] Remove 'removed' tasks from queues as well as pool. --- cylc/flow/task_pool.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index aab8067e7a2..e7d85f66938 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -749,6 +749,7 @@ def remove(self, itask, reason=""): if not self.hidden_pool[itask.point]: del self.hidden_pool[itask.point] LOG.debug(f"[{itask}] {msg}") + self.task_queue_mgr.remove_task(itask) return try: @@ -759,9 +760,9 @@ def remove(self, itask, reason=""): self.main_pool_changed = True if not self.main_pool[itask.point]: del self.main_pool[itask.point] - self.task_queue_mgr.remove_task(itask) - if itask.tdef.max_future_prereq_offset is not None: - self.set_max_future_offset() + self.task_queue_mgr.remove_task(itask) + if itask.tdef.max_future_prereq_offset is not None: + self.set_max_future_offset() # Notify the data-store manager of their removal # (the manager uses window boundary tracking for pruning). From 7759ab267cfad9196078d7a2c5f0766877eb0df0 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 17 Jul 2023 13:29:38 +0100 Subject: [PATCH 2/5] Update change log --- CHANGES.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index a07d43ed979..92020dc48c2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,7 +23,7 @@ issue which could cause jobs to fail if this variable became too long. [#5992](https://github.com/cylc/cylc-flow/pull/5992) - Before trying to reload the workflow definition, the scheduler will -now wait for preparing tasks to submit, and pause the workflow. +now wait for preparing tasks to submit, and pause the workflow. After successful reload the scheduler will unpause the workflow. -[#5605](https://github.com/cylc/cylc-flow/pull/5605) - A shorthand for defining @@ -96,6 +96,9 @@ Fix a bug in Cylc 7 compatibility mode where tasks running in the `none` flow Fix a possible issue where workflows started using `cylc play --start-cycle-point` could hang during startup. +[#5573](https://github.com/cylc/cylc-flow/pull/5573) - Fix bug that ran a +queued waiting task even after removal by `cylc remove`. + [#5524](https://github.com/cylc/cylc-flow/pull/5524) - Logging includes timestamps for `cylc play` when called by `cylc vip` or `cylc vr`. From 39a645fa169eb3aacd1e6e717e58bda263197c66 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 17 Jul 2023 14:54:21 +0100 Subject: [PATCH 3/5] added test --- tests/integration/test_task_pool.py | 40 +++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 545d96a1aa4..834e536e74d 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1117,3 +1117,43 @@ async def test_no_flow_tasks_dont_spawn( ] for itask in pool ] == pool + +async def test_task_proxy_remove_from_queues( + flow, one_conf, scheduler, start, +): + """TaskPool.remove deletes task proxies from queues.""" + # Set up a scheduler with a non-default queue: + one_conf['scheduling'] = { + 'queues': {'queue_two': {'members': 'one, control'}}, + 'graph': {'R1': 'two & one & hidden & control & hidden_control'}, + } + schd = scheduler(flow(one_conf)) + async with start(schd): + # Get a list of itasks: + itasks = schd.pool.get_tasks() + point = itasks[0].point + + for itask in itasks: + id_ = itask.identity + + # Move some tasks to the hidden_pool to ensure that these are + # removed too: + if 'hidden' in itask.identity: + schd.pool.hidden_pool.setdefault(point, {}) + schd.pool.hidden_pool[point][itask.identity] = itask + schd.pool.hidden_pool[ + point][id_] = schd.pool.main_pool[point][id_] + del schd.pool.main_pool[point][id_] + + # The meat of the test - remove itask from pool if it + # doesn't have "control" in the name: + if 'control' not in id_: + schd.pool.remove(itask) + + # Look at the queues afterwards: + queues_after = { + name: [itask.identity for itask in queue.deque] + for name, queue in schd.pool.task_queue_mgr.queues.items()} + + assert queues_after['default'] == ['1/hidden_control'] + assert queues_after['queue_two'] == ['1/control'] From 57c36500a6b0e987228174b11a6cd96f5255ca8d Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 17 Jul 2023 15:45:29 +0100 Subject: [PATCH 4/5] Update tests/integration/test_task_pool.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- tests/integration/test_task_pool.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 834e536e74d..8fac4e036e0 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1139,10 +1139,7 @@ async def test_task_proxy_remove_from_queues( # Move some tasks to the hidden_pool to ensure that these are # removed too: if 'hidden' in itask.identity: - schd.pool.hidden_pool.setdefault(point, {}) - schd.pool.hidden_pool[point][itask.identity] = itask - schd.pool.hidden_pool[ - point][id_] = schd.pool.main_pool[point][id_] + schd.pool.hidden_pool.setdefault(point, {id_: itask}) del schd.pool.main_pool[point][id_] # The meat of the test - remove itask from pool if it From 76838842976861527b84c28796f0cde81b994c0c Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 18 Jul 2023 14:21:24 +0100 Subject: [PATCH 5/5] Update tests/integration/test_task_pool.py Co-authored-by: Oliver Sanders --- tests/integration/test_task_pool.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 8fac4e036e0..c6f34c0adf0 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1121,7 +1121,10 @@ async def test_no_flow_tasks_dont_spawn( async def test_task_proxy_remove_from_queues( flow, one_conf, scheduler, start, ): - """TaskPool.remove deletes task proxies from queues.""" + """TaskPool.remove should delete task proxies from queues. + + See https://github.com/cylc/cylc-flow/pull/5573 + """ # Set up a scheduler with a non-default queue: one_conf['scheduling'] = { 'queues': {'queue_two': {'members': 'one, control'}},