diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 32407d5c70..ee30b3fe78 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1068,16 +1068,16 @@ def remove_tasks( if flow_nums is None: flow_nums = set() # Mapping of task IDs to removed flow numbers: - removed: Dict[str, FlowNums] = {} - not_removed: Set[str] = set() + removed: Dict[Tokens, FlowNums] = {} + not_removed: Set[Tokens] = set() to_kill: List[TaskProxy] = [] for itask in active: fnums_to_remove = itask.match_flows(flow_nums) if not fnums_to_remove: - not_removed.add(itask.identity) + not_removed.add(itask.tokens) continue - removed[itask.identity] = fnums_to_remove + removed[itask.tokens] = fnums_to_remove if fnums_to_remove == itask.flow_nums: # Need to remove the task from the pool. # Spawn next occurrence of xtrigger sequential task (otherwise @@ -1091,17 +1091,18 @@ def remove_tasks( # All the matched tasks (including inactive & applicable active tasks): matched_task_ids = { *removed.keys(), - *(quick_relative_id(cycle, task) for task, cycle in inactive), + *(Tokens(cycle=str(cycle), task=task) for task, cycle in inactive), } - for id_ in matched_task_ids: - point_str, name = id_.split('/', 1) - tdef = self.config.taskdefs[name] + for tokens in matched_task_ids: + tdef = self.config.taskdefs[tokens['task']] # Go through any tasks downstream of this matched task to see if # any need to stand down as a result of this task being removed: for child in set(itertools.chain.from_iterable( - generate_graph_children(tdef, get_point(point_str)).values() + generate_graph_children( + tdef, get_point(tokens['cycle']) + ).values() )): child_itask = self.pool.get_task(child.point, child.name) if not child_itask: @@ -1116,9 +1117,11 @@ def remove_tasks( ): # Unset any prereqs naturally satisfied by these tasks # (do not unset those satisfied by `cylc set --pre`): - if prereq.unset_naturally_satisfied(id_): + if prereq.unset_naturally_satisfied(tokens.relative_id): prereqs_changed = True - removed.setdefault(id_, set()).update(fnums_to_remove) + removed.setdefault(tokens, set()).update( + fnums_to_remove + ) if not prereqs_changed: continue self.data_store_mgr.delta_task_prerequisite(child_itask) @@ -1149,10 +1152,10 @@ def remove_tasks( # Remove the matched tasks from the flows in the DB tables: db_removed_fnums = self.workflow_db_mgr.remove_task_from_flows( - point_str, name, flow_nums + tokens['cycle'], tokens['task'], flow_nums, ) if db_removed_fnums: - removed.setdefault(id_, set()).update(db_removed_fnums) + removed.setdefault(tokens, set()).update(db_removed_fnums) if to_kill: self.kill_tasks(to_kill, warn=False) @@ -1160,9 +1163,11 @@ def remove_tasks( if removed: tasks_str_list = [] for task, fnums in removed.items(): - self.data_store_mgr.delta_remove_task_flow_nums(task, fnums) + self.data_store_mgr.delta_remove_task_flow_nums( + task.relative_id, fnums + ) tasks_str_list.append( - f"{task} {repr_flow_nums(fnums, full=True)}" + f"{task.relative_id} {repr_flow_nums(fnums, full=True)}" ) LOG.info(f"Removed task(s): {', '.join(sorted(tasks_str_list))}") @@ -1171,10 +1176,10 @@ def remove_tasks( fnums_str = ( repr_flow_nums(flow_nums, full=True) if flow_nums else '' ) - LOG.warning( - "Task(s) not removable: " - f"{', '.join(sorted(not_removed))} {fnums_str}" + tasks_str = ', '.join( + sorted(tokens.relative_id for tokens in not_removed) ) + LOG.warning(f"Task(s) not removable: {tasks_str} {fnums_str}") if removed and self.pool.compute_runahead(): self.pool.release_runahead_tasks()