Skip to content

Commit

Permalink
convert from ids to tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Nov 28, 2024
1 parent 9ff50f8 commit ada98bf
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,16 +1068,16 @@ def remove_tasks(
if flow_nums is None:
flow_nums = set()
# Mapping of task IDs to removed flow numbers:
removed: Dict[str, FlowNums] = {}
not_removed: Set[str] = set()
removed: Dict[Tokens, FlowNums] = {}
not_removed: Set[Tokens] = set()
to_kill: List[TaskProxy] = []

for itask in active:
fnums_to_remove = itask.match_flows(flow_nums)
if not fnums_to_remove:
not_removed.add(itask.identity)
not_removed.add(itask.tokens)
continue
removed[itask.identity] = fnums_to_remove
removed[itask.tokens] = fnums_to_remove
if fnums_to_remove == itask.flow_nums:
# Need to remove the task from the pool.
# Spawn next occurrence of xtrigger sequential task (otherwise
Expand All @@ -1091,17 +1091,18 @@ def remove_tasks(
# All the matched tasks (including inactive & applicable active tasks):
matched_task_ids = {
*removed.keys(),
*(quick_relative_id(cycle, task) for task, cycle in inactive),
*(Tokens(cycle=str(cycle), task=task) for task, cycle in inactive),
}

for id_ in matched_task_ids:
point_str, name = id_.split('/', 1)
tdef = self.config.taskdefs[name]
for tokens in matched_task_ids:
tdef = self.config.taskdefs[tokens['task']]

# Go through any tasks downstream of this matched task to see if
# any need to stand down as a result of this task being removed:
for child in set(itertools.chain.from_iterable(
generate_graph_children(tdef, get_point(point_str)).values()
generate_graph_children(
tdef, get_point(tokens['cycle'])
).values()
)):
child_itask = self.pool.get_task(child.point, child.name)
if not child_itask:
Expand All @@ -1116,9 +1117,11 @@ def remove_tasks(
):
# Unset any prereqs naturally satisfied by these tasks
# (do not unset those satisfied by `cylc set --pre`):
if prereq.unset_naturally_satisfied(id_):
if prereq.unset_naturally_satisfied(tokens.relative_id):
prereqs_changed = True
removed.setdefault(id_, set()).update(fnums_to_remove)
removed.setdefault(tokens, set()).update(
fnums_to_remove
)
if not prereqs_changed:
continue
self.data_store_mgr.delta_task_prerequisite(child_itask)
Expand Down Expand Up @@ -1149,20 +1152,22 @@ def remove_tasks(

# Remove the matched tasks from the flows in the DB tables:
db_removed_fnums = self.workflow_db_mgr.remove_task_from_flows(
point_str, name, flow_nums
tokens['cycle'], tokens['task'], flow_nums,
)
if db_removed_fnums:
removed.setdefault(id_, set()).update(db_removed_fnums)
removed.setdefault(tokens, set()).update(db_removed_fnums)

if to_kill:
self.kill_tasks(to_kill, warn=False)

if removed:
tasks_str_list = []
for task, fnums in removed.items():
self.data_store_mgr.delta_remove_task_flow_nums(task, fnums)
self.data_store_mgr.delta_remove_task_flow_nums(
task.relative_id, fnums
)
tasks_str_list.append(
f"{task} {repr_flow_nums(fnums, full=True)}"
f"{task.relative_id} {repr_flow_nums(fnums, full=True)}"
)
LOG.info(f"Removed task(s): {', '.join(sorted(tasks_str_list))}")

Expand All @@ -1171,10 +1176,10 @@ def remove_tasks(
fnums_str = (
repr_flow_nums(flow_nums, full=True) if flow_nums else ''
)
LOG.warning(
"Task(s) not removable: "
f"{', '.join(sorted(not_removed))} {fnums_str}"
tasks_str = ', '.join(
sorted(tokens.relative_id for tokens in not_removed)
)
LOG.warning(f"Task(s) not removable: {tasks_str} {fnums_str}")

if removed and self.pool.compute_runahead():
self.pool.release_runahead_tasks()
Expand Down

0 comments on commit ada98bf

Please sign in to comment.