Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger now if workflow paused #6499

Merged
merged 19 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6499.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Manually triggered tasks now run immediately even if the workflow is paused.
12 changes: 10 additions & 2 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@

# flush out preparing tasks before attempting reload
schd.reload_pending = 'waiting for pending tasks to submit'
while schd.release_queued_tasks():
while schd.release_tasks_to_run():
# Run the subset of main-loop functionality required to push
# preparing through the submission pipeline and keep the workflow
# responsive (e.g. to the `cylc stop` command).
Expand Down Expand Up @@ -446,9 +446,17 @@
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None,
on_resume: bool = False
):
"""Manual task trigger."""
validate.is_tasks(tasks)
validate.flow_opts(flow, flow_wait)
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
if on_resume:
LOG.warning(

Check warning on line 455 in cylc/flow/commands.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/commands.py#L455

Added line #L455 was not covered by tests
"The --on-resume option is deprecated and will be removed "
"at Cylc 8.5."
)
yield
yield schd.pool.force_trigger_tasks(tasks, flow, flow_wait, flow_descr)
yield schd.pool.force_trigger_tasks(
tasks, flow, flow_wait, flow_descr, on_resume
)
26 changes: 19 additions & 7 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2213,20 +2213,32 @@ class Arguments(TaskMutation.Arguments, FlowMutationArguments):
class Trigger(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Manually trigger tasks.
Manually trigger tasks, even in a paused workflow.

Warning: waiting tasks that are queue-limited will be queued if
triggered, to submit as normal when released by the queue; queued
tasks will submit immediately if triggered, even if that violates
the queue limit (so you may need to trigger a queue-limited task
twice to get it to submit immediately).
Triggering a task that is not yet queued will queue it.

Triggering a queued task runs it immediately.

Cylc queues restrict the number of jobs that can be active
(submitted or running) at once. They release tasks to run
when their active task count drops below the queue limit.
So, depending on the active task count, you may need to
trigger a task twice to make it run immediately.

Attempts to trigger active tasks will be ignored.

Valid for: paused, running workflows.
''')
resolver = partial(mutator, command='force_trigger_tasks')

class Arguments(TaskMutation.Arguments, FlowMutationArguments):
...
on_resume = Boolean(
default_value=False,
description=sstrip('''
If the workflow is paused, wait until it is resumed before
running the triggered task(s).
''')
)


def _mut_field(cls):
Expand Down
66 changes: 53 additions & 13 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,22 @@
self.restart_remote_init()
await commands.run_cmd(commands.poll_tasks(self, ['*/*']))

# If we shut down with manually triggered waiting tasks,
# submit them to run now.
# NOTE: this will run tasks that were triggered with
# the trigger "--on-resume" option, even if the workflow
# is restarted as paused. Option to be removed at 8.5.0.
pre_prep_tasks = []
for itask in self.pool.get_tasks():
if (
itask.is_manual_submit
and itask.state(TASK_STATUS_WAITING)
):
itask.waiting_on_job_prep = True
pre_prep_tasks.append(itask)

self.start_job_submission(pre_prep_tasks)

self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting')
await asyncio.gather(
*main_loop.get_runners(
Expand Down Expand Up @@ -778,10 +794,10 @@
self.uuid_str = dict(params)['uuid_str']
else:
self.uuid_str = str(uuid4())
self.task_events_mgr.uuid_str = self.uuid_str

self._configure_contact()
await self.configure(params)
self.task_events_mgr.uuid_str = self.uuid_str
except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)

Expand Down Expand Up @@ -865,6 +881,7 @@
self.xtrigger_mgr.load_xtrigger_for_restart)
self.workflow_db_mgr.pri_dao.select_abs_outputs_for_restart(
self.pool.load_abs_outputs_for_restart)

self.pool.load_db_tasks_to_hold()
self.pool.update_flow_mgr()

Expand Down Expand Up @@ -1434,8 +1451,8 @@
return
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self) -> bool:
"""Release queued tasks, and submit jobs.
def release_tasks_to_run(self) -> bool:
"""Release queued or manually submitted tasks, and submit jobs.

The task queue manages references to task proxies in the task pool.

Expand All @@ -1459,13 +1476,24 @@
submission).

"""
pre_prep_tasks: Set['TaskProxy'] = set()
if (
not self.is_paused
and self.stop_mode is None
self.stop_mode is None
and self.auto_restart_time is None
and self.reload_pending is False
):
pre_prep_tasks = self.pool.release_queued_tasks()
if self.pool.tasks_to_trigger_now:
# manually triggered tasks to run now, workflow paused or not
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
self.pool.tasks_to_trigger_now = set()

if not self.is_paused:
# release queued tasks
pre_prep_tasks.update(self.pool.release_queued_tasks())
if self.pool.tasks_to_trigger_on_resume:
# and manually triggered tasks to run once workflow resumed
pre_prep_tasks.update(self.pool.tasks_to_trigger_on_resume)
self.pool.tasks_to_trigger_on_resume = set()

Check warning on line 1496 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1495-L1496

Added lines #L1495 - L1496 were not covered by tests
Comment on lines +1487 to +1490
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Codecov is right: these lines are untested.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I was waiting for agreement on adding the option before writing tests for it, and then did not get back to the PR until now.


elif (
(
Expand All @@ -1477,19 +1505,30 @@
self.reload_pending
)
):
# don't release queued tasks, finish processing preparing tasks
pre_prep_tasks = [
# finish processing preparing tasks first
pre_prep_tasks = {
itask for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
]
}

# Return, if no tasks to submit.
else:
return False

if not pre_prep_tasks:
return False

# Start the job submission process.
return self.start_job_submission(pre_prep_tasks)

def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
"""Start the job submission process for some tasks.

Return True if any were started, else False.

"""
if self.stop_mode is not None:
return False

self.is_updated = True
self.reset_inactivity_timer()

Expand All @@ -1499,9 +1538,10 @@
log = LOG.debug
if self.options.reftest or self.options.genref:
log = LOG.info

for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
pre_prep_tasks,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
run_mode=self.get_run_mode()
Expand Down Expand Up @@ -1743,7 +1783,7 @@
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if itask.is_ready_to_run():
if itask.is_ready_to_run() and not itask.is_manual_submit:
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
self.pool.queue_task(itask)

if self.xtrigger_mgr.sequential_spawn_next:
Expand All @@ -1752,7 +1792,7 @@
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
self.pool.clock_expire_tasks()
self.release_queued_tasks()
self.release_tasks_to_run()

if (
self.get_run_mode() == RunMode.SIMULATION
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/scripts/pause.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

"""cylc pause [OPTIONS] ARGS

Pause a workflow.
Suspend automatic job submission.

This suspends submission of all tasks in a workflow.
Manual triggering can still run tasks immediately, even if the workflow is
paused.

Examples:
# pause my_workflow
Expand Down
32 changes: 27 additions & 5 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

"""cylc trigger [OPTIONS] ARGS

Force tasks to run regardless of prerequisites.
Force task(s) to run regardless of prerequisites, even in a paused workflow.

* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).
Triggering a task that is not yet queued will queue it.

Triggering a queued task runs it immediately.

Cylc queues restrict the number of jobs that can be active (submitted or
running) at once. They release tasks to run when their active task count
drops below the queue limit.

Attempts to trigger active tasks will be ignored.

Examples:
# trigger task foo in cycle 1234 in test
Expand Down Expand Up @@ -74,13 +80,15 @@
$flow: [Flow!],
$flowWait: Boolean,
$flowDescr: String,
$onResume: Boolean,
) {
trigger (
workflows: $wFlows,
tasks: $tasks,
flow: $flow,
flowWait: $flowWait,
flowDescr: $flowDescr
flowDescr: $flowDescr,
onResume: $onResume,
) {
result
}
Expand All @@ -96,7 +104,20 @@ def get_option_parser() -> COP:
multiworkflow=True,
argdoc=[FULL_ID_MULTI_ARG_DOC],
)

add_flow_opts(parser)

parser.add_option(
"--on-resume",
help=(
"If the workflow is paused, wait until it is resumed before "
"running the triggered task(s). DEPRECATED - this will be "
"removed at Cylc 8.5."
),
action="store_true",
default=False,
dest="on_resume"
)
return parser


Expand All @@ -114,6 +135,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
'flow': options.flow,
'flowWait': options.flow_wait,
'flowDescr': options.flow_descr,
'onResume': options.on_resume,
}
}
return await pclient.async_request('graphql', mutation_kwargs)
Expand Down
Loading
Loading