Skip to content

Commit

Permalink
Trigger and set: fix default flow assignment for n=0 tasks. (#6367)
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver authored Oct 15, 2024
1 parent da7700d commit e1741c5
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 177 deletions.
1 change: 1 addition & 0 deletions changes.d/6367.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where `cylc trigger` and `cylc set` would assign active flows to existing tasks by default.
25 changes: 16 additions & 9 deletions cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
ERR_OPT_FLOW_COMBINE = "Cannot combine --flow={0} with other flow values"
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)
Expand All @@ -39,40 +39,47 @@
def flow_opts(flows: List[str], flow_wait: bool) -> None:
"""Check validity of flow-related CLI options.
Note the schema defaults flows to ["all"].
Note the schema defaults flows to [].
Examples:
Good:
>>> flow_opts([], False)
>>> flow_opts(["new"], False)
>>> flow_opts(["1", "2"], False)
>>> flow_opts(["1", "2"], True)
Bad:
>>> flow_opts(["none", "1"], False)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... must all be integer valued
cylc.flow.exceptions.InputError: Cannot combine --flow=none with other
flow values
>>> flow_opts(["cheese", "2"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... or 'all', 'new', or 'none'
>>> flow_opts(["new"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
cylc.flow.exceptions.InputError: --wait is not compatible with
--flow=new or --flow=none
"""
if not flows:
return

flows = [val.strip() for val in flows]

for val in flows:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
if val in {FLOW_NONE, FLOW_NEW, FLOW_ALL}:
if len(flows) != 1:
raise InputError(ERR_OPT_FLOW_INT)
raise InputError(ERR_OPT_FLOW_COMBINE.format(val))
else:
try:
int(val)
except ValueError:
raise InputError(ERR_OPT_FLOW_VAL.format(val))
raise InputError(ERR_OPT_FLOW_VAL)

if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]:
if flow_wait and flows[0] in {FLOW_NEW, FLOW_NONE}:
raise InputError(ERR_OPT_FLOW_WAIT)


Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ def apply_task_proxy_db_history(self):
itask, is_parent = self.db_load_task_proxies[relative_id]
itask.submit_num = submit_num
flow_nums = deserialise_set(flow_nums_str)
# Do not set states and outputs for future tasks in flow.
# Do not set states and outputs for inactive tasks in flow.
if (
itask.flow_nums and
flow_nums != itask.flow_nums and
Expand Down
23 changes: 13 additions & 10 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1998,17 +1998,20 @@ class Arguments:
class FlowMutationArguments:
flow = graphene.List(
graphene.NonNull(Flow),
default_value=[FLOW_ALL],
default_value=[],
description=sstrip(f'''
The flow(s) to trigger these tasks in.
This should be a list of flow numbers OR a single-item list
containing one of the following three strings:
* {FLOW_ALL} - Triggered tasks belong to all active flows
(default).
* {FLOW_NEW} - Triggered tasks are assigned to a new flow.
* {FLOW_NONE} - Triggered tasks do not belong to any flow.
The flow(s) to trigger/set these tasks in.
By default:
* active tasks (n=0) keep their existing flow assignment
* inactive tasks (n>0) get assigned all active flows
Otherwise you can assign (inactive tasks) or add to (active tasks):
* a list of integer flow numbers
or one of the following strings:
* {FLOW_ALL} - all active flows
* {FLOW_NEW} - an automatically generated new flow number
* {FLOW_NONE} - (ignored for active tasks): no flow
''')
)
flow_wait = Boolean(
Expand Down
24 changes: 16 additions & 8 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,12 @@

"""cylc trigger [OPTIONS] ARGS
Force tasks to run despite unsatisfied prerequisites.
Force tasks to run regardless of prerequisites.
* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).
Incomplete and active-waiting tasks in the n=0 window already belong to a flow.
Triggering them queues them to run (or rerun) in the same flow.
Beyond n=0, triggered tasks get all current active flow numbers by default, or
specified flow numbers via the --flow option. Those flows - if/when they catch
up - will see tasks that ran after triggering event as having run already.
Examples:
# trigger task foo in cycle 1234 in test
$ cylc trigger test//1234/foo
Expand All @@ -39,6 +32,21 @@
# start a new flow by triggering 1234/foo in test
$ cylc trigger --flow=new test//1234/foo
Flows:
Active tasks (in the n=0 window) already belong to a flow.
* by default, if triggered, they run in the same flow
* or with --flow=all, they are assigned all active flows
* or with --flow=INT or --flow=new, the original and new flows are merged
* (--flow=none is ignored for active tasks)
Inactive tasks (n>0) do not already belong to a flow.
* by default they are assigned all active flows
* otherwise, they are assigned the --flow value
Note --flow=new increments the global flow counter with each use. If it
takes multiple commands to start a new flow use the actual flow number
after the first command (you can read it from the scheduler log).
"""

from functools import partial
Expand Down
117 changes: 54 additions & 63 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,35 +1290,35 @@ def set_hold_point(self, point: 'PointBase') -> None:
def hold_tasks(self, items: Iterable[str]) -> int:
"""Hold tasks with IDs matching the specified items."""
# Hold active tasks:
itasks, future_tasks, unmatched = self.filter_task_proxies(
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
items,
warn=False,
future=True,
)
for itask in itasks:
self.hold_active_task(itask)
# Set future tasks to be held:
for name, cycle in future_tasks:
# Set inactive tasks to be held:
for name, cycle in inactive_tasks:
self.data_store_mgr.delta_task_held((name, cycle, True))
self.tasks_to_hold.update(future_tasks)
self.tasks_to_hold.update(inactive_tasks)
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)
LOG.debug(f"Tasks to hold: {self.tasks_to_hold}")
return len(unmatched)

def release_held_tasks(self, items: Iterable[str]) -> int:
"""Release held tasks with IDs matching any specified items."""
# Release active tasks:
itasks, future_tasks, unmatched = self.filter_task_proxies(
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
items,
warn=False,
future=True,
)
for itask in itasks:
self.release_held_active_task(itask)
# Unhold future tasks:
for name, cycle in future_tasks:
# Unhold inactive tasks:
for name, cycle in inactive_tasks:
self.data_store_mgr.delta_task_held((name, cycle, False))
self.tasks_to_hold.difference_update(future_tasks)
self.tasks_to_hold.difference_update(inactive_tasks)
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)
LOG.debug(f"Tasks to hold: {self.tasks_to_hold}")
return len(unmatched)
Expand Down Expand Up @@ -1895,7 +1895,7 @@ def set_prereqs_and_outputs(
Task matching restrictions (for now):
- globs (cycle and name) only match in the pool
- future tasks must be specified individually
- inactive tasks must be specified individually
- family names are not expanded to members
Uses a transient task proxy to spawn children. (Even if parent was
Expand All @@ -1916,27 +1916,28 @@ def set_prereqs_and_outputs(
flow_descr: description of new flow
"""
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
# Illegal flow command opts
return

# Get matching pool tasks and future task definitions.
itasks, future_tasks, unmatched = self.filter_task_proxies(
# Get matching pool tasks and inactive task definitions.
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
items,
future=True,
warn=False,
)

flow_nums = self._get_flow_nums(flow, flow_descr)

# Set existing task proxies.
for itask in itasks:
# Existing task proxies.
self.merge_flows(itask, flow_nums)
if prereqs:
self._set_prereqs_itask(itask, prereqs, flow_nums)
else:
self._set_outputs_itask(itask, outputs)

for name, point in future_tasks:
# Spawn and set inactive tasks.
if not flow:
# default: assign to all active flows
flow_nums = self._get_active_flow_nums()
for name, point in inactive_tasks:
tdef = self.config.get_taskdef(name)
if prereqs:
self._set_prereqs_tdef(
Expand Down Expand Up @@ -2023,7 +2024,7 @@ def _set_prereqs_itask(
def _set_prereqs_tdef(
self, point, taskdef, prereqs, flow_nums, flow_wait
):
"""Spawn a future task and set prerequisites on it."""
"""Spawn an inactive task and set prerequisites on it."""

itask = self.spawn_task(
taskdef.name, point, flow_nums, flow_wait=flow_wait
Expand Down Expand Up @@ -2073,38 +2074,30 @@ def remove_tasks(self, items):
return len(bad_items)

def _get_flow_nums(
self,
flow: List[str],
meta: Optional[str] = None,
) -> Optional[Set[int]]:
"""Get correct flow numbers given user command options."""
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}'
' cannot be used in combination with integer flow numbers.'
)
return None
if flow[0] == FLOW_ALL:
flow_nums = self._get_active_flow_nums()
elif flow[0] == FLOW_NEW:
flow_nums = {self.flow_mgr.get_flow_num(meta=meta)}
elif flow[0] == FLOW_NONE:
flow_nums = set()
else:
try:
flow_nums = {
self.flow_mgr.get_flow_num(
flow_num=int(n), meta=meta
)
for n in flow
}
except ValueError:
LOG.warning(
f"Ignoring command: illegal flow values {flow}"
)
return None
return flow_nums
self,
flow: List[str],
meta: Optional[str] = None,
) -> Set[int]:
"""Return flow numbers corresponding to user command options.
Arg should have been validated already during command validation.
In the default case (--flow option not provided), stick with the
existing flows (so return empty set) - NOTE this only applies for
active tasks.
"""
if flow == [FLOW_NONE]:
return set()
if flow == [FLOW_ALL]:
return self._get_active_flow_nums()
if flow == [FLOW_NEW]:
return {self.flow_mgr.get_flow_num(meta=meta)}
# else specific flow numbers:
return {
self.flow_mgr.get_flow_num(flow_num=int(n), meta=meta)
for n in flow
}

def _force_trigger(self, itask):
"""Assumes task is in the pool"""
Expand Down Expand Up @@ -2167,30 +2160,28 @@ def force_trigger_tasks(
unless flow-wait is set.
"""
# Get flow numbers for the tasks to be triggered.
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
return

# Get matching tasks proxies, and matching future task IDs.
# Get matching tasks proxies, and matching inactive task IDs.
existing_tasks, future_ids, unmatched = self.filter_task_proxies(
items, future=True, warn=False,
)

# Trigger existing tasks.
flow_nums = self._get_flow_nums(flow, flow_descr)

# Trigger active tasks.
for itask in existing_tasks:
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
continue
self.merge_flows(itask, flow_nums)
self._force_trigger(itask)

# Spawn and trigger future tasks.
# Spawn and trigger inactive tasks.
if not flow:
# default: assign to all active flows
flow_nums = self._get_active_flow_nums()
for name, point in future_ids:

if not self.can_be_spawned(name, point):
continue

submit_num, _, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)
Expand Down Expand Up @@ -2331,13 +2322,13 @@ def filter_task_proxies(
)
future_matched: 'Set[Tuple[str, PointBase]]' = set()
if future and unmatched:
future_matched, unmatched = self.match_future_tasks(
future_matched, unmatched = self.match_inactive_tasks(
unmatched
)

return matched, future_matched, unmatched

def match_future_tasks(
def match_inactive_tasks(
self,
ids: Iterable[str],
) -> Tuple[Set[Tuple[str, 'PointBase']], List[str]]:
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-set/04-switch/flow.cylc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Set outputs of future task to direct the flow at an optional branch point.
# Set outputs of inactive task to direct the flow at an optional branch point.

[scheduler]
[[events]]
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-set/05-expire/flow.cylc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Expire a future task, so it won't run.
# Expire an inactive task, so it won't run.

[scheduler]
[[events]]
Expand Down
Loading

0 comments on commit e1741c5

Please sign in to comment.