diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 53bba0e0756..0a0e91e4e39 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1349,6 +1349,8 @@ def spawn_on_output(self, itask, output, forced=False): def remove_if_complete(self, itask: TaskProxy) -> bool: """Remove a finished task if required outputs are complete. + Return True if removed else False. + Cylc 8: - if complete: - remove task and recompute runahead @@ -1363,39 +1365,41 @@ def remove_if_complete(self, itask: TaskProxy) -> bool: (C7 failed tasks don't count toward runahead limit) """ - ret = False - if not itask.state(*TASK_STATUSES_FINAL): - return ret + return False + + if itask.identity == self.stop_task_id: + self.stop_task_finished = True if cylc.flow.flags.cylc7_back_compat: + ret = False if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED): self.remove(itask) ret = True + # Recompute runhead either way; failed tasks don't count in C7. + if self.compute_runahead(): + self.release_runahead_tasks() return ret if itask.state(TASK_STATUS_EXPIRED): - reason = "expired" - else: - incomplete = itask.state.outputs.get_incomplete() - if incomplete: - # Retain as an incomplete task. - LOG.warning( - f"[{itask}] did not complete required outputs:" - f" {incomplete}" - ) - return ret - reason = None + self.remove(itask, "expired") + if self.compute_runahead(): + self.release_runahead_tasks() + return True - if itask.identity == self.stop_task_id: - self.stop_task_finished = True + incomplete = itask.state.outputs.get_incomplete() + if incomplete: + # Retain as an incomplete task. + LOG.warning( + f"[{itask}] did not complete required outputs:" + f" {incomplete}" + ) + return False - self.remove(itask, reason) - ret = True + self.remove(itask) if self.compute_runahead(): self.release_runahead_tasks() - - return ret + return True def spawn_on_all_outputs( self, itask: TaskProxy, completed_only: bool = False