Skip to content

Commit

Permalink
Merge pull request #6499 from hjoliver/trigger-when-paused_2
Browse files Browse the repository at this point in the history
Trigger now if workflow paused
  • Loading branch information
hjoliver authored Dec 15, 2024
2 parents d21c9e5 + 35a8e0e commit 6a4ec1b
Show file tree
Hide file tree
Showing 23 changed files with 482 additions and 164 deletions.
1 change: 1 addition & 0 deletions changes.d/6499.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Manually triggered tasks now run immediately even if the workflow is paused.
12 changes: 10 additions & 2 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async def reload_workflow(schd: 'Scheduler'):

# flush out preparing tasks before attempting reload
schd.reload_pending = 'waiting for pending tasks to submit'
while schd.release_queued_tasks():
while schd.release_tasks_to_run():
# Run the subset of main-loop functionality required to push
# preparing through the submission pipeline and keep the workflow
# responsive (e.g. to the `cylc stop` command).
Expand Down Expand Up @@ -446,9 +446,17 @@ async def force_trigger_tasks(
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None,
on_resume: bool = False
):
"""Manual task trigger."""
validate.is_tasks(tasks)
validate.flow_opts(flow, flow_wait)
if on_resume:
LOG.warning(
"The --on-resume option is deprecated and will be removed "
"at Cylc 8.5."
)
yield
yield schd.pool.force_trigger_tasks(tasks, flow, flow_wait, flow_descr)
yield schd.pool.force_trigger_tasks(
tasks, flow, flow_wait, flow_descr, on_resume
)
26 changes: 19 additions & 7 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2213,20 +2213,32 @@ class Arguments(TaskMutation.Arguments, FlowMutationArguments):
class Trigger(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Manually trigger tasks.
Manually trigger tasks, even in a paused workflow.
Warning: waiting tasks that are queue-limited will be queued if
triggered, to submit as normal when released by the queue; queued
tasks will submit immediately if triggered, even if that violates
the queue limit (so you may need to trigger a queue-limited task
twice to get it to submit immediately).
Triggering a task that is not yet queued will queue it.
Triggering a queued task runs it immediately.
Cylc queues restrict the number of jobs that can be active
(submitted or running) at once. They release tasks to run
when their active task count drops below the queue limit.
So, depending on the active task count, you may need to
trigger a task twice to make it run immediately.
Attempts to trigger active tasks will be ignored.
Valid for: paused, running workflows.
''')
resolver = partial(mutator, command='force_trigger_tasks')

class Arguments(TaskMutation.Arguments, FlowMutationArguments):
...
on_resume = Boolean(
default_value=False,
description=sstrip('''
If the workflow is paused, wait until it is resumed before
running the triggered task(s).
''')
)


def _mut_field(cls):
Expand Down
66 changes: 53 additions & 13 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,22 @@ async def run_scheduler(self) -> None:
self.restart_remote_init()
await commands.run_cmd(commands.poll_tasks(self, ['*/*']))

# If we shut down with manually triggered waiting tasks,
# submit them to run now.
# NOTE: this will also run tasks that were triggered with
# the "cylc trigger --on-resume" option, even if the workflow
# is restarted in the paused state. Option to be removed at Cylc 8.5.
pre_prep_tasks = []
for itask in self.pool.get_tasks():
if (
itask.is_manual_submit
and itask.state(TASK_STATUS_WAITING)
):
itask.waiting_on_job_prep = True
pre_prep_tasks.append(itask)

self.start_job_submission(pre_prep_tasks)

self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting')
await asyncio.gather(
*main_loop.get_runners(
Expand Down Expand Up @@ -775,10 +791,10 @@ async def start(self):
self.uuid_str = dict(params)['uuid_str']
else:
self.uuid_str = str(uuid4())
self.task_events_mgr.uuid_str = self.uuid_str

self._configure_contact()
await self.configure(params)
self.task_events_mgr.uuid_str = self.uuid_str
except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)

Expand Down Expand Up @@ -862,6 +878,7 @@ def _load_pool_from_db(self):
self.xtrigger_mgr.load_xtrigger_for_restart)
self.workflow_db_mgr.pri_dao.select_abs_outputs_for_restart(
self.pool.load_abs_outputs_for_restart)

self.pool.load_db_tasks_to_hold()
self.pool.update_flow_mgr()

Expand Down Expand Up @@ -1428,8 +1445,8 @@ def run_event_handlers(self, event, reason=""):
return
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self) -> bool:
"""Release queued tasks, and submit jobs.
def release_tasks_to_run(self) -> bool:
"""Release queued or manually triggered tasks, and submit their jobs.
The task queue manages references to task proxies in the task pool.
Expand All @@ -1453,13 +1470,24 @@ def release_queued_tasks(self) -> bool:
submission).
"""
pre_prep_tasks: Set['TaskProxy'] = set()
if (
not self.is_paused
and self.stop_mode is None
self.stop_mode is None
and self.auto_restart_time is None
and self.reload_pending is False
):
pre_prep_tasks = self.pool.release_queued_tasks()
if self.pool.tasks_to_trigger_now:
# manually triggered tasks to run now, workflow paused or not
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
self.pool.tasks_to_trigger_now = set()

if not self.is_paused:
# release queued tasks
pre_prep_tasks.update(self.pool.release_queued_tasks())
if self.pool.tasks_to_trigger_on_resume:
# and manually triggered tasks to run once workflow resumed
pre_prep_tasks.update(self.pool.tasks_to_trigger_on_resume)
self.pool.tasks_to_trigger_on_resume = set()

elif (
(
Expand All @@ -1471,19 +1499,30 @@ def release_queued_tasks(self) -> bool:
self.reload_pending
)
):
# don't release queued tasks, finish processing preparing tasks
pre_prep_tasks = [
# finish processing preparing tasks first
pre_prep_tasks = {
itask for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
]
}

# Return, if no tasks to submit.
else:
return False

if not pre_prep_tasks:
return False

# Start the job submission process.
return self.start_job_submission(pre_prep_tasks)

def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
"""Start the job submission process for some tasks.
Return True if any were started, else False.
"""
if self.stop_mode is not None:
return False

self.is_updated = True
self.reset_inactivity_timer()

Expand All @@ -1493,9 +1532,10 @@ def release_queued_tasks(self) -> bool:
log = LOG.debug
if self.options.reftest or self.options.genref:
log = LOG.info

for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
pre_prep_tasks,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
run_mode=self.get_run_mode()
Expand Down Expand Up @@ -1737,7 +1777,7 @@ async def _main_loop(self) -> None:
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if itask.is_ready_to_run():
if itask.is_ready_to_run() and not itask.is_manual_submit:
self.pool.queue_task(itask)

if self.xtrigger_mgr.sequential_spawn_next:
Expand All @@ -1746,7 +1786,7 @@ async def _main_loop(self) -> None:
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
self.pool.clock_expire_tasks()
self.release_queued_tasks()
self.release_tasks_to_run()

if (
self.get_run_mode() == RunMode.SIMULATION
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/scripts/pause.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

"""cylc pause [OPTIONS] ARGS
Pause a workflow.
Suspend automatic job submission.
This suspends submission of all tasks in a workflow.
Manual triggering can still run tasks immediately, even if the workflow is
paused.
Examples:
# pause my_workflow
Expand Down
32 changes: 27 additions & 5 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

"""cylc trigger [OPTIONS] ARGS
Force tasks to run regardless of prerequisites.
Force task(s) to run regardless of prerequisites, even in a paused workflow.
* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).
Triggering a task that is not yet queued will queue it.
Triggering a queued task runs it immediately.
Cylc queues restrict the number of jobs that can be active (submitted or
running) at once. They release tasks to run when their active task count
drops below the queue limit.
Attempts to trigger active tasks will be ignored.
Examples:
# trigger task foo in cycle 1234 in test
Expand Down Expand Up @@ -74,13 +80,15 @@
$flow: [Flow!],
$flowWait: Boolean,
$flowDescr: String,
$onResume: Boolean,
) {
trigger (
workflows: $wFlows,
tasks: $tasks,
flow: $flow,
flowWait: $flowWait,
flowDescr: $flowDescr
flowDescr: $flowDescr,
onResume: $onResume,
) {
result
}
Expand All @@ -96,7 +104,20 @@ def get_option_parser() -> COP:
multiworkflow=True,
argdoc=[FULL_ID_MULTI_ARG_DOC],
)

add_flow_opts(parser)

parser.add_option(
"--on-resume",
help=(
"If the workflow is paused, wait until it is resumed before "
"running the triggered task(s). DEPRECATED - this will be "
"removed at Cylc 8.5."
),
action="store_true",
default=False,
dest="on_resume"
)
return parser


Expand All @@ -114,6 +135,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
'flow': options.flow,
'flowWait': options.flow_wait,
'flowDescr': options.flow_descr,
'onResume': options.on_resume,
}
}
return await pclient.async_request('graphql', mutation_kwargs)
Expand Down
Loading

0 comments on commit 6a4ec1b

Please sign in to comment.