diff --git a/changes.d/6499.feat.md b/changes.d/6499.feat.md new file mode 100644 index 00000000000..d2b5f058302 --- /dev/null +++ b/changes.d/6499.feat.md @@ -0,0 +1 @@ +Manually triggered tasks now run immediately even if the workflow is paused. diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index c45194752ab..594f6587d5c 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -338,7 +338,7 @@ async def reload_workflow(schd: 'Scheduler'): # flush out preparing tasks before attempting reload schd.reload_pending = 'waiting for pending tasks to submit' - while schd.release_queued_tasks(): + while schd.release_tasks_to_run(): # Run the subset of main-loop functionality required to push # preparing through the submission pipeline and keep the workflow # responsive (e.g. to the `cylc stop` command). @@ -446,9 +446,17 @@ async def force_trigger_tasks( flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None, + on_resume: bool = False ): """Manual task trigger.""" validate.is_tasks(tasks) validate.flow_opts(flow, flow_wait) + if on_resume: + LOG.warning( + "The --on-resume option is deprecated and will be removed " + "at Cylc 8.5." + ) yield - yield schd.pool.force_trigger_tasks(tasks, flow, flow_wait, flow_descr) + yield schd.pool.force_trigger_tasks( + tasks, flow, flow_wait, flow_descr, on_resume + ) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 3f6534b4188..67d3c58ec84 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2213,20 +2213,32 @@ class Arguments(TaskMutation.Arguments, FlowMutationArguments): class Trigger(Mutation, TaskMutation): class Meta: description = sstrip(''' - Manually trigger tasks. + Manually trigger tasks, even in a paused workflow. - Warning: waiting tasks that are queue-limited will be queued if - triggered, to submit as normal when released by the queue; queued - tasks will submit immediately if triggered, even if that violates - the queue limit (so you may need to trigger a queue-limited task - twice to get it to submit immediately). + Triggering a task that is not yet queued will queue it. + + Triggering a queued task runs it immediately. + + Cylc queues restrict the number of jobs that can be active + (submitted or running) at once. They release tasks to run + when their active task count drops below the queue limit. + So, depending on the active task count, you may need to + trigger a task twice to make it run immediately. + + Attempts to trigger active tasks will be ignored. Valid for: paused, running workflows. ''') resolver = partial(mutator, command='force_trigger_tasks') class Arguments(TaskMutation.Arguments, FlowMutationArguments): - ... + on_resume = Boolean( + default_value=False, + description=sstrip(''' + If the workflow is paused, wait until it is resumed before + running the triggered task(s). + ''') + ) def _mut_field(cls): diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index a2d1036d1da..bbb1bc38b27 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -674,6 +674,22 @@ async def run_scheduler(self) -> None: self.restart_remote_init() await commands.run_cmd(commands.poll_tasks(self, ['*/*'])) + # If we shut down with manually triggered waiting tasks, + # submit them to run now. + # NOTE: this will run tasks that were triggered with + # the trigger "--on-resume" option, even if the workflow + # is restarted as paused. Option to be removed at 8.5.0. + pre_prep_tasks = [] + for itask in self.pool.get_tasks(): + if ( + itask.is_manual_submit + and itask.state(TASK_STATUS_WAITING) + ): + itask.waiting_on_job_prep = True + pre_prep_tasks.append(itask) + + self.start_job_submission(pre_prep_tasks) + self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting') await asyncio.gather( *main_loop.get_runners( @@ -775,10 +791,10 @@ async def start(self): self.uuid_str = dict(params)['uuid_str'] else: self.uuid_str = str(uuid4()) + self.task_events_mgr.uuid_str = self.uuid_str self._configure_contact() await self.configure(params) - self.task_events_mgr.uuid_str = self.uuid_str except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) @@ -862,6 +878,7 @@ def _load_pool_from_db(self): self.xtrigger_mgr.load_xtrigger_for_restart) self.workflow_db_mgr.pri_dao.select_abs_outputs_for_restart( self.pool.load_abs_outputs_for_restart) + self.pool.load_db_tasks_to_hold() self.pool.update_flow_mgr() @@ -1428,8 +1445,8 @@ def run_event_handlers(self, event, reason=""): return self.workflow_event_handler.handle(self, event, str(reason)) - def release_queued_tasks(self) -> bool: - """Release queued tasks, and submit jobs. + def release_tasks_to_run(self) -> bool: + """Release queued or manually submitted tasks, and submit jobs. The task queue manages references to task proxies in the task pool. @@ -1453,13 +1470,24 @@ def release_queued_tasks(self) -> bool: submission). """ + pre_prep_tasks: Set['TaskProxy'] = set() if ( - not self.is_paused - and self.stop_mode is None + self.stop_mode is None and self.auto_restart_time is None and self.reload_pending is False ): - pre_prep_tasks = self.pool.release_queued_tasks() + if self.pool.tasks_to_trigger_now: + # manually triggered tasks to run now, workflow paused or not + pre_prep_tasks.update(self.pool.tasks_to_trigger_now) + self.pool.tasks_to_trigger_now = set() + + if not self.is_paused: + # release queued tasks + pre_prep_tasks.update(self.pool.release_queued_tasks()) + if self.pool.tasks_to_trigger_on_resume: + # and manually triggered tasks to run once workflow resumed + pre_prep_tasks.update(self.pool.tasks_to_trigger_on_resume) + self.pool.tasks_to_trigger_on_resume = set() elif ( ( @@ -1471,19 +1499,30 @@ def release_queued_tasks(self) -> bool: self.reload_pending ) ): - # don't release queued tasks, finish processing preparing tasks - pre_prep_tasks = [ + # finish processing preparing tasks first + pre_prep_tasks = { itask for itask in self.pool.get_tasks() if itask.state(TASK_STATUS_PREPARING) - ] + } # Return, if no tasks to submit. else: return False + if not pre_prep_tasks: return False - # Start the job submission process. + return self.start_job_submission(pre_prep_tasks) + + def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool: + """Start the job submission process for some tasks. + + Return True if any were started, else False. + + """ + if self.stop_mode is not None: + return False + self.is_updated = True self.reset_inactivity_timer() @@ -1493,9 +1532,10 @@ def release_queued_tasks(self) -> bool: log = LOG.debug if self.options.reftest or self.options.genref: log = LOG.info + for itask in self.task_job_mgr.submit_task_jobs( self.workflow, - pre_prep_tasks, + itasks, self.server.curve_auth, self.server.client_pub_key_dir, run_mode=self.get_run_mode() @@ -1737,7 +1777,7 @@ async def _main_loop(self) -> None: self.broadcast_mgr.check_ext_triggers( itask, self.ext_trigger_queue) - if itask.is_ready_to_run(): + if itask.is_ready_to_run() and not itask.is_manual_submit: self.pool.queue_task(itask) if self.xtrigger_mgr.sequential_spawn_next: @@ -1746,7 +1786,7 @@ async def _main_loop(self) -> None: if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) self.pool.clock_expire_tasks() - self.release_queued_tasks() + self.release_tasks_to_run() if ( self.get_run_mode() == RunMode.SIMULATION diff --git a/cylc/flow/scripts/pause.py b/cylc/flow/scripts/pause.py index 7e0d3014326..8ae7b00e8d4 100644 --- a/cylc/flow/scripts/pause.py +++ b/cylc/flow/scripts/pause.py @@ -18,9 +18,10 @@ """cylc pause [OPTIONS] ARGS -Pause a workflow. +Suspend automatic job submission. -This suspends submission of all tasks in a workflow. +Manual triggering can still run tasks immediately, even if the workflow is +paused. Examples: # pause my_workflow diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 1e6ef913696..cd1f4104aaa 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,11 +17,17 @@ """cylc trigger [OPTIONS] ARGS -Force tasks to run regardless of prerequisites. +Force task(s) to run regardless of prerequisites, even in a paused workflow. -* Triggering an unqueued waiting task queues it, regardless of prerequisites. -* Triggering a queued task submits it, regardless of queue limiting. -* Triggering an active task has no effect (it already triggered). +Triggering a task that is not yet queued will queue it. + +Triggering a queued task runs it immediately. + +Cylc queues restrict the number of jobs that can be active (submitted or +running) at once. They release tasks to run when their active task count +drops below the queue limit. + +Attempts to trigger active tasks will be ignored. Examples: # trigger task foo in cycle 1234 in test @@ -74,13 +80,15 @@ $flow: [Flow!], $flowWait: Boolean, $flowDescr: String, + $onResume: Boolean, ) { trigger ( workflows: $wFlows, tasks: $tasks, flow: $flow, flowWait: $flowWait, - flowDescr: $flowDescr + flowDescr: $flowDescr, + onResume: $onResume, ) { result } @@ -96,7 +104,20 @@ def get_option_parser() -> COP: multiworkflow=True, argdoc=[FULL_ID_MULTI_ARG_DOC], ) + add_flow_opts(parser) + + parser.add_option( + "--on-resume", + help=( + "If the workflow is paused, wait until it is resumed before " + "running the triggered task(s). DEPRECATED - this will be " + "removed at Cylc 8.5." + ), + action="store_true", + default=False, + dest="on_resume" + ) return parser @@ -114,6 +135,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list): 'flow': options.flow, 'flowWait': options.flow_wait, 'flowDescr': options.flow_descr, + 'onResume': options.on_resume, } } return await pclient.async_request('graphql', mutation_kwargs) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 92a337e9a2c..4732f43d587 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -172,7 +172,10 @@ def __init__( self.task_name_list, self.config.runtime['descendants'] ) + self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set() + self.tasks_to_trigger_now: Set['TaskProxy'] = set() + self.tasks_to_trigger_on_resume: Set['TaskProxy'] = set() def set_stop_task(self, task_id): """Set stop after a task.""" @@ -950,16 +953,9 @@ def unqueue_task(self, itask: TaskProxy) -> None: self.data_store_mgr.delta_task_state(itask) self.task_queue_mgr.remove_task(itask) - def release_queued_tasks(self): - """Return list of queue-released tasks awaiting job prep. + def count_active_tasks(self): + """Count active tasks and identify pre-prep tasks.""" - Note: - Tasks can hang about for a while between being released and - entering the PREPARING state for various reasons. This method - returns tasks which are awaiting job prep irrespective of whether - they have been previously returned. - - """ # count active tasks by name # {task_name: number_of_active_instances, ...} active_task_counter = Counter() @@ -985,6 +981,20 @@ def release_queued_tasks(self): # an active task active_task_counter.update([itask.tdef.name]) + return active_task_counter, pre_prep_tasks + + def release_queued_tasks(self): + """Return list of queue-released tasks awaiting job prep. + + Note: + Tasks can hang about for a while between being released and + entering the PREPARING state for various reasons. This method + returns tasks which are awaiting job prep irrespective of whether + they have been previously returned. + + """ + active_task_counter, pre_prep_tasks = self.count_active_tasks() + # release queued tasks released = self.task_queue_mgr.release_tasks(active_task_counter) @@ -2127,48 +2137,89 @@ def _get_flow_nums( for n in flow } - def _force_trigger(self, itask): - """Assumes task is in the pool""" - # TODO is this flag still needed, and consistent with "cylc set"? + def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): + """Process a manually triggered task, ready for job submission. + + Assumes the task is in the pool. + + Triggering a queued task will: + - run it, regardless of queue limiting + + Triggering an non-queued task will: + - queue it, if the queue is limiting activity + - run it, if the queue is not limiting activity + + After state reset and queue handling: + - if on_resume is False, add the task to tasks_to_trigger_now + - if on_resume is True, add the task to tasks_to_trigger_on_resume + + The scheduler will release tasks from the tasks_to_trigger sets. + + """ itask.is_manual_submit = True itask.reset_try_timers() + if itask.state_reset(TASK_STATUS_WAITING): # (could also be unhandled failed) self.data_store_mgr.delta_task_state(itask) - # (No need to set prerequisites satisfied here). - if itask.state.is_runahead: - # Release from runahead, and queue it. - self.rh_release_and_queue(itask) + + if itask.state_reset(is_runahead=False): + # Can force trigger runahead-limited tasks. + self.data_store_mgr.delta_task_state(itask) self.spawn_to_rh_limit( itask.tdef, itask.tdef.next_point(itask.point), itask.flow_nums ) - else: - # De-queue it to run now. - self.task_queue_mgr.force_release_task(itask) + + if not itask.state.is_queued: + # queue it if limiting + active, _ = self.count_active_tasks() + if self.task_queue_mgr.push_task_if_limited(itask, active): + itask.state_reset(is_queued=True) + self.data_store_mgr.delta_task_state(itask) + + elif self.task_queue_mgr.remove_task(itask): + # else release it from the queue to run now + itask.state_reset(is_queued=False) + self.data_store_mgr.delta_task_state(itask) + + if not itask.state.is_queued: + # If not queued now, record the task as ready to run. + itask.waiting_on_job_prep = True + + if on_resume: + self.tasks_to_trigger_on_resume.add(itask) + # In case previously triggered without --on-resume. + # (It should have run already, but just in case). + self.tasks_to_trigger_now.discard(itask) + else: + self.tasks_to_trigger_now.add(itask) + # In case previously triggered with --on-resume. + self.tasks_to_trigger_on_resume.discard(itask) + # Task may be set running before xtrigger is satisfied, # if so check/spawn if xtrigger sequential. self.check_spawn_psx_task(itask) def force_trigger_tasks( - self, items: Iterable[str], + self, + items: Iterable[str], flow: List[str], flow_wait: bool = False, - flow_descr: Optional[str] = None + flow_descr: Optional[str] = None, + on_resume: bool = False ): - """Force a task to trigger (user command). - - Always run the task, even if a previous run was flow-waited. + """Manually trigger tasks. - If the task did not run before in the flow: - - run it, and spawn on outputs unless flow-wait is set. + If a task did not run before in the flow: + - trigger it, and spawn on outputs unless flow-wait is set. (but load the previous outputs from the DB) - Else if the task ran before in the flow: + If a task ran before in the flow: - load previous outputs If the previous run was not flow-wait - - run it, and try to spawn on outputs + - trigger it, and try to spawn on outputs Else if the previous run was flow-wait: - just spawn (if not already spawned in this flow) unless flow-wait is set. @@ -2193,12 +2244,13 @@ def force_trigger_tasks( LOG.error(f"[{itask}] ignoring trigger - already active") continue self.merge_flows(itask, flow_nums) - self._force_trigger(itask) + self._force_trigger(itask, on_resume) # Spawn and trigger inactive tasks. if not flow: # default: assign to all active flows flow_nums = self._get_active_flow_nums() + for tdef, point in inactive: if not self.can_be_spawned(tdef.name, point): continue @@ -2227,7 +2279,7 @@ def force_trigger_tasks( # run it (or run it again for incomplete flow-wait) self.add_to_pool(itask) - self._force_trigger(itask) + self._force_trigger(itask, on_resume) def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 4d98609df39..8d9134a6ac4 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -467,13 +467,9 @@ def next_point(self): def is_ready_to_run(self) -> bool: """Is this task ready to run? - Takes account of all dependence: on other tasks, xtriggers, and - old-style ext-triggers. Or, manual triggering. + Return True if not held, no active try timers, and prerequisites done. """ - if self.is_manual_submit: - # Manually triggered, ignore unsatisfied prerequisites. - return True if self.state.is_held: # A held task is not ready to run. return False diff --git a/cylc/flow/task_queues/__init__.py b/cylc/flow/task_queues/__init__.py index e19b68c25e7..f8c16725875 100644 --- a/cylc/flow/task_queues/__init__.py +++ b/cylc/flow/task_queues/__init__.py @@ -40,35 +40,38 @@ def __init__(self, * descendants: runtime family dict """ - pass + raise NotImplementedError @abstractmethod def push_task(self, itask: 'TaskProxy') -> None: """Queue the given task.""" - pass + raise NotImplementedError + + @abstractmethod + def push_task_if_limited( + self, itask: 'TaskProxy', active: Counter[str] + ) -> bool: + """Queue the task only if the queue limit is reached. + + Requires current active task counts. + Return True if queued, else False. + """ + raise NotImplementedError @abstractmethod def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]': """Release tasks, given current active task counts.""" - pass + raise NotImplementedError @abstractmethod def remove_task(self, itask: 'TaskProxy') -> bool: """Try to remove a task from the queues. Return True if done.""" - pass - - @abstractmethod - def force_release_task(self, itask: 'TaskProxy') -> None: - """Remove a task from whichever queue it belongs to. - - To be returned when release_tasks() is next called. - """ - pass + raise NotImplementedError @abstractmethod def adopt_tasks(self, orphans: List[str]) -> None: """Adopt tasks with defs removed by scheduler reload or restart.""" - pass + raise NotImplementedError def _expand_families(self, qconfig: dict, diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index 3bac42fe0f5..5cf2215918d 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -40,6 +40,19 @@ def push_task(self, itask: 'TaskProxy') -> None: if itask.tdef.name in self.members: self.deque.appendleft(itask) + def push_task_if_limited( + self, itask: 'TaskProxy', active: Counter[str] + ) -> bool: + """Queue task if in my membership and the queue limit is reached.""" + n_active = sum(active[mem] for mem in self.members) + if ( + self.limit and n_active >= self.limit + and itask.tdef.name in self.members + ): + self.deque.appendleft(itask) + return True + return False + def release(self, active: Counter[str]) -> List['TaskProxy']: """Release tasks if below the active limit.""" # The "active" argument counts active tasks by name. @@ -113,35 +126,31 @@ def __init__(self, config["limit"], config["members"] ) - self.force_released: Set['TaskProxy'] = set() - def push_task(self, itask: 'TaskProxy') -> None: """Push a task to the appropriate queue.""" for queue in self.queues.values(): queue.push_task(itask) + def push_task_if_limited( + self, itask: 'TaskProxy', active: Counter[str] + ) -> bool: + """Push a task to its queue only if the queue limit is reached.""" + return any( + queue.push_task_if_limited(itask, active) + for queue in self.queues.values() + ) + def release_tasks(self, active: Counter[str]) -> List['TaskProxy']: """Release tasks up to the queue limits.""" released: List['TaskProxy'] = [] for queue in self.queues.values(): released += queue.release(active) - if self.force_released: - released.extend(self.force_released) - self.force_released = set() return released def remove_task(self, itask: 'TaskProxy') -> bool: """Try to remove a task from the queues. Return True if done.""" return any(queue.remove(itask) for queue in self.queues.values()) - def force_release_task(self, itask: 'TaskProxy') -> None: - """Remove a task from whichever queue it belongs to. - - To be returned when release_tasks() is next called. - """ - if self.remove_task(itask): - self.force_released.add(itask) - def adopt_tasks(self, orphans: List[str]) -> None: """Adopt orphaned tasks to the default group.""" self.queues[self.Q_DEFAULT].adopt(orphans) diff --git a/tests/functional/cylc-trigger/07-kill-trigger.t b/tests/functional/cylc-trigger/07-kill-trigger.t new file mode 100644 index 00000000000..81e6de89284 --- /dev/null +++ b/tests/functional/cylc-trigger/07-kill-trigger.t @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Test killing a job, then re-triggering it whilst it is "held". +# See https://github.com/cylc/cylc-flow/issues/6398 + +. "$(dirname "$0")/test_header" + +set_test_number 2 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" + +workflow_run_ok "${TEST_NAME_BASE}-run" \ + cylc play --debug --no-detach --reference-test "${WORKFLOW_NAME}" + +purge diff --git a/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc b/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc new file mode 100644 index 00000000000..2242e2ebaed --- /dev/null +++ b/tests/functional/cylc-trigger/07-kill-trigger/flow.cylc @@ -0,0 +1,27 @@ +[scheduler] + [[events]] + inactivity timeout = PT1M + expected task failures = "1/a" + +[scheduling] + [[graph]] + R1 = a + +[runtime] + [[a]] + script = """ + if [[ $CYLC_TASK_SUBMIT_NUMBER == 1 ]]; then + # wait for the scheduler to receive the started message, + # then kill the job + cylc__job__poll_grep_workflow_log -E '1/a.*running' + cylc kill "${CYLC_WORKFLOW_ID}//1/a" + # do not succeed immediately after issuing the kill command or the + # workflow may shut down as complete before registering task failure + # (This polling grep will never complete, but you know what I mean.) + cylc__job__poll_grep_workflow_log -E '1/a.*failed' + fi + """ + [[[events]]] + # when the scheduler receives the failed message, trigger the task + # to run again, it should run instantly + failed handlers = cylc trigger "%(workflow)s//1/a" diff --git a/tests/functional/cylc-trigger/07-kill-trigger/reference.log b/tests/functional/cylc-trigger/07-kill-trigger/reference.log new file mode 100644 index 00000000000..4176c633d2e --- /dev/null +++ b/tests/functional/cylc-trigger/07-kill-trigger/reference.log @@ -0,0 +1,2 @@ +1/a -triggered off [] in flow 1 +1/a -triggered off [] in flow 1 diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/58-waiting-manual-triggered.t index 455cb289592..fd47201e9f7 100644 --- a/tests/functional/restart/58-waiting-manual-triggered.t +++ b/tests/functional/restart/58-waiting-manual-triggered.t @@ -40,6 +40,7 @@ __EOF__ # It should restart and shut down normally, not stall with 2/foo waiting on 1/foo. workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play --no-detach "${WORKFLOW_NAME}" + # Check that 2/foo job 02 did run before shutdown. grep_workflow_log_ok "${TEST_NAME_BASE}-grep" "\[2\/foo\/02:running\] => succeeded" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6300cefc74e..cfcd137b453 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -55,6 +55,7 @@ from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id from cylc.flow.workflow_status import StopMode +from cylc.flow.task_state import TASK_STATUS_SUBMITTED from .utils import _rm_if_empty from .utils.flow_tools import ( @@ -425,6 +426,8 @@ def _disable_submission(schd: 'Scheduler') -> 'Set[TaskProxy]': def _submit_task_jobs(_, itasks, *args, **kwargs): nonlocal submitted_tasks + for itask in itasks: + itask.state_reset(TASK_STATUS_SUBMITTED) submitted_tasks.update(itasks) return itasks diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py index fb58a82d427..5c9c31a6d56 100644 --- a/tests/integration/run_modes/test_skip.py +++ b/tests/integration/run_modes/test_skip.py @@ -185,14 +185,14 @@ async def test_doesnt_release_held_tasks( itask.state.is_held = True # Relinquish contol to the main loop. - schd.release_queued_tasks() + schd.release_tasks_to_run() assert not log_filter(contains='=> running'), msg.format('run') assert not log_filter(contains='=> succeeded'), msg.format('succeed') # Release held task and assert that it now skips successfully: schd.pool.release_held_tasks(['1/one']) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert log_filter(contains='=> running'), msg.format('run') assert log_filter(contains='=> succeeded'), msg.format('succeed') diff --git a/tests/integration/scripts/test_dump.py b/tests/integration/scripts/test_dump.py index 3189f94e6d0..e4ea8c06de3 100644 --- a/tests/integration/scripts/test_dump.py +++ b/tests/integration/scripts/test_dump.py @@ -46,7 +46,7 @@ async def test_dump_tasks(flow, scheduler, start): }) schd = scheduler(id_) async with start(schd): - # schd.release_queued_tasks() + # schd.release_tasks_to_run() await schd.update_data_structure() ret = [] await dump( diff --git a/tests/integration/test_force_trigger.py b/tests/integration/test_force_trigger.py new file mode 100644 index 00000000000..7c38c0cec35 --- /dev/null +++ b/tests/integration/test_force_trigger.py @@ -0,0 +1,150 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from typing import ( + Any as Fixture, + Callable +) + +import logging + +async def test_trigger_workflow_paused( + flow: 'Fixture', + scheduler: 'Fixture', + start: 'Fixture', + capture_submission: 'Fixture', + log_filter: Callable +): + """ + Test manual triggering when the workflow is paused. + + The usual queue limiting behaviour is expected. + + https://github.com/cylc/cylc-flow/issues/6192 + + """ + id_ = flow({ + 'scheduling': { + 'queues': { + 'default': { + 'limit': 1, + }, + }, + 'graph': { + 'R1': ''' + a => x & y & z + ''', + }, + }, + }) + schd = scheduler(id_, paused_start=True) + + # start the scheduler (but don't set the main loop running) + async with start(schd) as log: + + # capture task submissions (prevents real submissions) + submitted_tasks = capture_submission(schd) + + # paused at start-up so no tasks should be submitted + assert len(submitted_tasks) == 0 + + # manually trigger 1/x - it should be submitted + schd.pool.force_trigger_tasks(['1/x'], [1]) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 1 + + # manually trigger 1/y - it should be queued but not submitted + # (queue limit reached) + schd.pool.force_trigger_tasks(['1/y'], [1]) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 1 + + # manually trigger 1/y again - it should be submitted + # (triggering a queued task runs it) + schd.pool.force_trigger_tasks(['1/y'], [1]) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 2 + + # manually trigger 1/y yet again - the trigger should be ignored + # (task already active) + schd.pool.force_trigger_tasks(['1/y'], [1]) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 2 + assert log_filter( + level=logging.ERROR, + contains="ignoring trigger - already active" + ) + + +async def test_trigger_on_resume( + flow: 'Fixture', + scheduler: 'Fixture', + start: 'Fixture', + capture_submission: 'Fixture', + log_filter: Callable +): + """ + Test manual triggering on-resume option when the workflow is paused. + + https://github.com/cylc/cylc-flow/issues/6192 + + """ + id_ = flow({ + 'scheduling': { + 'queues': { + 'default': { + 'limit': 1, + }, + }, + 'graph': { + 'R1': ''' + a => x & y & z + ''', + }, + }, + }) + schd = scheduler(id_, paused_start=True) + + # start the scheduler (but don't set the main loop running) + async with start(schd) as log: + + # capture task submissions (prevents real submissions) + submitted_tasks = capture_submission(schd) + + # paused at start-up so no tasks should be submitted + assert len(submitted_tasks) == 0 + + # manually trigger 1/x - it not should be submitted + schd.pool.force_trigger_tasks(['1/x'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # manually trigger 1/y - it should not be submitted + # (queue limit reached) + schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # manually trigger 1/y again - it should not be submitted + # (triggering a queued task runs it) + schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # resume the workflow, both tasks should trigger now. + schd.resume_workflow() + schd.release_tasks_to_run() + assert len(submitted_tasks) == 2 diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py index fd0e6281be8..da4ff989b3e 100644 --- a/tests/integration/test_optional_outputs.py +++ b/tests/integration/test_optional_outputs.py @@ -192,7 +192,7 @@ async def test_expire_orthogonality(flow, scheduler, start): # wait for the task to submit while not a_1.state(TASK_STATUS_WAITING, TASK_STATUS_PREPARING): - schd.release_queued_tasks() + schd.release_tasks_to_run() # NOTE: The submit number isn't presently incremented via this code # pathway so we have to hack it here. If the task messages in this test diff --git a/tests/integration/test_queues.py b/tests/integration/test_queues.py index 7da83e1a1aa..b3bd1eb564d 100644 --- a/tests/integration/test_queues.py +++ b/tests/integration/test_queues.py @@ -87,13 +87,13 @@ async def test_queue_release( # (if scheduler is paused we should not have any submissions) # (otherwise a number of tasks up to the limit should be released) schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == expected_submissions for _ in range(3): # release runahead/queued tasks # (no further tasks should be released) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == expected_submissions @@ -125,7 +125,7 @@ async def test_queue_held_tasks( # release queued tasks # (no tasks should be released from the queues because they are held) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == 0 # un-hold tasks @@ -133,5 +133,5 @@ async def test_queue_held_tasks( # release queued tasks # (tasks should now be released from the queues) - schd.release_queued_tasks() + schd.release_tasks_to_run() assert len(submitted_tasks) == 1 diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 6f1f581e899..63f85a84253 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -24,6 +24,7 @@ from cylc.flow import commands from cylc.flow.exceptions import CylcError +from cylc.flow.flow_mgr import FLOW_ALL from cylc.flow.parsec.exceptions import ParsecError from cylc.flow.scheduler import Scheduler, SchedulerStop from cylc.flow.task_state import ( @@ -170,7 +171,7 @@ async def test_holding_tasks_whilst_scheduler_paused( # release runahead/queued tasks # (nothing should happen because the scheduler is paused) one.pool.release_runahead_tasks() - one.release_queued_tasks() + one.release_tasks_to_run() assert submitted_tasks == set() # hold all tasks & resume the workflow @@ -179,7 +180,7 @@ async def test_holding_tasks_whilst_scheduler_paused( # release queued tasks # (there should be no change because the task is still held) - one.release_queued_tasks() + one.release_tasks_to_run() assert submitted_tasks == set() # release all tasks @@ -187,7 +188,7 @@ async def test_holding_tasks_whilst_scheduler_paused( # release queued tasks # (the task should be submitted) - one.release_queued_tasks() + one.release_tasks_to_run() assert len(submitted_tasks) == 1 @@ -331,9 +332,10 @@ async def test_restart_timeout( flow, one_conf, scheduler, - start, run, - log_filter + log_filter, + complete, + capture_submission, ): """It should wait for user input if there are no tasks in the pool. @@ -349,31 +351,28 @@ async def test_restart_timeout( id_ = flow(one_conf) # run the workflow to completion - # (by setting the only task to completed) - schd = scheduler(id_, paused_start=False) - async with start(schd) as log: - for itask in schd.pool.get_tasks(): - # (needed for job config in sim mode:) - schd.task_job_mgr.submit_task_jobs( - schd.workflow, [itask], None, None) - schd.pool.set_prereqs_and_outputs( - [itask.identity], None, None, ['all']) + schd: Scheduler = scheduler(id_, paused_start=False) + async with run(schd): + await complete(schd) # restart the completed workflow - schd = scheduler(id_) + schd = scheduler(id_, paused_start=False) async with run(schd): # it should detect that the workflow has completed and alert the user assert log_filter( + logging.WARNING, contains='This workflow already ran to completion.' ) # it should activate a timeout - assert log_filter(contains='restart timer starts NOW') + assert log_filter(logging.WARNING, contains='restart timer starts NOW') + capture_submission(schd) # when we trigger tasks the timeout should be cleared - schd.pool.force_trigger_tasks(['1/one'], {1}) + schd.pool.force_trigger_tasks(['1/one'], [FLOW_ALL]) + await asyncio.sleep(0) # yield control to the main loop - assert log_filter(contains='restart timer stopped') + assert log_filter(logging.INFO, contains='restart timer stopped') @pytest.mark.parametrize("signal", ((SIGHUP), (SIGINT), (SIGTERM))) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 3fb0becadd6..94eb780a177 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -701,7 +701,7 @@ async def test_restart_prereqs( async with start(schd): # Release tasks 1/a and 1/b schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert list_tasks(schd) == expected_1 # Mark 1/a as succeeded and spawn 1/z @@ -824,7 +824,7 @@ async def test_reload_prereqs( async with start(schd): # Release tasks 1/a and 1/b schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert list_tasks(schd) == expected_1 # Mark 1/a as succeeded and spawn 1/z @@ -861,7 +861,7 @@ async def _test_restart_prereqs_sat(): # Release tasks 1/a and 1/b schd.pool.release_runahead_tasks() - schd.release_queued_tasks() + schd.release_tasks_to_run() assert list_tasks(schd) == [ ('1', 'a', 'running'), ('1', 'b', 'running') @@ -2171,48 +2171,6 @@ async def list_data_store(): 'c': 'wall_clock(trigger_time=946688400)', } - -async def test_trigger_unqueued(flow, scheduler, start): - """Test triggering an unqueued active task. - - It should not add to the force_released list. - See https://github.com/cylc/cylc-flow/pull/6337 - - """ - conf = { - 'scheduler': {'allow implicit tasks': 'True'}, - 'scheduling': { - 'graph': { - 'R1': 'a & b => c' - } - } - } - schd = scheduler( - flow(conf), - run_mode='simulation', - paused_start=False - ) - - async with start(schd): - # Release tasks 1/a and 1/b - schd.pool.release_runahead_tasks() - schd.release_queued_tasks() - assert pool_get_task_ids(schd.pool) == ['1/a', '1/b'] - - # Mark 1/a as succeeded and spawn 1/c - task_a = schd.pool.get_task(IntegerPoint("1"), "a") - schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded') - assert pool_get_task_ids(schd.pool) == ['1/b', '1/c'] - - # Trigger the partially satisified (and not queued) task 1/c - schd.pool.force_trigger_tasks(['1/c'], [FLOW_ALL]) - - # It should not add to the queue managers force_released list. - assert not schd.pool.task_queue_mgr.force_released, ( - "Triggering an unqueued task should not affect the force_released list" - ) - - @pytest.mark.parametrize('expire_type', ['clock-expire', 'manual']) async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type): """An expired waiting task should be removed from any queues. diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index ccd5f5dfed5..9163b9e3567 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -91,8 +91,8 @@ def test_should_auto_restart_now( assert Scheduler.should_auto_restart_now(mock_schd) == expected -def test_release_queued_tasks__auto_restart(): - """Test that Scheduler.release_queued_tasks() works as expected +def test_release_tasks_to_run__auto_restart(): + """Test that Scheduler.release_tasks_to_run() works as expected during auto restart.""" mock_schd = Mock( auto_restart_time=(time() - 100), @@ -107,10 +107,12 @@ def test_release_queued_tasks__auto_restart(): options=RunOptions(), task_job_mgr=MagicMock() ) - Scheduler.release_queued_tasks(mock_schd) + Scheduler.release_tasks_to_run(mock_schd) # Should not actually release any more tasks, just submit the # preparing ones mock_schd.pool.release_queued_tasks.assert_not_called() + + Scheduler.start_job_submission(mock_schd, mock_schd.pool.get_tasks()) mock_schd.task_job_mgr.submit_task_jobs.assert_called()