From 68ce6a15e5c868f0f431d65dd78d0d7c3f99a309 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Fri, 29 Nov 2024 13:17:50 +1300 Subject: [PATCH 01/18] Manual trigger: run now even when paused. --- changes.d/6499.feat.md | 1 + cylc/flow/commands.py | 7 +- cylc/flow/network/schema.py | 18 +- cylc/flow/scheduler.py | 63 +++++-- cylc/flow/scripts/pause.py | 3 +- cylc/flow/scripts/trigger.py | 24 ++- cylc/flow/task_pool.py | 114 ++++++++---- cylc/flow/task_proxy.py | 6 +- cylc/flow/task_queues/__init__.py | 27 ++- cylc/flow/task_queues/independent.py | 36 +++- .../restart/58-waiting-manual-triggered.t | 1 + tests/integration/conftest.py | 3 + tests/integration/scripts/test_dump.py | 2 +- tests/integration/test_flow_assignment.py | 163 ++++++++++++++---- tests/integration/test_force_trigger.py | 92 ++++++++++ tests/integration/test_optional_outputs.py | 2 +- tests/integration/test_queues.py | 8 +- tests/integration/test_scheduler.py | 42 ++++- tests/integration/test_task_pool.py | 48 +----- tests/unit/test_scheduler.py | 8 +- 20 files changed, 496 insertions(+), 172 deletions(-) create mode 100644 changes.d/6499.feat.md create mode 100644 tests/integration/test_force_trigger.py diff --git a/changes.d/6499.feat.md b/changes.d/6499.feat.md new file mode 100644 index 00000000000..747b066c717 --- /dev/null +++ b/changes.d/6499.feat.md @@ -0,0 +1 @@ +Manual trigger: run tasks immediately even if the workflow is paused. diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index c45194752ab..667bc6653f2 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -338,7 +338,7 @@ async def reload_workflow(schd: 'Scheduler'): # flush out preparing tasks before attempting reload schd.reload_pending = 'waiting for pending tasks to submit' - while schd.release_queued_tasks(): + while schd.release_tasks_to_run(): # Run the subset of main-loop functionality required to push # preparing through the submission pipeline and keep the workflow # responsive (e.g. to the `cylc stop` command). @@ -446,9 +446,12 @@ async def force_trigger_tasks( flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None, + on_resume: bool = False ): """Manual task trigger.""" validate.is_tasks(tasks) validate.flow_opts(flow, flow_wait) yield - yield schd.pool.force_trigger_tasks(tasks, flow, flow_wait, flow_descr) + yield schd.pool.force_trigger_tasks( + tasks, flow, flow_wait, flow_descr, on_resume + ) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 3f6534b4188..ccc992ef1ec 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2213,20 +2213,24 @@ class Arguments(TaskMutation.Arguments, FlowMutationArguments): class Trigger(Mutation, TaskMutation): class Meta: description = sstrip(''' - Manually trigger tasks. + Manually trigger tasks, even in a paused workflow. - Warning: waiting tasks that are queue-limited will be queued if - triggered, to submit as normal when released by the queue; queued - tasks will submit immediately if triggered, even if that violates - the queue limit (so you may need to trigger a queue-limited task - twice to get it to submit immediately). + Triggering an unqueued task queues it, to run when queue-released. + Triggering a queued task runs it now regardless of queue limiting. + + The "on resume" option waits for a paused workflow to be resumed. Valid for: paused, running workflows. ''') resolver = partial(mutator, command='force_trigger_tasks') class Arguments(TaskMutation.Arguments, FlowMutationArguments): - ... 
+ on_resume = Boolean( + default_value=False, + description=sstrip(''' + Run triggered tasks once the paused workflow is resumed. + ''') + ) def _mut_field(cls): diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 3339423640f..fb3dd373e1f 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -677,6 +677,19 @@ async def run_scheduler(self) -> None: self.restart_remote_init() await commands.run_cmd(commands.poll_tasks(self, ['*/*'])) + # If we shut down with manually triggered waiting tasks, + # submit them to run now. + pre_prep_tasks = [] + for itask in self.pool.get_tasks(): + if ( + itask.is_manual_submit + and itask.state(TASK_STATUS_WAITING) + ): + itask.waiting_on_job_prep = True + pre_prep_tasks.append(itask) + + self.start_job_submission(pre_prep_tasks) + self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting') await asyncio.gather( *main_loop.get_runners( @@ -778,10 +791,10 @@ async def start(self): self.uuid_str = dict(params)['uuid_str'] else: self.uuid_str = str(uuid4()) + self.task_events_mgr.uuid_str = self.uuid_str self._configure_contact() await self.configure(params) - self.task_events_mgr.uuid_str = self.uuid_str except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) @@ -865,6 +878,7 @@ def _load_pool_from_db(self): self.xtrigger_mgr.load_xtrigger_for_restart) self.workflow_db_mgr.pri_dao.select_abs_outputs_for_restart( self.pool.load_abs_outputs_for_restart) + self.pool.load_db_tasks_to_hold() self.pool.update_flow_mgr() @@ -1434,8 +1448,8 @@ def run_event_handlers(self, event, reason=""): return self.workflow_event_handler.handle(self, event, str(reason)) - def release_queued_tasks(self) -> bool: - """Release queued tasks, and submit jobs. + def release_tasks_to_run(self) -> bool: + """Release queued or manually submitted tasks, and submit jobs. The task queue manages references to task proxies in the task pool. @@ -1459,13 +1473,24 @@ def release_queued_tasks(self) -> bool: submission). """ + pre_prep_tasks: Set['TaskProxy'] = set() if ( - not self.is_paused - and self.stop_mode is None + self.stop_mode is None and self.auto_restart_time is None and self.reload_pending is False ): - pre_prep_tasks = self.pool.release_queued_tasks() + if self.pool.tasks_to_trigger_now: + # manually triggered tasks to run now, workflow paused or not + pre_prep_tasks.update(self.pool.tasks_to_trigger_now) + self.pool.tasks_to_trigger_now = set() + + if not self.is_paused: + # release queued tasks + pre_prep_tasks.update(self.pool.release_queued_tasks()) + if self.pool.tasks_to_trigger_on_resume: + # and manually triggered tasks to run once workflow resumed + pre_prep_tasks.update(self.pool.tasks_to_trigger_on_resume) + self.pool.tasks_to_trigger_on_resume = set() elif ( ( @@ -1477,19 +1502,30 @@ def release_queued_tasks(self) -> bool: self.reload_pending ) ): - # don't release queued tasks, finish processing preparing tasks - pre_prep_tasks = [ + # finish processing preparing tasks first + pre_prep_tasks = { itask for itask in self.pool.get_tasks() if itask.state(TASK_STATUS_PREPARING) - ] + } # Return, if no tasks to submit. else: return False + if not pre_prep_tasks: return False - # Start the job submission process. + return self.start_job_submission(pre_prep_tasks) + + def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool: + """Start the job submission process for some tasks. + + Return True if any were started, else False. 
+ + """ + if self.stop_mode is not None: + return False + self.is_updated = True self.reset_inactivity_timer() @@ -1499,9 +1535,10 @@ def release_queued_tasks(self) -> bool: log = LOG.debug if self.options.reftest or self.options.genref: log = LOG.info + for itask in self.task_job_mgr.submit_task_jobs( self.workflow, - pre_prep_tasks, + itasks, self.server.curve_auth, self.server.client_pub_key_dir, run_mode=self.get_run_mode() @@ -1743,7 +1780,7 @@ async def _main_loop(self) -> None: self.broadcast_mgr.check_ext_triggers( itask, self.ext_trigger_queue) - if itask.is_ready_to_run(): + if itask.is_ready_to_run() and not itask.is_manual_submit: self.pool.queue_task(itask) if self.xtrigger_mgr.sequential_spawn_next: @@ -1752,7 +1789,7 @@ async def _main_loop(self) -> None: if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) self.pool.clock_expire_tasks() - self.release_queued_tasks() + self.release_tasks_to_run() if ( self.get_run_mode() == RunMode.SIMULATION diff --git a/cylc/flow/scripts/pause.py b/cylc/flow/scripts/pause.py index 7e0d3014326..ee969251bba 100644 --- a/cylc/flow/scripts/pause.py +++ b/cylc/flow/scripts/pause.py @@ -20,7 +20,8 @@ Pause a workflow. -This suspends submission of all tasks in a workflow. +This suspends submission of all tasks until the workflow is resumed, except +for tasks manually triggered "now" with `cylc trigger --now`. Examples: # pause my_workflow diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 1e6ef913696..1e110c5dc5a 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,11 +17,13 @@ """cylc trigger [OPTIONS] ARGS -Force tasks to run regardless of prerequisites. +Manually trigger tasks regardless of prerequisites, even in a paused workflow. -* Triggering an unqueued waiting task queues it, regardless of prerequisites. -* Triggering a queued task submits it, regardless of queue limiting. -* Triggering an active task has no effect (it already triggered). +Triggering an unqueued task queues it, to run when released by the queue. +Triggering a queued task runs it immediately regardless of queue limiting. +So you may need to trigger a task twice if queue limiting is in effect. + +Attempts to trigger active tasks (submitted or running) will be ignored. 
Examples: # trigger task foo in cycle 1234 in test @@ -74,13 +76,15 @@ $flow: [Flow!], $flowWait: Boolean, $flowDescr: String, + $onResume: Boolean, ) { trigger ( workflows: $wFlows, tasks: $tasks, flow: $flow, flowWait: $flowWait, - flowDescr: $flowDescr + flowDescr: $flowDescr, + onResume: $onResume, ) { result } @@ -96,7 +100,16 @@ def get_option_parser() -> COP: multiworkflow=True, argdoc=[FULL_ID_MULTI_ARG_DOC], ) + add_flow_opts(parser) + + parser.add_option( + "--on-resume", + help=r"Run triggered tasks once a paused workflow is resumed.", + action="store_true", + default=False, + dest="on_resume" + ) return parser @@ -114,6 +127,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list): 'flow': options.flow, 'flowWait': options.flow_wait, 'flowDescr': options.flow_descr, + 'onResume': options.on_resume, } } return await pclient.async_request('graphql', mutation_kwargs) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index c771af215cd..79cc07f1588 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -171,7 +171,10 @@ def __init__( self.task_name_list, self.config.runtime['descendants'] ) + self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set() + self.tasks_to_trigger_now: Set['TaskProxy'] = set() + self.tasks_to_trigger_on_resume: Set['TaskProxy'] = set() def set_stop_task(self, task_id): """Set stop after a task.""" @@ -949,16 +952,9 @@ def unqueue_task(self, itask: TaskProxy) -> None: self.data_store_mgr.delta_task_state(itask) self.task_queue_mgr.remove_task(itask) - def release_queued_tasks(self): - """Return list of queue-released tasks awaiting job prep. + def count_active_tasks(self): + """Count active tasks and identify pre-prep tasks.""" - Note: - Tasks can hang about for a while between being released and - entering the PREPARING state for various reasons. This method - returns tasks which are awaiting job prep irrespective of whether - they have been previously returned. - - """ # count active tasks by name # {task_name: number_of_active_instances, ...} active_task_counter = Counter() @@ -984,6 +980,20 @@ def release_queued_tasks(self): # an active task active_task_counter.update([itask.tdef.name]) + return active_task_counter, pre_prep_tasks + + def release_queued_tasks(self): + """Return list of queue-released tasks awaiting job prep. + + Note: + Tasks can hang about for a while between being released and + entering the PREPARING state for various reasons. This method + returns tasks which are awaiting job prep irrespective of whether + they have been previously returned. + + """ + active_task_counter, pre_prep_tasks = self.count_active_tasks() + # release queued tasks released = self.task_queue_mgr.release_tasks(active_task_counter) @@ -2127,48 +2137,91 @@ def _get_flow_nums( for n in flow } - def _force_trigger(self, itask): - """Assumes task is in the pool""" - # TODO is this flag still needed, and consistent with "cylc set"? + def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): + """Process a manually triggered task, ready for job submission. + + Assumes the task is in the pool. 
+ + Triggering a queued task will: + - run it, regardless of queue limiting + + Triggering an unqueued task will: + - queue it, if the queue is limiting activity + - run it, if the queue is not limiting activity + + After state reset and queue handling: + - if on_resume is False, add the task to tasks_to_trigger_now + - if on_resume is True, add the task to tasks_to_trigger_on_resume + + The scheduler will release tasks from the tasks_to_trigger sets. + + """ itask.is_manual_submit = True itask.reset_try_timers() + if itask.state_reset(TASK_STATUS_WAITING): # (could also be unhandled failed) self.data_store_mgr.delta_task_state(itask) - # (No need to set prerequisites satisfied here). - if itask.state.is_runahead: - # Release from runahead, and queue it. - self.rh_release_and_queue(itask) + + if itask.state_reset(is_runahead=False): + # Can force trigger runahead-limited tasks. + self.data_store_mgr.delta_task_state(itask) self.spawn_to_rh_limit( itask.tdef, itask.tdef.next_point(itask.point), itask.flow_nums ) - else: - # De-queue it to run now. - self.task_queue_mgr.force_release_task(itask) + + if not itask.state.is_queued: + # queue it if limiting + active, _ = self.count_active_tasks() + if self.task_queue_mgr.push_task_if_limited(itask, active): + itask.state_reset(is_queued=True) + self.data_store_mgr.delta_task_state(itask) + + elif self.task_queue_mgr.force_release_task(itask): + # else release it from the queue to run now + itask.state_reset(is_queued=False) + self.data_store_mgr.delta_task_state(itask) + + if not itask.state.is_queued: + # If not queued now, record the task as ready to run. + itask.waiting_on_job_prep = True + + if on_resume: + with suppress(KeyError): + # In case previously triggered without --on-resume. + # (It should have run already, but just in case). + self.tasks_to_trigger_now.remove(itask) + self.tasks_to_trigger_on_resume.add(itask) + else: + with suppress(KeyError): + # In case previously triggered with --on-resume. + self.tasks_to_trigger_on_resume.remove(itask) + self.tasks_to_trigger_now.add(itask) + # Task may be set running before xtrigger is satisfied, # if so check/spawn if xtrigger sequential. self.check_spawn_psx_task(itask) def force_trigger_tasks( - self, items: Iterable[str], + self, + items: Iterable[str], flow: List[str], flow_wait: bool = False, - flow_descr: Optional[str] = None + flow_descr: Optional[str] = None, + now: bool = False ): - """Force a task to trigger (user command). - - Always run the task, even if a previous run was flow-waited. + """Manually trigger tasks. - If the task did not run before in the flow: - - run it, and spawn on outputs unless flow-wait is set. + If a task did not run before in the flow: + - trigger it, and spawn on outputs unless flow-wait is set. (but load the previous outputs from the DB) - Else if the task ran before in the flow: + If a task ran before in the flow: - load previous outputs If the previous run was not flow-wait - - run it, and try to spawn on outputs + - trigger it, and try to spawn on outputs Else if the previous run was flow-wait: - just spawn (if not already spawned in this flow) unless flow-wait is set. @@ -2193,12 +2246,13 @@ def force_trigger_tasks( LOG.error(f"[{itask}] ignoring trigger - already active") continue self.merge_flows(itask, flow_nums) - self._force_trigger(itask) + self._force_trigger(itask, now) # Spawn and trigger inactive tasks. 
if not flow: # default: assign to all active flows flow_nums = self._get_active_flow_nums() + for name, point in inactive_ids: if not self.can_be_spawned(name, point): continue @@ -2227,7 +2281,7 @@ def force_trigger_tasks( # run it (or run it again for incomplete flow-wait) self.add_to_pool(itask) - self._force_trigger(itask) + self._force_trigger(itask, now) def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 4d98609df39..8d9134a6ac4 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -467,13 +467,9 @@ def next_point(self): def is_ready_to_run(self) -> bool: """Is this task ready to run? - Takes account of all dependence: on other tasks, xtriggers, and - old-style ext-triggers. Or, manual triggering. + Return True if not held, no active try timers, and prerequisites done. """ - if self.is_manual_submit: - # Manually triggered, ignore unsatisfied prerequisites. - return True if self.state.is_held: # A held task is not ready to run. return False diff --git a/cylc/flow/task_queues/__init__.py b/cylc/flow/task_queues/__init__.py index e19b68c25e7..b6d2f8a8241 100644 --- a/cylc/flow/task_queues/__init__.py +++ b/cylc/flow/task_queues/__init__.py @@ -40,35 +40,46 @@ def __init__(self, * descendants: runtime family dict """ - pass + raise NotImplementedError @abstractmethod def push_task(self, itask: 'TaskProxy') -> None: """Queue the given task.""" - pass + raise NotImplementedError + + @abstractmethod + def push_task_if_limited( + self, itask: 'TaskProxy', active: Counter[str] + ) -> bool: + """Queue the task only if the queue limit is reached. + + Requires current active task counts. + Return True if queued, else False. + """ + raise NotImplementedError @abstractmethod def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]': """Release tasks, given current active task counts.""" - pass + raise NotImplementedError @abstractmethod def remove_task(self, itask: 'TaskProxy') -> bool: """Try to remove a task from the queues. Return True if done.""" - pass + raise NotImplementedError @abstractmethod - def force_release_task(self, itask: 'TaskProxy') -> None: + def force_release_task(self, itask: 'TaskProxy') -> bool: """Remove a task from whichever queue it belongs to. - To be returned when release_tasks() is next called. 
+ Return True if released, else False """ - pass + raise NotImplementedError @abstractmethod def adopt_tasks(self, orphans: List[str]) -> None: """Adopt tasks with defs removed by scheduler reload or restart.""" - pass + raise NotImplementedError def _expand_families(self, qconfig: dict, diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index 3bac42fe0f5..4cc5c8e2183 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -40,6 +40,21 @@ def push_task(self, itask: 'TaskProxy') -> None: if itask.tdef.name in self.members: self.deque.appendleft(itask) + def push_task_if_limited( + self, itask: 'TaskProxy', active: Counter[str] + ) -> bool: + """Queue task if in my membership and the queue limit is reached.""" + n_active: int = 0 + for mem in self.members: + n_active += active[mem] + if ( + self.limit and n_active >= self.limit + and itask.tdef.name in self.members + ): + self.deque.appendleft(itask) + return True + return False + def release(self, active: Counter[str]) -> List['TaskProxy']: """Release tasks if below the active limit.""" # The "active" argument counts active tasks by name. @@ -113,34 +128,37 @@ def __init__(self, config["limit"], config["members"] ) - self.force_released: Set['TaskProxy'] = set() - def push_task(self, itask: 'TaskProxy') -> None: """Push a task to the appropriate queue.""" for queue in self.queues.values(): queue.push_task(itask) + def push_task_if_limited( + self, itask: 'TaskProxy', active: Counter[str] + ) -> bool: + """Push a task to its queue only if the queue limit is reached.""" + return any( + queue.push_task_if_limited(itask, active) + for queue in self.queues.values() + ) + def release_tasks(self, active: Counter[str]) -> List['TaskProxy']: """Release tasks up to the queue limits.""" released: List['TaskProxy'] = [] for queue in self.queues.values(): released += queue.release(active) - if self.force_released: - released.extend(self.force_released) - self.force_released = set() return released def remove_task(self, itask: 'TaskProxy') -> bool: """Try to remove a task from the queues. Return True if done.""" return any(queue.remove(itask) for queue in self.queues.values()) - def force_release_task(self, itask: 'TaskProxy') -> None: + def force_release_task(self, itask: 'TaskProxy') -> bool: """Remove a task from whichever queue it belongs to. - To be returned when release_tasks() is next called. + Return True if released, else False. """ - if self.remove_task(itask): - self.force_released.add(itask) + return self.remove_task(itask) def adopt_tasks(self, orphans: List[str]) -> None: """Adopt orphaned tasks to the default group.""" diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/58-waiting-manual-triggered.t index 455cb289592..fd47201e9f7 100644 --- a/tests/functional/restart/58-waiting-manual-triggered.t +++ b/tests/functional/restart/58-waiting-manual-triggered.t @@ -40,6 +40,7 @@ __EOF__ # It should restart and shut down normally, not stall with 2/foo waiting on 1/foo. workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play --no-detach "${WORKFLOW_NAME}" + # Check that 2/foo job 02 did run before shutdown. 
grep_workflow_log_ok "${TEST_NAME_BASE}-grep" "\[2\/foo\/02:running\] => succeeded" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6300cefc74e..cfcd137b453 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -55,6 +55,7 @@ from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id from cylc.flow.workflow_status import StopMode +from cylc.flow.task_state import TASK_STATUS_SUBMITTED from .utils import _rm_if_empty from .utils.flow_tools import ( @@ -425,6 +426,8 @@ def _disable_submission(schd: 'Scheduler') -> 'Set[TaskProxy]': def _submit_task_jobs(_, itasks, *args, **kwargs): nonlocal submitted_tasks + for itask in itasks: + itask.state_reset(TASK_STATUS_SUBMITTED) submitted_tasks.update(itasks) return itasks diff --git a/tests/integration/scripts/test_dump.py b/tests/integration/scripts/test_dump.py index 3189f94e6d0..e4ea8c06de3 100644 --- a/tests/integration/scripts/test_dump.py +++ b/tests/integration/scripts/test_dump.py @@ -46,7 +46,7 @@ async def test_dump_tasks(flow, scheduler, start): }) schd = scheduler(id_) async with start(schd): - # schd.release_queued_tasks() + # schd.release_tasks_to_run() await schd.update_data_structure() ret = [] await dump( diff --git a/tests/integration/test_flow_assignment.py b/tests/integration/test_flow_assignment.py index ea729efeb7b..7f186e86525 100644 --- a/tests/integration/test_flow_assignment.py +++ b/tests/integration/test_flow_assignment.py @@ -81,11 +81,14 @@ async def test_get_flow_nums(one: Scheduler, start): assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} -@pytest.mark.parametrize('command', ['trigger', 'set']) -async def test_flow_assignment( - flow, scheduler, start, command: str, log_filter: Callable +async def test_flow_assignment_set( + flow: 'Fixture', + scheduler: 'Fixture', + start: 'Fixture', + log_filter: Callable, + capture_submission: 'Fixture', ): - """Test flow assignment when triggering/setting tasks. + """Test flow assignment when setting tasks. Active tasks: By default keep existing flows, else merge with requested flows. @@ -110,53 +113,65 @@ async def test_flow_assignment( } id_ = flow(conf) schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd): - if command == 'set': - do_command: Callable = functools.partial( - schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] - ) - else: - do_command = schd.pool.force_trigger_tasks - - active_a, active_b = schd.pool.get_tasks() - schd.pool.merge_flows(active_b, schd.pool._get_flow_nums([FLOW_NEW])) - assert active_a.flow_nums == {1} - assert active_b.flow_nums == {1, 2} + async with start(schd) as log: + + # capture task submissions (prevents real submissions) + submitted_tasks = capture_submission(schd) + + command = "set" + do_command = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] + ) + + # get active foo and bar (which is which doesn't matter) + # ("active" here means in the active task pool). + active_x, active_y = schd.pool.get_tasks() + schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_x.flow_nums == {1} + assert active_y.flow_nums == {1, 2} # -----(1. Test active tasks)----- + # Note this also tests that setting prerequisites + # By default active tasks keep existing flow assignment. 
- do_command([active_a.identity], flow=[]) - assert active_a.flow_nums == {1} + do_command([active_x.identity], flow=[]) + assert active_x.flow_nums == {1} # Else merge existing flow with requested flows. - do_command([active_a.identity], flow=[FLOW_ALL]) - assert active_a.flow_nums == {1, 2} + do_command([active_x.identity], flow=[FLOW_ALL]) + assert active_x.flow_nums == {1, 2} # (no-flow is ignored for active tasks) - do_command([active_a.identity], flow=[FLOW_NONE]) - assert active_a.flow_nums == {1, 2} + do_command([active_x.identity], flow=[FLOW_NONE]) + assert active_x.flow_nums == {1, 2} assert log_filter( contains=( - f'[{active_a}] ignoring \'flow=none\' {command}: ' - f'task already has {repr_flow_nums(active_a.flow_nums)}' + f'[{active_x}] ignoring \'flow=none\' {command}: ' + f'task already has {repr_flow_nums(active_x.flow_nums)}' ), level=logging.ERROR ) - do_command([active_a.identity], flow=[FLOW_NEW]) - assert active_a.flow_nums == {1, 2, 3} + do_command([active_x.identity], flow=[FLOW_NEW]) + assert active_x.flow_nums == {1, 2, 3} # -----(2. Test inactive tasks)----- - if command == 'set': - do_command = functools.partial( - schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] - ) + do_command = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] + ) # By default inactive tasks get all active flows. do_command(['1/a'], flow=[]) assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2, 3} + # set --pre=all should not cause a task to submit in a paused workflow + assert len(submitted_tasks) == 0 + # but triggering it should + schd.pool.force_trigger_tasks(['1/a'], []) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 1 + # Else assign requested flows. do_command(['1/b'], flow=[FLOW_NONE]) assert schd.pool._get_task_by_id('1/b').flow_nums == set() @@ -166,5 +181,93 @@ async def test_flow_assignment( do_command(['1/d'], flow=[FLOW_ALL]) assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3, 4} + + do_command(['1/e'], flow=[7]) + assert schd.pool._get_task_by_id('1/e').flow_nums == {7} + + +async def test_flow_assignment_trigger( + flow, scheduler, start, log_filter: Callable +): + """Test flow assignment when triggering tasks. + + Active tasks: + By default keep existing flows, else merge with requested flows. + Inactive tasks: + By default assign active flows; else assign requested flows. + + Note this differs from the 'set' test above because triggering a + task once makes it active (even in a paused workflow) after which + additional triggers get ignored. + + """ + conf = { + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'graph': { + 'R1': "foo & bar => a & b & c & d & e" + } + }, + 'runtime': { + 'foo': { + 'outputs': {'x': 'x'} + } + }, + } + id_ = flow(conf) + schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) + async with start(schd) as log: + + command = 'trigger' + do_command = schd.pool.force_trigger_tasks + + # get active foo and bar (which is which doesn't matter) + # ("active" here means in the active task pool). + active_x, active_y = schd.pool.get_tasks() + schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_x.flow_nums == {1} + assert active_y.flow_nums == {1, 2} + + # -----(1. Test active tasks)----- + + # By default active tasks keep existing flow assignment. 
+ # This trigger makes the task literally active (submitted, running) + do_command([active_x.identity], flow=[]) + schd.release_tasks_to_run() + assert active_x.flow_nums == {1} + + # Triggering it again will be ignored (already active). + do_command([active_x.identity], flow=[FLOW_ALL]) + schd.release_tasks_to_run() + assert active_x.flow_nums == {1} + assert log_filter( + contains=( + f'[{active_x}] ignoring trigger - already active' + ), + level=logging.ERROR + ) + + # -----(2. Test inactive tasks)----- + # By default inactive tasks get all active flows. + do_command(['1/a'], flow=[]) + schd.release_tasks_to_run() + assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2} + + # Else assign requested flows. + do_command(['1/b'], flow=[FLOW_NONE]) + schd.release_tasks_to_run() + assert schd.pool._get_task_by_id('1/b').flow_nums == set() + + do_command(['1/c'], flow=[FLOW_NEW]) + schd.release_tasks_to_run() + assert schd.pool._get_task_by_id('1/c').flow_nums == {3} + + do_command(['1/d'], flow=[FLOW_ALL]) + schd.release_tasks_to_run() + assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3} + do_command(['1/e'], flow=[7]) + schd.release_tasks_to_run() assert schd.pool._get_task_by_id('1/e').flow_nums == {7} diff --git a/tests/integration/test_force_trigger.py b/tests/integration/test_force_trigger.py new file mode 100644 index 00000000000..e84c862fc97 --- /dev/null +++ b/tests/integration/test_force_trigger.py @@ -0,0 +1,92 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from typing import ( + Any as Fixture, + Callable +) + +import logging + +async def test_trigger_workflow_paused( + flow: 'Fixture', + scheduler: 'Fixture', + start: 'Fixture', + capture_submission: 'Fixture', + log_filter: Callable +): + """ + Test manual triggering when the workflow is paused. + + The usual queue limiting behaviour is expected. 
+
+    https://github.com/cylc/cylc-flow/issues/6192
+
+    """
+    id_ = flow({
+        'scheduler': {
+            'allow implicit tasks': True,
+        },
+        'scheduling': {
+            'queues': {
+                'default': {
+                    'limit': 1,
+                },
+            },
+            'graph': {
+                'R1': '''
+                    a => x & y & z
+                ''',
+            },
+        },
+    })
+    schd = scheduler(id_, paused_start=True)
+
+    # start the scheduler (but don't set the main loop running)
+    async with start(schd) as log:
+
+        # capture task submissions (prevents real submissions)
+        submitted_tasks = capture_submission(schd)
+
+        # paused at start-up so no tasks should be submitted
+        assert len(submitted_tasks) == 0
+
+        # manually trigger 1/x - it should be submitted
+        schd.pool.force_trigger_tasks(['1/x'], [1])
+        schd.release_tasks_to_run()
+        assert len(submitted_tasks) == 1
+
+        # manually trigger 1/y - it should be queued but not submitted
+        # (queue limit reached)
+        schd.pool.force_trigger_tasks(['1/y'], [1])
+        schd.release_tasks_to_run()
+        assert len(submitted_tasks) == 1
+
+        # manually trigger 1/y again - it should be submitted
+        # (triggering a queued task runs it)
+        schd.pool.force_trigger_tasks(['1/y'], [1])
+        schd.release_tasks_to_run()
+        assert len(submitted_tasks) == 2
+
+        # manually trigger 1/y yet again - the trigger should be ignored
+        # (task already active)
+        schd.pool.force_trigger_tasks(['1/y'], [1])
+        schd.release_tasks_to_run()
+        assert len(submitted_tasks) == 2
+        assert log_filter(
+            level=logging.ERROR,
+            contains="ignoring trigger - already active"
+        )
diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py
index d5c4e41ce81..5888e686c6e 100644
--- a/tests/integration/test_optional_outputs.py
+++ b/tests/integration/test_optional_outputs.py
@@ -193,7 +193,7 @@ async def test_expire_orthogonality(flow, scheduler, start):
 
     # wait for the task to submit
     while not a_1.state(TASK_STATUS_WAITING, TASK_STATUS_PREPARING):
-        schd.release_queued_tasks()
+        schd.release_tasks_to_run()
 
     # NOTE: The submit number isn't presently incremented via this code
    # pathway so we have to hack it here. 
If the task messages in this test diff --git a/tests/integration/test_queues.py b/tests/integration/test_queues.py index 7da83e1a1aa..b3bd1eb564d 100644 --- a/tests/integration/test_queues.py +++ b/tests/integration/test_queues.py @@ -87,13 +87,13 @@ async def test_queue_release( # (if scheduler is paused we should not have any submissions) # (otherwise a number of tasks up to the limit should be released) schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == expected_submissions for _ in range(3): # release runahead/queued tasks # (no further tasks should be released) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == expected_submissions @@ -125,7 +125,7 @@ async def test_queue_held_tasks( # release queued tasks # (no tasks should be released from the queues because they are held) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == 0 # un-hold tasks @@ -133,5 +133,5 @@ async def test_queue_held_tasks( # release queued tasks # (tasks should now be released from the queues) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == 1 diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 6f1f581e899..f21abf39049 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -170,7 +170,7 @@ async def test_holding_tasks_whilst_scheduler_paused( # release runahead/queued tasks # (nothing should happen because the scheduler is paused) one.pool.release_runahead_tasks() - one.release_queued_tasks() + one.release_tasks_to_run() assert submitted_tasks == set() # hold all tasks & resume the workflow @@ -179,7 +179,7 @@ async def test_holding_tasks_whilst_scheduler_paused( # release queued tasks # (there should be no change because the task is still held) - one.release_queued_tasks() + one.release_tasks_to_run() assert submitted_tasks == set() # release all tasks @@ -187,7 +187,7 @@ async def test_holding_tasks_whilst_scheduler_paused( # release queued tasks # (the task should be submitted) - one.release_queued_tasks() + one.release_tasks_to_run() assert len(submitted_tasks) == 1 @@ -346,6 +346,17 @@ async def test_restart_timeout( See: https://github.com/cylc/cylc-flow/issues/5078 """ + one_conf.update( + { + 'runtime': { + 'one': { + 'simulation': { + 'default run length': 'PT5S' + } + } + } + } + ) id_ = flow(one_conf) # run the workflow to completion @@ -360,20 +371,35 @@ async def test_restart_timeout( [itask.identity], None, None, ['all']) # restart the completed workflow - schd = scheduler(id_) - async with run(schd): + schd = scheduler(id_, paused_start=False) + async with run(schd) as log: + # allow start-up process to complete? + # without this we get KeyError from platform['install target'] + await asyncio.sleep(0) + # it should detect that the workflow has completed and alert the user assert log_filter( + logging.WARNING, contains='This workflow already ran to completion.' 
) # it should activate a timeout - assert log_filter(contains='restart timer starts NOW') + assert log_filter( + logging.WARNING, + contains='restart timer starts NOW' + ) # when we trigger tasks the timeout should be cleared schd.pool.force_trigger_tasks(['1/one'], {1}) - await asyncio.sleep(0) # yield control to the main loop - assert log_filter(contains='restart timer stopped') + schd.release_tasks_to_run() + + # wait for log to update + await asyncio.sleep(3) + + assert log_filter( + logging.INFO, + contains='restart timer stopped' + ) @pytest.mark.parametrize("signal", ((SIGHUP), (SIGINT), (SIGTERM))) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 404ec8da87f..e346b1faaed 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -701,7 +701,7 @@ async def test_restart_prereqs( async with start(schd): # Release tasks 1/a and 1/b schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert list_tasks(schd) == expected_1 # Mark 1/a as succeeded and spawn 1/z @@ -824,7 +824,7 @@ async def test_reload_prereqs( async with start(schd): # Release tasks 1/a and 1/b schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert list_tasks(schd) == expected_1 # Mark 1/a as succeeded and spawn 1/z @@ -861,7 +861,7 @@ async def _test_restart_prereqs_sat(): # Release tasks 1/a and 1/b schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert list_tasks(schd) == [ ('1', 'a', 'running'), ('1', 'b', 'running') @@ -2171,48 +2171,6 @@ async def list_data_store(): 'c': 'wall_clock(trigger_time=946688400)', } - -async def test_trigger_unqueued(flow, scheduler, start): - """Test triggering an unqueued active task. - - It should not add to the force_released list. - See https://github.com/cylc/cylc-flow/pull/6337 - - """ - conf = { - 'scheduler': {'allow implicit tasks': 'True'}, - 'scheduling': { - 'graph': { - 'R1': 'a & b => c' - } - } - } - schd = scheduler( - flow(conf), - run_mode='simulation', - paused_start=False - ) - - async with start(schd): - # Release tasks 1/a and 1/b - schd.pool.release_runahead_tasks() - schd.release_queued_tasks() - assert pool_get_task_ids(schd.pool) == ['1/a', '1/b'] - - # Mark 1/a as succeeded and spawn 1/c - task_a = schd.pool.get_task(IntegerPoint("1"), "a") - schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded') - assert pool_get_task_ids(schd.pool) == ['1/b', '1/c'] - - # Trigger the partially satisified (and not queued) task 1/c - schd.pool.force_trigger_tasks(['1/c'], [FLOW_ALL]) - - # It should not add to the queue managers force_released list. - assert not schd.pool.task_queue_mgr.force_released, ( - "Triggering an unqueued task should not affect the force_released list" - ) - - @pytest.mark.parametrize('expire_type', ['clock-expire', 'manual']) async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type): """An expired waiting task should be removed from any queues. 
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index ccd5f5dfed5..9163b9e3567 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -91,8 +91,8 @@ def test_should_auto_restart_now( assert Scheduler.should_auto_restart_now(mock_schd) == expected -def test_release_queued_tasks__auto_restart(): - """Test that Scheduler.release_queued_tasks() works as expected +def test_release_tasks_to_run__auto_restart(): + """Test that Scheduler.release_tasks_to_run() works as expected during auto restart.""" mock_schd = Mock( auto_restart_time=(time() - 100), @@ -107,10 +107,12 @@ def test_release_queued_tasks__auto_restart(): options=RunOptions(), task_job_mgr=MagicMock() ) - Scheduler.release_queued_tasks(mock_schd) + Scheduler.release_tasks_to_run(mock_schd) # Should not actually release any more tasks, just submit the # preparing ones mock_schd.pool.release_queued_tasks.assert_not_called() + + Scheduler.start_job_submission(mock_schd, mock_schd.pool.get_tasks()) mock_schd.task_job_mgr.submit_task_jobs.assert_called() From 8a1d7e3b32bc4e8570e6f197363e5d069843e785 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 2 Dec 2024 10:10:49 +1300 Subject: [PATCH 02/18] Fix command help text [skip ci] --- cylc/flow/scripts/pause.py | 5 ++--- cylc/flow/scripts/trigger.py | 6 +++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cylc/flow/scripts/pause.py b/cylc/flow/scripts/pause.py index ee969251bba..42ff2efd56f 100644 --- a/cylc/flow/scripts/pause.py +++ b/cylc/flow/scripts/pause.py @@ -18,10 +18,9 @@ """cylc pause [OPTIONS] ARGS -Pause a workflow. +Suspend all automatic job submission until the workflow is resumed. -This suspends submission of all tasks until the workflow is resumed, except -for tasks manually triggered "now" with `cylc trigger --now`. +Manual triggering can still run tasks immediately, if the workflow is paused. Examples: # pause my_workflow diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 1e110c5dc5a..1392958f92d 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -21,7 +21,11 @@ Triggering an unqueued task queues it, to run when released by the queue. Triggering a queued task runs it immediately regardless of queue limiting. -So you may need to trigger a task twice if queue limiting is in effect. +So you may need to trigger tasks twice if queue limiting is in effect. + +If the workflow is paused queued waiting tasks will not run (unless manually +triggered) until the workflow is resumed, even if the queue empties out. This +includes tasks queued by manual triggering, when queue limits are in effect. Attempts to trigger active tasks (submitted or running) will be ignored. 
From 7d6c18e3c3e6efab330f9b8e205b29e2f292008f Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 3 Dec 2024 17:05:21 +0000 Subject: [PATCH 03/18] Simplify integration test --- tests/integration/test_scheduler.py | 53 +++++++---------------------- 1 file changed, 13 insertions(+), 40 deletions(-) diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index f21abf39049..63f85a84253 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -24,6 +24,7 @@ from cylc.flow import commands from cylc.flow.exceptions import CylcError +from cylc.flow.flow_mgr import FLOW_ALL from cylc.flow.parsec.exceptions import ParsecError from cylc.flow.scheduler import Scheduler, SchedulerStop from cylc.flow.task_state import ( @@ -331,9 +332,10 @@ async def test_restart_timeout( flow, one_conf, scheduler, - start, run, - log_filter + log_filter, + complete, + capture_submission, ): """It should wait for user input if there are no tasks in the pool. @@ -346,37 +348,16 @@ async def test_restart_timeout( See: https://github.com/cylc/cylc-flow/issues/5078 """ - one_conf.update( - { - 'runtime': { - 'one': { - 'simulation': { - 'default run length': 'PT5S' - } - } - } - } - ) id_ = flow(one_conf) # run the workflow to completion - # (by setting the only task to completed) - schd = scheduler(id_, paused_start=False) - async with start(schd) as log: - for itask in schd.pool.get_tasks(): - # (needed for job config in sim mode:) - schd.task_job_mgr.submit_task_jobs( - schd.workflow, [itask], None, None) - schd.pool.set_prereqs_and_outputs( - [itask.identity], None, None, ['all']) + schd: Scheduler = scheduler(id_, paused_start=False) + async with run(schd): + await complete(schd) # restart the completed workflow schd = scheduler(id_, paused_start=False) - async with run(schd) as log: - # allow start-up process to complete? 
- # without this we get KeyError from platform['install target'] - await asyncio.sleep(0) - + async with run(schd): # it should detect that the workflow has completed and alert the user assert log_filter( logging.WARNING, @@ -384,22 +365,14 @@ async def test_restart_timeout( ) # it should activate a timeout - assert log_filter( - logging.WARNING, - contains='restart timer starts NOW' - ) + assert log_filter(logging.WARNING, contains='restart timer starts NOW') + capture_submission(schd) # when we trigger tasks the timeout should be cleared - schd.pool.force_trigger_tasks(['1/one'], {1}) - schd.release_tasks_to_run() + schd.pool.force_trigger_tasks(['1/one'], [FLOW_ALL]) - # wait for log to update - await asyncio.sleep(3) - - assert log_filter( - logging.INFO, - contains='restart timer stopped' - ) + await asyncio.sleep(0) # yield control to the main loop + assert log_filter(logging.INFO, contains='restart timer stopped') @pytest.mark.parametrize("signal", ((SIGHUP), (SIGINT), (SIGTERM))) From 02c9f013a96f1df18819b382a250ac6c5eb4c053 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 4 Dec 2024 10:55:34 +1300 Subject: [PATCH 04/18] Apply suggestions from code review Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- changes.d/6499.feat.md | 2 +- cylc/flow/network/schema.py | 3 ++- cylc/flow/scripts/pause.py | 5 +++-- cylc/flow/scripts/trigger.py | 8 ++++++-- tests/integration/test_force_trigger.py | 3 --- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/changes.d/6499.feat.md b/changes.d/6499.feat.md index 747b066c717..d2b5f058302 100644 --- a/changes.d/6499.feat.md +++ b/changes.d/6499.feat.md @@ -1 +1 @@ -Manual trigger: run tasks immediately even if the workflow is paused. +Manually triggered tasks now run immediately even if the workflow is paused. diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index ccc992ef1ec..e5394eecdc6 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2228,7 +2228,8 @@ class Arguments(TaskMutation.Arguments, FlowMutationArguments): on_resume = Boolean( default_value=False, description=sstrip(''' - Run triggered tasks once the paused workflow is resumed. + If the workflow is paused, wait until it is resumed before + running the triggered task(s). ''') ) diff --git a/cylc/flow/scripts/pause.py b/cylc/flow/scripts/pause.py index 42ff2efd56f..8ae7b00e8d4 100644 --- a/cylc/flow/scripts/pause.py +++ b/cylc/flow/scripts/pause.py @@ -18,9 +18,10 @@ """cylc pause [OPTIONS] ARGS -Suspend all automatic job submission until the workflow is resumed. +Suspend automatic job submission. -Manual triggering can still run tasks immediately, if the workflow is paused. +Manual triggering can still run tasks immediately, even if the workflow is +paused. Examples: # pause my_workflow diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 1392958f92d..ab3f8ba8762 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,7 +17,7 @@ """cylc trigger [OPTIONS] ARGS -Manually trigger tasks regardless of prerequisites, even in a paused workflow. +Force task(s) to run regardless of prerequisites, even in a paused workflow. Triggering an unqueued task queues it, to run when released by the queue. Triggering a queued task runs it immediately regardless of queue limiting. 
@@ -109,7 +109,11 @@ def get_option_parser() -> COP: parser.add_option( "--on-resume", - help=r"Run triggered tasks once a paused workflow is resumed.", + help=( + "If the workflow is paused, wait until it is resumed before " + "running the triggered task(s). DEPRECATED - this will be " + "removed at Cylc 8.5." + ), action="store_true", default=False, dest="on_resume" diff --git a/tests/integration/test_force_trigger.py b/tests/integration/test_force_trigger.py index e84c862fc97..b5f2c4b3f4d 100644 --- a/tests/integration/test_force_trigger.py +++ b/tests/integration/test_force_trigger.py @@ -37,9 +37,6 @@ async def test_trigger_workflow_paused( """ id_ = flow({ - 'scheduler': { - 'allow implicit tasks': True, - }, 'scheduling': { 'queues': { 'default': { From 374fe0ba6b24e0c4400b32a77edc581b467062fd Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 4 Dec 2024 11:00:05 +1300 Subject: [PATCH 05/18] Update cylc/flow/task_queues/independent.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/task_queues/independent.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index 4cc5c8e2183..fd904334c7a 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -44,9 +44,7 @@ def push_task_if_limited( self, itask: 'TaskProxy', active: Counter[str] ) -> bool: """Queue task if in my membership and the queue limit is reached.""" - n_active: int = 0 - for mem in self.members: - n_active += active[mem] + n_active = sum(active[mem] for mem in self.members) if ( self.limit and n_active >= self.limit and itask.tdef.name in self.members From 3bd604026b730c5c8b27cbd9231330cd4613f27f Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 4 Dec 2024 11:29:45 +1300 Subject: [PATCH 06/18] Remove a redundant queue method. --- cylc/flow/task_pool.py | 2 +- cylc/flow/task_queues/__init__.py | 8 -------- cylc/flow/task_queues/independent.py | 7 ------- 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 79cc07f1588..87009a00fdc 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2179,7 +2179,7 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): itask.state_reset(is_queued=True) self.data_store_mgr.delta_task_state(itask) - elif self.task_queue_mgr.force_release_task(itask): + elif self.task_queue_mgr.remove_task(itask): # else release it from the queue to run now itask.state_reset(is_queued=False) self.data_store_mgr.delta_task_state(itask) diff --git a/cylc/flow/task_queues/__init__.py b/cylc/flow/task_queues/__init__.py index b6d2f8a8241..f8c16725875 100644 --- a/cylc/flow/task_queues/__init__.py +++ b/cylc/flow/task_queues/__init__.py @@ -68,14 +68,6 @@ def remove_task(self, itask: 'TaskProxy') -> bool: """Try to remove a task from the queues. Return True if done.""" raise NotImplementedError - @abstractmethod - def force_release_task(self, itask: 'TaskProxy') -> bool: - """Remove a task from whichever queue it belongs to. 
- - Return True if released, else False - """ - raise NotImplementedError - @abstractmethod def adopt_tasks(self, orphans: List[str]) -> None: """Adopt tasks with defs removed by scheduler reload or restart.""" diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index fd904334c7a..5cf2215918d 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -151,13 +151,6 @@ def remove_task(self, itask: 'TaskProxy') -> bool: """Try to remove a task from the queues. Return True if done.""" return any(queue.remove(itask) for queue in self.queues.values()) - def force_release_task(self, itask: 'TaskProxy') -> bool: - """Remove a task from whichever queue it belongs to. - - Return True if released, else False. - """ - return self.remove_task(itask) - def adopt_tasks(self, orphans: List[str]) -> None: """Adopt orphaned tasks to the default group.""" self.queues[self.Q_DEFAULT].adopt(orphans) From cce3c8aebdebd79976426003cd4f67cef1fbb75b Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 5 Dec 2024 15:59:41 +1300 Subject: [PATCH 07/18] Apply suggestions from code review Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/network/schema.py | 11 +++++++---- cylc/flow/scripts/trigger.py | 12 ++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index e5394eecdc6..d5f174e5b96 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2215,10 +2215,13 @@ class Meta: description = sstrip(''' Manually trigger tasks, even in a paused workflow. - Triggering an unqueued task queues it, to run when queue-released. - Triggering a queued task runs it now regardless of queue limiting. - - The "on resume" option waits for a paused workflow to be resumed. + Triggering a task that is not yet queued will queue it. + + Triggering a queued task runs it immediately. + + Queues release tasks to run when their active task count + drops below the queue limit. So, depending on the task count, you + may need to trigger a task twice to make it run immediately. Valid for: paused, running workflows. ''') diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index ab3f8ba8762..489a82adce1 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -19,13 +19,13 @@ Force task(s) to run regardless of prerequisites, even in a paused workflow. -Triggering an unqueued task queues it, to run when released by the queue. -Triggering a queued task runs it immediately regardless of queue limiting. -So you may need to trigger tasks twice if queue limiting is in effect. +Triggering a task that is not yet queued will queue it. -If the workflow is paused queued waiting tasks will not run (unless manually -triggered) until the workflow is resumed, even if the queue empties out. This -includes tasks queued by manual triggering, when queue limits are in effect. +Triggering a queued task runs it immediately. + +Queues release tasks to run when their active task count drops below the queue +limit. So, depending on the task count, you may need to trigger a task twice to make +it run immediately. Attempts to trigger active tasks (submitted or running) will be ignored. 
From da043577b5a654664730d16c54e13db75484af3b Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 5 Dec 2024 10:38:14 +0000 Subject: [PATCH 08/18] test https://github.com/cylc/cylc-flow/issues/6398 --- .../functional/cylc-trigger/07-kill-trigger.t | 32 +++++++++++++++++++ .../cylc-trigger/07-kill-trigger/flow.cylc | 22 +++++++++++++ .../07-kill-trigger/reference.log | 2 ++ 3 files changed, 56 insertions(+) create mode 100644 tests/functional/cylc-trigger/07-kill-trigger.t create mode 100644 tests/functional/cylc-trigger/07-kill-trigger/flow.cylc create mode 100644 tests/functional/cylc-trigger/07-kill-trigger/reference.log diff --git a/tests/functional/cylc-trigger/07-kill-trigger.t b/tests/functional/cylc-trigger/07-kill-trigger.t new file mode 100644 index 00000000000..23ae84a6f37 --- /dev/null +++ b/tests/functional/cylc-trigger/07-kill-trigger.t @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Test killing a job, then re-triggering it whilst it is "held". +# See https://github.com/cylc/cylc-flow/issues/6398 + +. 
"$(dirname "$0")/test_header" + +set_test_number 2 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" + +workflow_run_ok "${TEST_NAME_BASE}-run" \ + cylc play --debug --no-detach "${WORKFLOW_NAME}" + +purge diff --git a/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc b/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc new file mode 100644 index 00000000000..f0e6dbaefd6 --- /dev/null +++ b/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc @@ -0,0 +1,22 @@ +[scheduler] + [[events]] + inactivity timeout = PT1M + +[scheduling] + [[graph]] + R1 = a + +[runtime] + [[a]] + script = """ + if [[ $CYLC_TASK_SUBMIT_NUMBER == 1 ]]; then + # wait for the scheduler to receive the started message, + # then kill the job + cylc__job__poll_grep_workflow_log -E '1/a.*running' + cylc kill "${CYLC_WORKFLOW_ID}//1/a" + fi + """ + [[[events]]] + # when the scheduler receives the failed message, trigger the task + # to run again, it should run instantly + failed handlers = cylc trigger "%(workflow)s//1/a" diff --git a/tests/functional/cylc-trigger/07-kill-trigger/reference.log b/tests/functional/cylc-trigger/07-kill-trigger/reference.log new file mode 100644 index 00000000000..4176c633d2e --- /dev/null +++ b/tests/functional/cylc-trigger/07-kill-trigger/reference.log @@ -0,0 +1,2 @@ +1/a -triggered off [] in flow 1 +1/a -triggered off [] in flow 1 From fae72ed0b0d3df954f5655ad2112ef47ae14f1c0 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 11 Dec 2024 09:53:37 +1300 Subject: [PATCH 09/18] Update cylc/flow/commands.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/commands.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 667bc6653f2..594f6587d5c 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -451,6 +451,11 @@ async def force_trigger_tasks( """Manual task trigger.""" validate.is_tasks(tasks) validate.flow_opts(flow, flow_wait) + if on_resume: + LOG.warning( + "The --on-resume option is deprecated and will be removed " + "at Cylc 8.5." + ) yield yield schd.pool.force_trigger_tasks( tasks, flow, flow_wait, flow_descr, on_resume From 9cf0fb683d5d53a4d548130ac7c9d1b01ab28005 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 11 Dec 2024 09:54:13 +1300 Subject: [PATCH 10/18] Update cylc/flow/scripts/trigger.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/scripts/trigger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 489a82adce1..196bdfd11d9 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -24,8 +24,8 @@ Triggering a queued task runs it immediately. Queues release tasks to run when their active task count drops below the queue -limit. So, depending on the task count, you may need to trigger a task twice to make -it run immediately. +limit. So, depending on the task count, you may need to trigger a task twice +to make it run immediately. Attempts to trigger active tasks (submitted or running) will be ignored. 
From 9c5b722cb51052919b865b9a8f2d105339b3a776 Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Wed, 11 Dec 2024 09:54:43 +1300
Subject: [PATCH 11/18] Update cylc/flow/task_pool.py
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
 cylc/flow/task_pool.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 87009a00fdc..3887c242241 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -2145,7 +2145,7 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): Triggering a queued task will: - run it, regardless of queue limiting - Triggering an unqueued task will: + Triggering a non-queued task will: - queue it, if the queue is limiting activity - run it, if the queue is not limiting activity
From 00f9c39eac3a8348340dc3bd7723b23ebb570df7 Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Wed, 11 Dec 2024 09:55:49 +1300
Subject: [PATCH 12/18] Update cylc/flow/task_pool.py
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
 cylc/flow/task_pool.py | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 3887c242241..6408fb86c1e 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -2189,16 +2189,14 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): itask.waiting_on_job_prep = True if on_resume: - with suppress(KeyError): - # In case previously triggered without --on-resume. - # (It should have run already, but just in case). - self.tasks_to_trigger_now.remove(itask) self.tasks_to_trigger_on_resume.add(itask) + # In case previously triggered without --on-resume. + # (It should have run already, but just in case). + self.tasks_to_trigger_now.discard(itask) else: - with suppress(KeyError): - # In case previously triggered with --on-resume. - self.tasks_to_trigger_on_resume.remove(itask) self.tasks_to_trigger_now.add(itask) + # In case previously triggered with --on-resume. + self.tasks_to_trigger_on_resume.discard(itask) # Task may be set running before xtrigger is satisfied, # if so check/spawn if xtrigger sequential.
From bdbcaa03d615525f3436e1601d9447512bbfa57d Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Tue, 10 Dec 2024 21:45:50 +0000
Subject: [PATCH 13/18] Update queue info in trigger help.
---
 cylc/flow/network/schema.py | 14 +++++++++-----
 cylc/flow/scripts/trigger.py | 8 ++++----
 2 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py
index d5f174e5b96..67d3c58ec84 100644
--- a/cylc/flow/network/schema.py
+++ b/cylc/flow/network/schema.py
@@ -2216,12 +2216,16 @@ class Meta: Manually trigger tasks, even in a paused workflow. Triggering a task that is not yet queued will queue it. - + Triggering a queued task runs it immediately. - + Cylc queues restrict the number of jobs that can be active + (submitted or running) at once. They release tasks to run + when their active task count drops below the queue limit. + So, depending on the active task count, you may need to + trigger a task twice to make it run immediately. + + Attempts to trigger active tasks will be ignored. Valid for: paused, running workflows.
''') diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 196bdfd11d9..cd1f4104aaa 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -23,11 +23,11 @@ Triggering a queued task runs it immediately. -Queues release tasks to run when their active task count drops below the queue -limit. So, depending on the task count, you may need to trigger a task twice -to make it run immediately. +Cylc queues restrict the number of jobs that can be active (submitted or +running) at once. They release tasks to run when their active task count +drops below the queue limit. -Attempts to trigger active tasks (submitted or running) will be ignored. +Attempts to trigger active tasks will be ignored. Examples: # trigger task foo in cycle 1234 in test From 38e5ab96913c646b4a5c087f2aea069795a2db91 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 11 Dec 2024 00:49:28 +0000 Subject: [PATCH 14/18] Update a test. --- tests/integration/run_modes/test_skip.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py index fb58a82d427..5c9c31a6d56 100644 --- a/tests/integration/run_modes/test_skip.py +++ b/tests/integration/run_modes/test_skip.py @@ -185,14 +185,14 @@ async def test_doesnt_release_held_tasks( itask.state.is_held = True # Relinquish contol to the main loop. - schd.release_queued_tasks() + schd.release_tasks_to_run() assert not log_filter(contains='=> running'), msg.format('run') assert not log_filter(contains='=> succeeded'), msg.format('succeed') # Release held task and assert that it now skips successfully: schd.pool.release_held_tasks(['1/one']) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert log_filter(contains='=> running'), msg.format('run') assert log_filter(contains='=> succeeded'), msg.format('succeed') From c4ef803d92da1fc5faa76eb5dd083ae4ffba569d Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 11 Dec 2024 01:21:43 +0000 Subject: [PATCH 15/18] Fix new functional test. 
--- tests/functional/cylc-trigger/07-kill-trigger.t | 2 +- tests/functional/cylc-trigger/07-kill-trigger/flow.cylc | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/functional/cylc-trigger/07-kill-trigger.t b/tests/functional/cylc-trigger/07-kill-trigger.t index 23ae84a6f37..81e6de89284 100644 --- a/tests/functional/cylc-trigger/07-kill-trigger.t +++ b/tests/functional/cylc-trigger/07-kill-trigger.t @@ -27,6 +27,6 @@ install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" workflow_run_ok "${TEST_NAME_BASE}-run" \ - cylc play --debug --no-detach "${WORKFLOW_NAME}" + cylc play --debug --no-detach --reference-test "${WORKFLOW_NAME}" purge diff --git a/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc b/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc index f0e6dbaefd6..2242e2ebaed 100644 --- a/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc +++ b/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc @@ -1,6 +1,7 @@ [scheduler] [[events]] inactivity timeout = PT1M + expected task failures = "1/a" [scheduling] [[graph]] @@ -14,6 +15,10 @@ # then kill the job cylc__job__poll_grep_workflow_log -E '1/a.*running' cylc kill "${CYLC_WORKFLOW_ID}//1/a" + # do not succeed immediately after issuing the kill command or the + # workflow may shut down as complete before registering task failure + # (This polling grep will never complete, but you know what I mean.) + cylc__job__poll_grep_workflow_log -E '1/a.*failed' fi """ [[[events]]] From 7d6d82e9af3a60e08a9efdf8823d962cd5270df1 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 11 Dec 2024 01:59:59 +0000 Subject: [PATCH 16/18] Add a comment. --- cylc/flow/scheduler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index fb3dd373e1f..157de87e846 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -679,6 +679,9 @@ async def run_scheduler(self) -> None: # If we shut down with manually triggered waiting tasks, # submit them to run now. + # NOTE: this will run tasks that were triggered with + # the trigger "--on-resume" option, even if the workflow + # is restarted as paused. Option to be removed at 8.5.0. pre_prep_tasks = [] for itask in self.pool.get_tasks(): if ( From a24623691237cda7abe5daffdbc77cb45f852740 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 15 Dec 2024 22:48:05 +0000 Subject: [PATCH 17/18] Revert flow-assignment test. --- tests/integration/test_flow_assignment.py | 163 ++++------------------ 1 file changed, 30 insertions(+), 133 deletions(-) diff --git a/tests/integration/test_flow_assignment.py b/tests/integration/test_flow_assignment.py index 7f186e86525..ea729efeb7b 100644 --- a/tests/integration/test_flow_assignment.py +++ b/tests/integration/test_flow_assignment.py @@ -81,14 +81,11 @@ async def test_get_flow_nums(one: Scheduler, start): assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} -async def test_flow_assignment_set( - flow: 'Fixture', - scheduler: 'Fixture', - start: 'Fixture', - log_filter: Callable, - capture_submission: 'Fixture', +@pytest.mark.parametrize('command', ['trigger', 'set']) +async def test_flow_assignment( + flow, scheduler, start, command: str, log_filter: Callable ): - """Test flow assignment when setting tasks. + """Test flow assignment when triggering/setting tasks. Active tasks: By default keep existing flows, else merge with requested flows. 
@@ -113,65 +110,53 @@ async def test_flow_assignment_set( } id_ = flow(conf) schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd) as log: - - # capture task submissions (prevents real submissions) - submitted_tasks = capture_submission(schd) - - command = "set" - do_command = functools.partial( - schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] - ) - - # get active foo and bar (which is which doesn't matter) - # ("active" here means in the active task pool). - active_x, active_y = schd.pool.get_tasks() - schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW])) - assert active_x.flow_nums == {1} - assert active_y.flow_nums == {1, 2} + async with start(schd): + if command == 'set': + do_command: Callable = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] + ) + else: + do_command = schd.pool.force_trigger_tasks + + active_a, active_b = schd.pool.get_tasks() + schd.pool.merge_flows(active_b, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_a.flow_nums == {1} + assert active_b.flow_nums == {1, 2} # -----(1. Test active tasks)----- - # Note this also tests that setting prerequisites - # By default active tasks keep existing flow assignment. - do_command([active_x.identity], flow=[]) - assert active_x.flow_nums == {1} + do_command([active_a.identity], flow=[]) + assert active_a.flow_nums == {1} # Else merge existing flow with requested flows. - do_command([active_x.identity], flow=[FLOW_ALL]) - assert active_x.flow_nums == {1, 2} + do_command([active_a.identity], flow=[FLOW_ALL]) + assert active_a.flow_nums == {1, 2} # (no-flow is ignored for active tasks) - do_command([active_x.identity], flow=[FLOW_NONE]) - assert active_x.flow_nums == {1, 2} + do_command([active_a.identity], flow=[FLOW_NONE]) + assert active_a.flow_nums == {1, 2} assert log_filter( contains=( - f'[{active_x}] ignoring \'flow=none\' {command}: ' - f'task already has {repr_flow_nums(active_x.flow_nums)}' + f'[{active_a}] ignoring \'flow=none\' {command}: ' + f'task already has {repr_flow_nums(active_a.flow_nums)}' ), level=logging.ERROR ) - do_command([active_x.identity], flow=[FLOW_NEW]) - assert active_x.flow_nums == {1, 2, 3} + do_command([active_a.identity], flow=[FLOW_NEW]) + assert active_a.flow_nums == {1, 2, 3} # -----(2. Test inactive tasks)----- - do_command = functools.partial( - schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] - ) + if command == 'set': + do_command = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] + ) # By default inactive tasks get all active flows. do_command(['1/a'], flow=[]) assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2, 3} - # set --pre=all should not cause a task to submit in a paused workflow - assert len(submitted_tasks) == 0 - # but triggering it should - schd.pool.force_trigger_tasks(['1/a'], []) - schd.release_tasks_to_run() - assert len(submitted_tasks) == 1 - # Else assign requested flows. do_command(['1/b'], flow=[FLOW_NONE]) assert schd.pool._get_task_by_id('1/b').flow_nums == set() @@ -181,93 +166,5 @@ async def test_flow_assignment_set( do_command(['1/d'], flow=[FLOW_ALL]) assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3, 4} - - do_command(['1/e'], flow=[7]) - assert schd.pool._get_task_by_id('1/e').flow_nums == {7} - - -async def test_flow_assignment_trigger( - flow, scheduler, start, log_filter: Callable -): - """Test flow assignment when triggering tasks. 
- - Active tasks: - By default keep existing flows, else merge with requested flows. - Inactive tasks: - By default assign active flows; else assign requested flows. - - Note this differs from the 'set' test above because triggering a - task once makes it active (even in a paused workflow) after which - additional triggers get ignored. - - """ - conf = { - 'scheduler': { - 'allow implicit tasks': 'True' - }, - 'scheduling': { - 'graph': { - 'R1': "foo & bar => a & b & c & d & e" - } - }, - 'runtime': { - 'foo': { - 'outputs': {'x': 'x'} - } - }, - } - id_ = flow(conf) - schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd) as log: - - command = 'trigger' - do_command = schd.pool.force_trigger_tasks - - # get active foo and bar (which is which doesn't matter) - # ("active" here means in the active task pool). - active_x, active_y = schd.pool.get_tasks() - schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW])) - assert active_x.flow_nums == {1} - assert active_y.flow_nums == {1, 2} - - # -----(1. Test active tasks)----- - - # By default active tasks keep existing flow assignment. - # This trigger makes the task literally active (submitted, running) - do_command([active_x.identity], flow=[]) - schd.release_tasks_to_run() - assert active_x.flow_nums == {1} - - # Triggering it again will be ignored (already active). - do_command([active_x.identity], flow=[FLOW_ALL]) - schd.release_tasks_to_run() - assert active_x.flow_nums == {1} - assert log_filter( - contains=( - f'[{active_x}] ignoring trigger - already active' - ), - level=logging.ERROR - ) - - # -----(2. Test inactive tasks)----- - # By default inactive tasks get all active flows. - do_command(['1/a'], flow=[]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2} - - # Else assign requested flows. - do_command(['1/b'], flow=[FLOW_NONE]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/b').flow_nums == set() - - do_command(['1/c'], flow=[FLOW_NEW]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/c').flow_nums == {3} - - do_command(['1/d'], flow=[FLOW_ALL]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3} - do_command(['1/e'], flow=[7]) - schd.release_tasks_to_run() assert schd.pool._get_task_by_id('1/e').flow_nums == {7} From 35a8e0eb3696f60fd01f6eac9af06f88afc2c5f9 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 15 Dec 2024 23:02:33 +0000 Subject: [PATCH 18/18] Add integration test. --- cylc/flow/task_pool.py | 6 +-- tests/integration/test_force_trigger.py | 63 ++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 5b35773d8ca..4732f43d587 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2208,7 +2208,7 @@ def force_trigger_tasks( flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None, - now: bool = False + on_resume: bool = False ): """Manually trigger tasks. @@ -2244,7 +2244,7 @@ def force_trigger_tasks( LOG.error(f"[{itask}] ignoring trigger - already active") continue self.merge_flows(itask, flow_nums) - self._force_trigger(itask, now) + self._force_trigger(itask, on_resume) # Spawn and trigger inactive tasks. 
if not flow:
@@ -2279,7 +2279,7 @@ def force_trigger_tasks( # run it (or run it again for incomplete flow-wait) self.add_to_pool(itask) - self._force_trigger(itask, now) + self._force_trigger(itask, on_resume) def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks."""
diff --git a/tests/integration/test_force_trigger.py b/tests/integration/test_force_trigger.py
index b5f2c4b3f4d..7c38c0cec35 100644
--- a/tests/integration/test_force_trigger.py
+++ b/tests/integration/test_force_trigger.py
@@ -66,7 +66,7 @@ async def test_trigger_workflow_paused( schd.release_tasks_to_run() assert len(submitted_tasks) == 1 - # manually trigger 1/y - it should not be queued but not submitted + # manually trigger 1/y - it should be queued but not submitted # (queue limit reached) schd.pool.force_trigger_tasks(['1/y'], [1]) schd.release_tasks_to_run()
@@ -87,3 +87,64 @@ async def test_trigger_workflow_paused( level=logging.ERROR, contains="ignoring trigger - already active" ) + + +async def test_trigger_on_resume( + flow: 'Fixture', + scheduler: 'Fixture', + start: 'Fixture', + capture_submission: 'Fixture', + log_filter: Callable +): + """ + Test the manual trigger on-resume option when the workflow is paused. + + https://github.com/cylc/cylc-flow/issues/6192 + + """ + id_ = flow({ + 'scheduling': { + 'queues': { + 'default': { + 'limit': 1, + }, + }, + 'graph': { + 'R1': ''' + a => x & y & z + ''', + }, + }, + }) + schd = scheduler(id_, paused_start=True) + + # start the scheduler (but don't set the main loop running) + async with start(schd) as log: + + # capture task submissions (prevents real submissions) + submitted_tasks = capture_submission(schd) + + # paused at start-up so no tasks should be submitted + assert len(submitted_tasks) == 0 + + # manually trigger 1/x - it should not be submitted + schd.pool.force_trigger_tasks(['1/x'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # manually trigger 1/y - it should not be submitted + # (queue limit reached) + schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # manually trigger 1/y again - it should not be submitted + # (on-resume defers it; triggering a queued task would normally run it) + schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # resume the workflow - both tasks should submit now + schd.resume_workflow() + schd.release_tasks_to_run() + assert len(submitted_tasks) == 2
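For reference, a minimal command-line sketch of the behaviour exercised by the tests above (the --on-resume spelling follows the deprecation warning added to commands.py earlier in this series; the names are hypothetical and the commands are illustrative, not a definitive CLI reference):

    # workflow paused: a plain trigger still submits the task immediately
    cylc trigger myflow//1/foo
    # with --on-resume the trigger is deferred until the workflow is resumed
    cylc trigger --on-resume myflow//1/bar
    # resume the workflow; the deferred task submits now
    cylc play myflow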