Skip to content

Commit

Permalink
Convert from ids to tokens (#6503)
Browse files Browse the repository at this point in the history
Co-authored-by: Ronnie Dutta <[email protected]>
  • Loading branch information
oliver-sanders and MetRonnie authored Dec 3, 2024
1 parent 1e3699a commit b004e54
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
get_user,
is_remote_platform,
)
from cylc.flow.id import Tokens, quick_relative_id
from cylc.flow.id import Tokens
from cylc.flow.log_level import verbosity_to_env, verbosity_to_opts
from cylc.flow.loggingutil import (
ReferenceLogFileHandler,
Expand Down Expand Up @@ -1067,17 +1067,17 @@ def remove_tasks(

if flow_nums is None:
flow_nums = set()
# Mapping of task IDs to removed flow numbers:
removed: Dict[str, FlowNums] = {}
not_removed: Set[str] = set()
# Mapping of *relative* task IDs to removed flow numbers:
removed: Dict[Tokens, FlowNums] = {}
not_removed: Set[Tokens] = set()
to_kill: List[TaskProxy] = []

for itask in active:
fnums_to_remove = itask.match_flows(flow_nums)
if not fnums_to_remove:
not_removed.add(itask.identity)
not_removed.add(itask.tokens.task)
continue
removed[itask.identity] = fnums_to_remove
removed[itask.tokens.task] = fnums_to_remove
if fnums_to_remove == itask.flow_nums:
# Need to remove the task from the pool.
# Spawn next occurrence of xtrigger sequential task (otherwise
Expand All @@ -1089,19 +1089,20 @@ def remove_tasks(
itask.flow_nums.difference_update(fnums_to_remove)

# All the matched tasks (including inactive & applicable active tasks):
matched_task_ids = {
matched_tasks = {
*removed.keys(),
*(quick_relative_id(cycle, task) for task, cycle in inactive),
*(Tokens(cycle=str(cycle), task=task) for task, cycle in inactive),
}

for id_ in matched_task_ids:
point_str, name = id_.split('/', 1)
tdef = self.config.taskdefs[name]
for tokens in matched_tasks:
tdef = self.config.taskdefs[tokens['task']]

# Go through any tasks downstream of this matched task to see if
# any need to stand down as a result of this task being removed:
for child in set(itertools.chain.from_iterable(
generate_graph_children(tdef, get_point(point_str)).values()
generate_graph_children(
tdef, get_point(tokens['cycle'])
).values()
)):
child_itask = self.pool.get_task(child.point, child.name)
if not child_itask:
Expand All @@ -1116,9 +1117,11 @@ def remove_tasks(
):
# Unset any prereqs naturally satisfied by these tasks
# (do not unset those satisfied by `cylc set --pre`):
if prereq.unset_naturally_satisfied(id_):
if prereq.unset_naturally_satisfied(tokens.relative_id):
prereqs_changed = True
removed.setdefault(id_, set()).update(fnums_to_remove)
removed.setdefault(tokens, set()).update(
fnums_to_remove
)
if not prereqs_changed:
continue
self.data_store_mgr.delta_task_prerequisite(child_itask)
Expand All @@ -1135,7 +1138,7 @@ def remove_tasks(
# Check if downstream task should remain spawned:
if (
# Ignoring tasks we are already dealing with:
child_itask.identity in matched_task_ids
child_itask.tokens.task in matched_tasks
or child_itask.state.any_satisfied_prerequisite_outputs()
):
continue
Expand All @@ -1149,32 +1152,34 @@ def remove_tasks(

# Remove the matched tasks from the flows in the DB tables:
db_removed_fnums = self.workflow_db_mgr.remove_task_from_flows(
point_str, name, flow_nums
tokens['cycle'], tokens['task'], flow_nums,
)
if db_removed_fnums:
removed.setdefault(id_, set()).update(db_removed_fnums)
removed.setdefault(tokens, set()).update(db_removed_fnums)

if to_kill:
self.kill_tasks(to_kill, warn=False)

if removed:
tasks_str_list = []
for task, fnums in removed.items():
self.data_store_mgr.delta_remove_task_flow_nums(task, fnums)
self.data_store_mgr.delta_remove_task_flow_nums(
task.relative_id, fnums
)
tasks_str_list.append(
f"{task} {repr_flow_nums(fnums, full=True)}"
f"{task.relative_id} {repr_flow_nums(fnums, full=True)}"
)
LOG.info(f"Removed task(s): {', '.join(sorted(tasks_str_list))}")

not_removed.update(matched_task_ids.difference(removed))
not_removed.update(matched_tasks.difference(removed))
if not_removed:
fnums_str = (
repr_flow_nums(flow_nums, full=True) if flow_nums else ''
)
LOG.warning(
"Task(s) not removable: "
f"{', '.join(sorted(not_removed))} {fnums_str}"
tasks_str = ', '.join(
sorted(tokens.relative_id for tokens in not_removed)
)
LOG.warning(f"Task(s) not removable: {tasks_str} {fnums_str}")

if removed and self.pool.compute_runahead():
self.pool.release_runahead_tasks()
Expand Down

0 comments on commit b004e54

Please sign in to comment.