Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger and set: fix default flow assignment for n=0 tasks. #6367

Merged
merged 17 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6367.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where `cylc trigger` and `cylc set` would assign active flows to existing tasks by default.
25 changes: 16 additions & 9 deletions cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
ERR_OPT_FLOW_COMBINE = "Cannot combine --flow={0} with other flow values"
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)
Expand All @@ -39,40 +39,47 @@
def flow_opts(flows: List[str], flow_wait: bool) -> None:
"""Check validity of flow-related CLI options.

Note the schema defaults flows to ["all"].
Note the schema defaults flows to [].

Examples:
Good:
>>> flow_opts([], False)
>>> flow_opts(["new"], False)
>>> flow_opts(["1", "2"], False)
>>> flow_opts(["1", "2"], True)

Bad:
>>> flow_opts(["none", "1"], False)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... must all be integer valued
cylc.flow.exceptions.InputError: Cannot combine --flow=none with other
flow values

>>> flow_opts(["cheese", "2"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... or 'all', 'new', or 'none'

>>> flow_opts(["new"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
cylc.flow.exceptions.InputError: --wait is not compatible with
--flow=new or --flow=none

"""
if not flows:
return

flows = [val.strip() for val in flows]

for val in flows:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
if val in {FLOW_NONE, FLOW_NEW, FLOW_ALL}:
if len(flows) != 1:
raise InputError(ERR_OPT_FLOW_INT)
raise InputError(ERR_OPT_FLOW_COMBINE.format(val))
else:
try:
int(val)
except ValueError:
raise InputError(ERR_OPT_FLOW_VAL.format(val))
raise InputError(ERR_OPT_FLOW_VAL)

if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]:
if flow_wait and flows[0] in {FLOW_NEW, FLOW_NONE}:
raise InputError(ERR_OPT_FLOW_WAIT)


Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,7 @@ def apply_task_proxy_db_history(self):
itask, is_parent = self.db_load_task_proxies[relative_id]
itask.submit_num = submit_num
flow_nums = deserialise_set(flow_nums_str)
# Do not set states and outputs for future tasks in flow.
# Do not set states and outputs for inactive tasks in flow.
if (
itask.flow_nums and
flow_nums != itask.flow_nums and
Expand Down
23 changes: 13 additions & 10 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1998,17 +1998,20 @@ class Arguments:
class FlowMutationArguments:
flow = graphene.List(
graphene.NonNull(Flow),
default_value=[FLOW_ALL],
default_value=[],
description=sstrip(f'''
The flow(s) to trigger these tasks in.

This should be a list of flow numbers OR a single-item list
containing one of the following three strings:

* {FLOW_ALL} - Triggered tasks belong to all active flows
(default).
* {FLOW_NEW} - Triggered tasks are assigned to a new flow.
* {FLOW_NONE} - Triggered tasks do not belong to any flow.
The flow(s) to trigger/set these tasks in.

By default:
* active tasks (n=0) keep their existing flow assignment
* inactive tasks (n>0) get assigned all active flows
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

Otherwise you can assign (inactive tasks) or add to (active tasks):
* a list of integer flow numbers
or a single-item list containing one of the following strings:
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
* {FLOW_ALL} - all active flows
* {FLOW_NEW} - an automatically generated new flow number
* {FLOW_NONE} - (ignored for active tasks): no flow
''')
)
flow_wait = Boolean(
Expand Down
19 changes: 13 additions & 6 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

"""cylc trigger [OPTIONS] ARGS

Force tasks to run despite unsatisfied prerequisites.
Force tasks to run regardless of prerequisites.

* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).

Incomplete and active-waiting tasks in the n=0 window already belong to a flow.
Triggering them queues them to run (or rerun) in the same flow.
Future tasks (n>0) do not already belong to a flow.
* by default they are assigned to all active flows
* otherwise, according to the --flow option
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

Beyond n=0, triggered tasks get all current active flow numbers by default, or
specified flow numbers via the --flow option. Those flows - if/when they catch
up - will see tasks that ran after triggering event as having run already.
Active tasks (n=0) already belong to a flow.
* by default they run in the same flow
* with --flow=all, they are assigned to all active flows
* with --flow=INT or --flow=new, the new flow merges with the old one
* --flow=none is ignored

Note --flow=new increments the global flow counter so if you need multiple
commands to start a single new flow only use --flow=new in the first command,
then use the actual new flow number (e.g. read it from the scheduler log).
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

Examples:
# trigger task foo in cycle 1234 in test
Expand Down
117 changes: 54 additions & 63 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1280,35 +1280,35 @@ def set_hold_point(self, point: 'PointBase') -> None:
def hold_tasks(self, items: Iterable[str]) -> int:
"""Hold tasks with IDs matching the specified items."""
# Hold active tasks:
itasks, future_tasks, unmatched = self.filter_task_proxies(
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
items,
warn=False,
future=True,
)
for itask in itasks:
self.hold_active_task(itask)
# Set future tasks to be held:
for name, cycle in future_tasks:
# Set inactive tasks to be held:
for name, cycle in inactive_tasks:
self.data_store_mgr.delta_task_held((name, cycle, True))
self.tasks_to_hold.update(future_tasks)
self.tasks_to_hold.update(inactive_tasks)
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)
LOG.debug(f"Tasks to hold: {self.tasks_to_hold}")
return len(unmatched)

def release_held_tasks(self, items: Iterable[str]) -> int:
"""Release held tasks with IDs matching any specified items."""
# Release active tasks:
itasks, future_tasks, unmatched = self.filter_task_proxies(
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
items,
warn=False,
future=True,
)
for itask in itasks:
self.release_held_active_task(itask)
# Unhold future tasks:
for name, cycle in future_tasks:
# Unhold inactive tasks:
for name, cycle in inactive_tasks:
self.data_store_mgr.delta_task_held((name, cycle, False))
self.tasks_to_hold.difference_update(future_tasks)
self.tasks_to_hold.difference_update(inactive_tasks)
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)
LOG.debug(f"Tasks to hold: {self.tasks_to_hold}")
return len(unmatched)
Expand Down Expand Up @@ -1887,7 +1887,7 @@ def set_prereqs_and_outputs(

Task matching restrictions (for now):
- globs (cycle and name) only match in the pool
- future tasks must be specified individually
- inactive tasks must be specified individually
- family names are not expanded to members

Uses a transient task proxy to spawn children. (Even if parent was
Expand All @@ -1908,27 +1908,28 @@ def set_prereqs_and_outputs(
flow_descr: description of new flow

"""
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
# Illegal flow command opts
return

hjoliver marked this conversation as resolved.
Show resolved Hide resolved
# Get matching pool tasks and future task definitions.
itasks, future_tasks, unmatched = self.filter_task_proxies(
# Get matching pool tasks and inactive task definitions.
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
items,
future=True,
warn=False,
)

flow_nums = self._get_flow_nums(flow, flow_descr)

# Set existing task proxies.
for itask in itasks:
# Existing task proxies.
self.merge_flows(itask, flow_nums)
if prereqs:
self._set_prereqs_itask(itask, prereqs, flow_nums)
else:
self._set_outputs_itask(itask, outputs)

for name, point in future_tasks:
# Spawn and set inactive tasks.
if not flow:
# default: assign to all active flows
flow_nums = self._get_active_flow_nums()
for name, point in inactive_tasks:
tdef = self.config.get_taskdef(name)
if prereqs:
self._set_prereqs_tdef(
Expand Down Expand Up @@ -2015,7 +2016,7 @@ def _set_prereqs_itask(
def _set_prereqs_tdef(
self, point, taskdef, prereqs, flow_nums, flow_wait
):
"""Spawn a future task and set prerequisites on it."""
"""Spawn an inactive task and set prerequisites on it."""

itask = self.spawn_task(taskdef.name, point, flow_nums, flow_wait)
if itask is None:
Expand Down Expand Up @@ -2063,38 +2064,30 @@ def remove_tasks(self, items):
return len(bad_items)

def _get_flow_nums(
self,
flow: List[str],
meta: Optional[str] = None,
) -> Optional[Set[int]]:
"""Get correct flow numbers given user command options."""
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}'
' cannot be used in combination with integer flow numbers.'
)
return None
if flow[0] == FLOW_ALL:
flow_nums = self._get_active_flow_nums()
elif flow[0] == FLOW_NEW:
flow_nums = {self.flow_mgr.get_flow_num(meta=meta)}
elif flow[0] == FLOW_NONE:
flow_nums = set()
else:
try:
flow_nums = {
self.flow_mgr.get_flow_num(
flow_num=int(n), meta=meta
)
for n in flow
}
except ValueError:
LOG.warning(
f"Ignoring command: illegal flow values {flow}"
)
return None
return flow_nums
self,
flow: List[str],
meta: Optional[str] = None,
) -> Set[int]:
"""Return flow numbers corresponding to user command options.

Arg should have been validated already during command validation.

In the default case (--flow option not provided), stick with the
existing flows (so return empty set) - NOTE this only applies for
active tasks.

"""
if flow == [FLOW_NONE]:
return set()
if flow == [FLOW_ALL]:
return self._get_active_flow_nums()
if flow == [FLOW_NEW]:
return {self.flow_mgr.get_flow_num(meta=meta)}
# else specific flow numbers:
return {
self.flow_mgr.get_flow_num(flow_num=int(n), meta=meta)
for n in flow
}

def _force_trigger(self, itask):
"""Assumes task is in the pool"""
Expand Down Expand Up @@ -2157,30 +2150,28 @@ def force_trigger_tasks(
unless flow-wait is set.

"""
# Get flow numbers for the tasks to be triggered.
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
return

# Get matching tasks proxies, and matching future task IDs.
# Get matching tasks proxies, and matching inactive task IDs.
existing_tasks, future_ids, unmatched = self.filter_task_proxies(
items, future=True, warn=False,
)

# Trigger existing tasks.
flow_nums = self._get_flow_nums(flow, flow_descr)

# Trigger active tasks.
for itask in existing_tasks:
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
continue
self.merge_flows(itask, flow_nums)
self._force_trigger(itask)

# Spawn and trigger future tasks.
# Spawn and trigger inactive tasks.
if not flow:
# default: assign to all active flows
flow_nums = self._get_active_flow_nums()
for name, point in future_ids:

if not self.can_be_spawned(name, point):
continue

submit_num, _, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)
Expand Down Expand Up @@ -2321,13 +2312,13 @@ def filter_task_proxies(
)
future_matched: 'Set[Tuple[str, PointBase]]' = set()
if future and unmatched:
future_matched, unmatched = self.match_future_tasks(
future_matched, unmatched = self.match_inactive_tasks(
unmatched
)

return matched, future_matched, unmatched

def match_future_tasks(
def match_inactive_tasks(
self,
ids: Iterable[str],
) -> Tuple[Set[Tuple[str, 'PointBase']], List[str]]:
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-set/04-switch/flow.cylc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Set outputs of future task to direct the flow at an optional branch point.
# Set outputs of inactive task to direct the flow at an optional branch point.

[scheduler]
[[events]]
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-set/05-expire/flow.cylc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Expire a future task, so it won't run.
# Expire an inactive task, so it won't run.

[scheduler]
[[events]]
Expand Down
15 changes: 11 additions & 4 deletions tests/functional/triggering/08-fam-finish-any/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@
[[graph]]
R1 = """FAM:finish-any => foo"""
[runtime]
[[root]]
script = true
[[FAM]]
script = sleep 10
[[a,c]]
[[a]]
inherit = FAM
script = """
cylc__job__poll_grep_workflow_log -E "1/b.*succeeded"
"""
[[b]]
inherit = FAM
script = true
[[c]]
inherit = FAM
script = """
cylc__job__poll_grep_workflow_log -E "1/b.*succeeded"
"""
[[foo]]
script = true
Loading
Loading