Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parentless sequential clock trigger spawns #5732

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ async def configure(self, params):
self.config,
self.workflow_db_mgr,
self.task_events_mgr,
self.xtrigger_mgr,
self.data_store_mgr,
self.flow_mgr
)
Expand Down Expand Up @@ -1734,6 +1735,8 @@ async def main_loop(self) -> None:
self.pool.queue_task(itask)

if housekeep_xtriggers:
# Spawn parentless clock-triggered
self.pool.spawn_parentless_clock_triggered()
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
Comment on lines 1737 to 1741
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right place to handle the spawning? The reason for the if block here is to "delete satisfied xtriggers no longer needed by any tasks" (from the xtrigger_mgr.housekeep() docstring).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that housekeep_xtriggers is satisfied if xtriggers are, so just piggy backed off that.. But could just use that spawns list.


Expand Down
26 changes: 25 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.taskdef import TaskDef
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.xtrigger_mgr import XtriggerManager
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.flow_mgr import FlowMgr, FlowNums

Expand All @@ -98,6 +99,7 @@ def __init__(
config: 'WorkflowConfig',
workflow_db_mgr: 'WorkflowDatabaseManager',
task_events_mgr: 'TaskEventsManager',
xtrigger_mgr: 'XtriggerManager',
data_store_mgr: 'DataStoreMgr',
flow_mgr: 'FlowMgr'
) -> None:
Expand All @@ -108,6 +110,7 @@ def __init__(
self.task_events_mgr: 'TaskEventsManager' = task_events_mgr
# TODO this is ugly:
self.task_events_mgr.spawn_func = self.spawn_on_output
self.xtrigger_mgr: 'XtriggerManager' = xtrigger_mgr
self.data_store_mgr: 'DataStoreMgr' = data_store_mgr
self.flow_mgr: 'FlowMgr' = flow_mgr

Expand Down Expand Up @@ -744,6 +747,7 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
return
if self.runahead_limit_point is None:
self.compute_runahead()
is_clock = False
while point is not None and (point <= self.runahead_limit_point):
if tdef.is_parentless(point):
ntask = self._get_spawned_or_merged_task(
Expand All @@ -752,10 +756,18 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
if ntask is not None:
self.add_to_pool(ntask)
self.rh_release_and_queue(ntask)
if (
ntask.state.xtriggers
and set(ntask.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.wall_clock_labels
)
):
is_clock = True
break
point = tdef.next_point(point)

# Once more (for the rh-limited task: don't rh release it!)
if point is not None and tdef.is_parentless(point):
if point is not None and tdef.is_parentless(point) and not is_clock:
ntask = self._get_spawned_or_merged_task(
point, tdef.name, flow_nums
)
Expand Down Expand Up @@ -1753,6 +1765,18 @@ def force_trigger_tasks(

return len(unmatched)

def spawn_parentless_clock_triggered(self):
"""Spawn successor(s) of parentless wall clock satisfied tasks."""
while self.xtrigger_mgr.wall_clock_spawns:
itask = self.xtrigger_mgr.wall_clock_spawns.pop()
# Will spawn out to RH limit or next parentless clock trigger
# or non-parentless.
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)

def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
"""Simulation mode: simulate task run times and set states."""
if not self.config.run_mode('simulation'):
Expand Down
13 changes: 11 additions & 2 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ def __init__(
# Signatures of active functions (waiting on callback).
self.active: list = []

# Gather named wall-clocks closet to current time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Gather named wall-clocks closet to current time.
# Gather named wall-clock triggers closest to current time.

# (no need to check or spawn future clocks of tasks respectively).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rephrase this comment? It doesn't seem to make much sense as worded!)

self.wall_clock_labels: set = set()
self.wall_clock_spawns: list = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this variable be named better? like clock_spawn_next or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will change.. maybe to broaden sequential_spawn_next


self.workflow_run_dir = workflow_run_dir

# For function arg templating.
Expand Down Expand Up @@ -316,6 +321,8 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None:
"""
self.validate_xtrigger(label, fctx, fdir)
self.functx_map[label] = fctx
if fctx.func_name == "wall_clock":
self.wall_clock_labels.add(label)

def mutate_trig(self, label, kwargs):
self.functx_map[label].func_kwargs.update(kwargs)
Expand Down Expand Up @@ -380,7 +387,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:

args = []
kwargs = {}
if ctx.func_name == "wall_clock":
if label in self.wall_clock_labels:
if "trigger_time" in ctx.func_kwargs:
# Internal (retry timer): trigger_time already set.
kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"]
Expand Down Expand Up @@ -420,10 +427,12 @@ def call_xtriggers_async(self, itask: TaskProxy):
itask: task proxy to check.
"""
for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True):
if sig.startswith("wall_clock"):
if label in self.wall_clock_labels:
# Special case: quick synchronous clock check.
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
itask.state.xtriggers[label] = True
if itask.tdef.is_parentless(itask.point):
self.wall_clock_spawns.append(itask)
self.sat_xtrig[sig] = {}
self.data_store_mgr.delta_task_xtrigger(sig, True)
LOG.info('xtrigger satisfied: %s = %s', label, sig)
Expand Down