diff --git a/changes.d/6367.fix.md b/changes.d/6367.fix.md new file mode 100644 index 00000000000..44045a632a6 --- /dev/null +++ b/changes.d/6367.fix.md @@ -0,0 +1 @@ +Fix bug where `cylc trigger` and `cylc set` would assign active flows to existing tasks by default. diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index 34b7a0f1460..d87c0711a8d 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -30,7 +30,7 @@ ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'" -ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued" +ERR_OPT_FLOW_COMBINE = "Cannot combine --flow={0} with other flow values" ERR_OPT_FLOW_WAIT = ( f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}" ) @@ -39,10 +39,11 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: """Check validity of flow-related CLI options. - Note the schema defaults flows to ["all"]. + Note the schema defaults flows to []. Examples: Good: + >>> flow_opts([], False) >>> flow_opts(["new"], False) >>> flow_opts(["1", "2"], False) >>> flow_opts(["1", "2"], True) @@ -50,7 +51,8 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: Bad: >>> flow_opts(["none", "1"], False) Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... must all be integer valued + cylc.flow.exceptions.InputError: Cannot combine --flow=none with other + flow values >>> flow_opts(["cheese", "2"], True) Traceback (most recent call last): @@ -58,21 +60,26 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: >>> flow_opts(["new"], True) Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... + cylc.flow.exceptions.InputError: --wait is not compatible with + --flow=new or --flow=none """ + if not flows: + return + + flows = [val.strip() for val in flows] + for val in flows: - val = val.strip() - if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]: + if val in {FLOW_NONE, FLOW_NEW, FLOW_ALL}: if len(flows) != 1: - raise InputError(ERR_OPT_FLOW_INT) + raise InputError(ERR_OPT_FLOW_COMBINE.format(val)) else: try: int(val) except ValueError: - raise InputError(ERR_OPT_FLOW_VAL.format(val)) + raise InputError(ERR_OPT_FLOW_VAL) - if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]: + if flow_wait and flows[0] in {FLOW_NEW, FLOW_NONE}: raise InputError(ERR_OPT_FLOW_WAIT) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 3b49050f0ff..8d65adfe305 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -1418,7 +1418,7 @@ def apply_task_proxy_db_history(self): itask, is_parent = self.db_load_task_proxies[relative_id] itask.submit_num = submit_num flow_nums = deserialise_set(flow_nums_str) - # Do not set states and outputs for future tasks in flow. + # Do not set states and outputs for inactive tasks in flow. if ( itask.flow_nums and flow_nums != itask.flow_nums and diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 70e40232c1d..5fc277fb607 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1998,17 +1998,20 @@ class Arguments: class FlowMutationArguments: flow = graphene.List( graphene.NonNull(Flow), - default_value=[FLOW_ALL], + default_value=[], description=sstrip(f''' - The flow(s) to trigger these tasks in. - - This should be a list of flow numbers OR a single-item list - containing one of the following three strings: - - * {FLOW_ALL} - Triggered tasks belong to all active flows - (default). - * {FLOW_NEW} - Triggered tasks are assigned to a new flow. - * {FLOW_NONE} - Triggered tasks do not belong to any flow. + The flow(s) to trigger/set these tasks in. + + By default: + * active tasks (n=0) keep their existing flow assignment + * inactive tasks (n>0) get assigned all active flows + + Otherwise you can assign (inactive tasks) or add to (active tasks): + * a list of integer flow numbers + or one of the following strings: + * {FLOW_ALL} - all active flows + * {FLOW_NEW} - an automatically generated new flow number + * {FLOW_NONE} - (ignored for active tasks): no flow ''') ) flow_wait = Boolean( diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index de788481cfe..1e6ef913696 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,19 +17,12 @@ """cylc trigger [OPTIONS] ARGS -Force tasks to run despite unsatisfied prerequisites. +Force tasks to run regardless of prerequisites. * Triggering an unqueued waiting task queues it, regardless of prerequisites. * Triggering a queued task submits it, regardless of queue limiting. * Triggering an active task has no effect (it already triggered). -Incomplete and active-waiting tasks in the n=0 window already belong to a flow. -Triggering them queues them to run (or rerun) in the same flow. - -Beyond n=0, triggered tasks get all current active flow numbers by default, or -specified flow numbers via the --flow option. Those flows - if/when they catch -up - will see tasks that ran after triggering event as having run already. - Examples: # trigger task foo in cycle 1234 in test $ cylc trigger test//1234/foo @@ -39,6 +32,21 @@ # start a new flow by triggering 1234/foo in test $ cylc trigger --flow=new test//1234/foo + +Flows: + Active tasks (in the n=0 window) already belong to a flow. + * by default, if triggered, they run in the same flow + * or with --flow=all, they are assigned all active flows + * or with --flow=INT or --flow=new, the original and new flows are merged + * (--flow=none is ignored for active tasks) + + Inactive tasks (n>0) do not already belong to a flow. + * by default they are assigned all active flows + * otherwise, they are assigned the --flow value + + Note --flow=new increments the global flow counter with each use. If it + takes multiple commands to start a new flow use the actual flow number + after the first command (you can read it from the scheduler log). """ from functools import partial diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index ff75eb67d7d..3ddf991cb9c 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1290,17 +1290,17 @@ def set_hold_point(self, point: 'PointBase') -> None: def hold_tasks(self, items: Iterable[str]) -> int: """Hold tasks with IDs matching the specified items.""" # Hold active tasks: - itasks, future_tasks, unmatched = self.filter_task_proxies( + itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, warn=False, future=True, ) for itask in itasks: self.hold_active_task(itask) - # Set future tasks to be held: - for name, cycle in future_tasks: + # Set inactive tasks to be held: + for name, cycle in inactive_tasks: self.data_store_mgr.delta_task_held((name, cycle, True)) - self.tasks_to_hold.update(future_tasks) + self.tasks_to_hold.update(inactive_tasks) self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold) LOG.debug(f"Tasks to hold: {self.tasks_to_hold}") return len(unmatched) @@ -1308,17 +1308,17 @@ def hold_tasks(self, items: Iterable[str]) -> int: def release_held_tasks(self, items: Iterable[str]) -> int: """Release held tasks with IDs matching any specified items.""" # Release active tasks: - itasks, future_tasks, unmatched = self.filter_task_proxies( + itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, warn=False, future=True, ) for itask in itasks: self.release_held_active_task(itask) - # Unhold future tasks: - for name, cycle in future_tasks: + # Unhold inactive tasks: + for name, cycle in inactive_tasks: self.data_store_mgr.delta_task_held((name, cycle, False)) - self.tasks_to_hold.difference_update(future_tasks) + self.tasks_to_hold.difference_update(inactive_tasks) self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold) LOG.debug(f"Tasks to hold: {self.tasks_to_hold}") return len(unmatched) @@ -1895,7 +1895,7 @@ def set_prereqs_and_outputs( Task matching restrictions (for now): - globs (cycle and name) only match in the pool - - future tasks must be specified individually + - inactive tasks must be specified individually - family names are not expanded to members Uses a transient task proxy to spawn children. (Even if parent was @@ -1916,27 +1916,28 @@ def set_prereqs_and_outputs( flow_descr: description of new flow """ - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - # Illegal flow command opts - return - - # Get matching pool tasks and future task definitions. - itasks, future_tasks, unmatched = self.filter_task_proxies( + # Get matching pool tasks and inactive task definitions. + itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, future=True, warn=False, ) + flow_nums = self._get_flow_nums(flow, flow_descr) + + # Set existing task proxies. for itask in itasks: - # Existing task proxies. self.merge_flows(itask, flow_nums) if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) else: self._set_outputs_itask(itask, outputs) - for name, point in future_tasks: + # Spawn and set inactive tasks. + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() + for name, point in inactive_tasks: tdef = self.config.get_taskdef(name) if prereqs: self._set_prereqs_tdef( @@ -2023,7 +2024,7 @@ def _set_prereqs_itask( def _set_prereqs_tdef( self, point, taskdef, prereqs, flow_nums, flow_wait ): - """Spawn a future task and set prerequisites on it.""" + """Spawn an inactive task and set prerequisites on it.""" itask = self.spawn_task( taskdef.name, point, flow_nums, flow_wait=flow_wait @@ -2073,38 +2074,30 @@ def remove_tasks(self, items): return len(bad_items) def _get_flow_nums( - self, - flow: List[str], - meta: Optional[str] = None, - ) -> Optional[Set[int]]: - """Get correct flow numbers given user command options.""" - if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}): - if len(flow) != 1: - LOG.warning( - f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}' - ' cannot be used in combination with integer flow numbers.' - ) - return None - if flow[0] == FLOW_ALL: - flow_nums = self._get_active_flow_nums() - elif flow[0] == FLOW_NEW: - flow_nums = {self.flow_mgr.get_flow_num(meta=meta)} - elif flow[0] == FLOW_NONE: - flow_nums = set() - else: - try: - flow_nums = { - self.flow_mgr.get_flow_num( - flow_num=int(n), meta=meta - ) - for n in flow - } - except ValueError: - LOG.warning( - f"Ignoring command: illegal flow values {flow}" - ) - return None - return flow_nums + self, + flow: List[str], + meta: Optional[str] = None, + ) -> Set[int]: + """Return flow numbers corresponding to user command options. + + Arg should have been validated already during command validation. + + In the default case (--flow option not provided), stick with the + existing flows (so return empty set) - NOTE this only applies for + active tasks. + + """ + if flow == [FLOW_NONE]: + return set() + if flow == [FLOW_ALL]: + return self._get_active_flow_nums() + if flow == [FLOW_NEW]: + return {self.flow_mgr.get_flow_num(meta=meta)} + # else specific flow numbers: + return { + self.flow_mgr.get_flow_num(flow_num=int(n), meta=meta) + for n in flow + } def _force_trigger(self, itask): """Assumes task is in the pool""" @@ -2167,17 +2160,14 @@ def force_trigger_tasks( unless flow-wait is set. """ - # Get flow numbers for the tasks to be triggered. - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - return - - # Get matching tasks proxies, and matching future task IDs. + # Get matching tasks proxies, and matching inactive task IDs. existing_tasks, future_ids, unmatched = self.filter_task_proxies( items, future=True, warn=False, ) - # Trigger existing tasks. + flow_nums = self._get_flow_nums(flow, flow_descr) + + # Trigger active tasks. for itask in existing_tasks: if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.warning(f"[{itask}] ignoring trigger - already active") @@ -2185,12 +2175,13 @@ def force_trigger_tasks( self.merge_flows(itask, flow_nums) self._force_trigger(itask) - # Spawn and trigger future tasks. + # Spawn and trigger inactive tasks. + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() for name, point in future_ids: - if not self.can_be_spawned(name, point): continue - submit_num, _, prev_fwait = ( self._get_task_history(name, point, flow_nums) ) @@ -2331,13 +2322,13 @@ def filter_task_proxies( ) future_matched: 'Set[Tuple[str, PointBase]]' = set() if future and unmatched: - future_matched, unmatched = self.match_future_tasks( + future_matched, unmatched = self.match_inactive_tasks( unmatched ) return matched, future_matched, unmatched - def match_future_tasks( + def match_inactive_tasks( self, ids: Iterable[str], ) -> Tuple[Set[Tuple[str, 'PointBase']], List[str]]: diff --git a/tests/functional/cylc-set/04-switch/flow.cylc b/tests/functional/cylc-set/04-switch/flow.cylc index 18402c7b64c..8f7c4329af6 100644 --- a/tests/functional/cylc-set/04-switch/flow.cylc +++ b/tests/functional/cylc-set/04-switch/flow.cylc @@ -1,4 +1,4 @@ -# Set outputs of future task to direct the flow at an optional branch point. +# Set outputs of inactive task to direct the flow at an optional branch point. [scheduler] [[events]] diff --git a/tests/functional/cylc-set/05-expire/flow.cylc b/tests/functional/cylc-set/05-expire/flow.cylc index 9717664132f..4e5ca9f0608 100644 --- a/tests/functional/cylc-set/05-expire/flow.cylc +++ b/tests/functional/cylc-set/05-expire/flow.cylc @@ -1,4 +1,4 @@ -# Expire a future task, so it won't run. +# Expire an inactive task, so it won't run. [scheduler] [[events]] diff --git a/tests/functional/triggering/08-fam-finish-any/flow.cylc b/tests/functional/triggering/08-fam-finish-any/flow.cylc index 6d8790a829f..6ecb0bf9781 100644 --- a/tests/functional/triggering/08-fam-finish-any/flow.cylc +++ b/tests/functional/triggering/08-fam-finish-any/flow.cylc @@ -2,12 +2,19 @@ [[graph]] R1 = """FAM:finish-any => foo""" [runtime] + [[root]] + script = true [[FAM]] - script = sleep 10 - [[a,c]] + [[a]] inherit = FAM + script = """ + cylc__job__poll_grep_workflow_log -E "1/b.*succeeded" + """ [[b]] inherit = FAM - script = true + [[c]] + inherit = FAM + script = """ + cylc__job__poll_grep_workflow_log -E "1/b.*succeeded" + """ [[foo]] - script = true diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index 906b1ac052d..f50333b6944 100644 --- a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -301,7 +301,7 @@ def test_delta_task_prerequisite(harness): [t.identity for t in schd.pool.get_tasks()], [(TASK_STATUS_SUCCEEDED,)], [], - "all" + flow=[] ) assert all({ p.satisfied diff --git a/tests/integration/test_flow_assignment.py b/tests/integration/test_flow_assignment.py new file mode 100644 index 00000000000..5816b08527f --- /dev/null +++ b/tests/integration/test_flow_assignment.py @@ -0,0 +1,155 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test for flow-assignment in triggered/set tasks.""" + +import functools +import time +from typing import Callable + +import pytest + +from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE +from cylc.flow.scheduler import Scheduler + + +async def test_trigger_no_flows(one, start): + """Test triggering a task with no flows present. + + It should get the flow numbers of the most recent active tasks. + """ + async with start(one): + + # Remove the task (flow 1) --> pool empty + task = one.pool.get_tasks()[0] + one.pool.remove(task) + assert len(one.pool.get_tasks()) == 0 + + # Trigger the task, with new flow nums. + time.sleep(2) # The flows need different timestamps! + one.pool.force_trigger_tasks([task.identity], flow=['5', '9']) + assert len(one.pool.get_tasks()) == 1 + + # Ensure the new flow is in the db. + one.pool.workflow_db_mgr.process_queued_ops() + + # Remove the task --> pool empty + task = one.pool.get_tasks()[0] + one.pool.remove(task) + assert len(one.pool.get_tasks()) == 0 + + # Trigger the task; it should get flow nums 5, 9 + one.pool.force_trigger_tasks([task.identity], [FLOW_ALL]) + assert len(one.pool.get_tasks()) == 1 + task = one.pool.get_tasks()[0] + assert task.flow_nums == {5, 9} + + +async def test_get_flow_nums(one: Scheduler, start): + """Test the task pool _get_flow_nums() method.""" + async with start(one): + # flow 1 is already present + task = one.pool.get_tasks()[0] + assert one.pool._get_flow_nums([FLOW_NEW]) == {2} + one.pool.merge_flows(task, {2}) + # now we have flows {1, 2}: + + assert one.pool._get_flow_nums([FLOW_NONE]) == set() + assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} + assert one.pool._get_flow_nums([FLOW_NEW]) == {3} + assert one.pool._get_flow_nums(['4', '5']) == {4, 5} + # the only active task still only has flows {1, 2} + assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} + + +@pytest.mark.parametrize('command', ['trigger', 'set']) +async def test_flow_assignment(flow, scheduler, start, command: str): + """Test flow assignment when triggering/setting tasks. + + Active tasks: + By default keep existing flows, else merge with requested flows. + Inactive tasks: + By default assign active flows; else assign requested flows. + + """ + conf = { + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'graph': { + 'R1': "foo & bar => a & b & c & d & e" + } + }, + 'runtime': { + 'foo': { + 'outputs': {'x': 'x'} + } + }, + } + id_ = flow(conf) + schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) + async with start(schd): + if command == 'set': + do_command: Callable = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] + ) + else: + do_command = schd.pool.force_trigger_tasks + + active_a, active_b = schd.pool.get_tasks() + schd.pool.merge_flows(active_b, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_a.flow_nums == {1} + assert active_b.flow_nums == {1, 2} + + # -----(1. Test active tasks)----- + + # By default active tasks keep existing flow assignment. + do_command([active_a.identity], flow=[]) + assert active_a.flow_nums == {1} + + # Else merge existing flow with requested flows. + do_command([active_a.identity], flow=[FLOW_ALL]) + assert active_a.flow_nums == {1, 2} + + # (no-flow is ignored for active tasks) + do_command([active_a.identity], flow=[FLOW_NONE]) + assert active_a.flow_nums == {1, 2} + + do_command([active_a.identity], flow=[FLOW_NEW]) + assert active_a.flow_nums == {1, 2, 3} + + # -----(2. Test inactive tasks)----- + if command == 'set': + do_command = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] + ) + + # By default inactive tasks get all active flows. + do_command(['1/a'], flow=[]) + assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2, 3} + + # Else assign requested flows. + do_command(['1/b'], flow=[FLOW_NONE]) + assert schd.pool._get_task_by_id('1/b').flow_nums == set() + + do_command(['1/c'], flow=[FLOW_NEW]) + assert schd.pool._get_task_by_id('1/c').flow_nums == {4} + + do_command(['1/d'], flow=[FLOW_ALL]) + assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3, 4} + do_command(['1/e'], flow=[7]) + assert schd.pool._get_task_by_id('1/e').flow_nums == {7} diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 35ead99264f..c8ac305c09a 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -342,7 +342,7 @@ async def test_match_taskdefs( [ param( ['1/foo', '3/asd'], ['1/foo', '3/asd'], [], - id="Active & future tasks" + id="Active & inactive tasks" ), param( ['1/*', '2/*', '3/*', '6/*'], @@ -367,7 +367,7 @@ async def test_match_taskdefs( ['1/foo:waiting', '1/foo:failed', '6/bar:waiting'], ['1/foo'], ["No active tasks matching: 1/foo:failed", "No active tasks matching: 6/bar:waiting"], - id="Specifying task state works for active tasks, not future tasks" + id="Specifying task state works for active tasks, not inactive tasks" ) ] ) @@ -412,7 +412,7 @@ async def test_release_held_tasks( ) -> None: """Test TaskPool.release_held_tasks(). - For a workflow with held active tasks 1/foo & 1/bar, and held future task + For a workflow with held active tasks 1/foo & 1/bar, and held inactive task 3/asd. We skip testing the matching logic here because it would be slow using the @@ -1347,7 +1347,7 @@ async def test_set_prereqs( "20400101T0000Z/foo"] ) - # set one prereq of future task 20400101T0000Z/qux + # set one prereq of inactive task 20400101T0000Z/qux schd.pool.set_prereqs_and_outputs( ["20400101T0000Z/qux"], None, @@ -1526,7 +1526,7 @@ async def test_set_outputs_future( start, log_filter, ): - """Check manual setting of future task outputs. + """Check manual setting of inactive task outputs. """ id_ = flow( @@ -1556,7 +1556,7 @@ async def test_set_outputs_future( # it should start up with just 1/a assert pool_get_task_ids(schd.pool) == ["1/a"] - # setting future task b succeeded should spawn c but not b + # setting inactive task b succeeded should spawn c but not b schd.pool.set_prereqs_and_outputs( ["1/b"], ["succeeded"], None, ['all']) assert ( diff --git a/tests/integration/test_trigger.py b/tests/integration/test_trigger.py deleted file mode 100644 index 30ae3404ed8..00000000000 --- a/tests/integration/test_trigger.py +++ /dev/null @@ -1,73 +0,0 @@ -# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. -# Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import logging - -from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE - -import pytest -import time - - -@pytest.mark.parametrize( - 'flow_strs', - ( - [FLOW_ALL, '1'], - ['1', FLOW_ALL], - [FLOW_NEW, '1'], - [FLOW_NONE, '1'], - ['a'], - ['1', 'a'], - ) -) -async def test_trigger_invalid(mod_one, start, log_filter, flow_strs): - """Ensure invalid flow values are rejected.""" - async with start(mod_one) as log: - log.clear() - assert mod_one.pool.force_trigger_tasks(['*'], flow_strs) is None - assert len(log_filter(log, level=logging.WARN)) == 1 - - -async def test_trigger_no_flows(one, start, log_filter): - """Test triggering a task with no flows present. - - It should get the flow numbers of the most recent active tasks. - """ - async with start(one): - - # Remove the task (flow 1) --> pool empty - task = one.pool.get_tasks()[0] - one.pool.remove(task) - assert len(one.pool.get_tasks()) == 0 - - # Trigger the task, with new flow nums. - time.sleep(2) # The flows need different timestamps! - one.pool.force_trigger_tasks([task.identity], [5, 9]) - assert len(one.pool.get_tasks()) == 1 - - # Ensure the new flow is in the db. - one.pool.workflow_db_mgr.process_queued_ops() - - # Remove the task --> pool empty - task = one.pool.get_tasks()[0] - one.pool.remove(task) - assert len(one.pool.get_tasks()) == 0 - - # Trigger the task; it should get flow nums 5, 9 - one.pool.force_trigger_tasks([task.identity], [FLOW_ALL]) - assert len(one.pool.get_tasks()) == 1 - task = one.pool.get_tasks()[0] - assert task.flow_nums == {5, 9} diff --git a/tests/unit/test_command_validation.py b/tests/unit/test_command_validation.py new file mode 100644 index 00000000000..42fdda5aedf --- /dev/null +++ b/tests/unit/test_command_validation.py @@ -0,0 +1,41 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest + +from cylc.flow.command_validation import ( + ERR_OPT_FLOW_COMBINE, + ERR_OPT_FLOW_VAL, + flow_opts, +) +from cylc.flow.exceptions import InputError +from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE + + +@pytest.mark.parametrize('flow_strs, expected_msg', [ + ([FLOW_ALL, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_ALL)), + (['1', FLOW_ALL], ERR_OPT_FLOW_COMBINE.format(FLOW_ALL)), + ([FLOW_NEW, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_NEW)), + ([FLOW_NONE, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_NONE)), + ([FLOW_NONE, FLOW_ALL], ERR_OPT_FLOW_COMBINE.format(FLOW_NONE)), + (['a'], ERR_OPT_FLOW_VAL), + (['1', 'a'], ERR_OPT_FLOW_VAL), +]) +async def test_trigger_invalid(flow_strs, expected_msg): + """Ensure invalid flow values are rejected during command validation.""" + with pytest.raises(InputError) as exc_info: + flow_opts(flow_strs, False) + assert str(exc_info.value) == expected_msg