diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7449e7d274a..2cf8e40d671 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -467,6 +467,7 @@ async def configure(self, params): self.config, self.workflow_db_mgr, self.task_events_mgr, + self.xtrigger_mgr, self.data_store_mgr, self.flow_mgr ) @@ -1734,6 +1735,8 @@ async def main_loop(self) -> None: self.pool.queue_task(itask) if housekeep_xtriggers: + # Spawn parentless clock-triggered + self.pool.spawn_parentless_clock_triggered() # (Could do this periodically?) self.xtrigger_mgr.housekeep(self.pool.get_tasks()) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 92905becf1e..082714bb7aa 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -79,6 +79,7 @@ from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.taskdef import TaskDef from cylc.flow.task_events_mgr import TaskEventsManager + from cylc.flow.xtrigger_mgr import XtriggerManager from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager from cylc.flow.flow_mgr import FlowMgr, FlowNums @@ -98,6 +99,7 @@ def __init__( config: 'WorkflowConfig', workflow_db_mgr: 'WorkflowDatabaseManager', task_events_mgr: 'TaskEventsManager', + xtrigger_mgr: 'XtriggerManager', data_store_mgr: 'DataStoreMgr', flow_mgr: 'FlowMgr' ) -> None: @@ -108,6 +110,7 @@ def __init__( self.task_events_mgr: 'TaskEventsManager' = task_events_mgr # TODO this is ugly: self.task_events_mgr.spawn_func = self.spawn_on_output + self.xtrigger_mgr: 'XtriggerManager' = xtrigger_mgr self.data_store_mgr: 'DataStoreMgr' = data_store_mgr self.flow_mgr: 'FlowMgr' = flow_mgr @@ -744,6 +747,7 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: return if self.runahead_limit_point is None: self.compute_runahead() + is_clock = False while point is not None and (point <= self.runahead_limit_point): if tdef.is_parentless(point): ntask = self._get_spawned_or_merged_task( @@ -752,10 +756,18 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: if ntask is not None: self.add_to_pool(ntask) self.rh_release_and_queue(ntask) + if ( + ntask.state.xtriggers + and set(ntask.state.xtriggers.keys()).intersection( + self.xtrigger_mgr.wall_clock_labels + ) + ): + is_clock = True + break point = tdef.next_point(point) # Once more (for the rh-limited task: don't rh release it!) - if point is not None and tdef.is_parentless(point): + if point is not None and tdef.is_parentless(point) and not is_clock: ntask = self._get_spawned_or_merged_task( point, tdef.name, flow_nums ) @@ -1753,6 +1765,18 @@ def force_trigger_tasks( return len(unmatched) + def spawn_parentless_clock_triggered(self): + """Spawn successor(s) of parentless wall clock satisfied tasks.""" + while self.xtrigger_mgr.wall_clock_spawns: + itask = self.xtrigger_mgr.wall_clock_spawns.pop() + # Will spawn out to RH limit or next parentless clock trigger + # or non-parentless. + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: """Simulation mode: simulate task run times and set states.""" if not self.config.run_mode('simulation'): diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4512d97b7c8..1d1482d7317 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -211,6 +211,11 @@ def __init__( # Signatures of active functions (waiting on callback). self.active: list = [] + # Gather named wall-clocks closet to current time. + # (no need to check or spawn future clocks of tasks respectively). + self.wall_clock_labels: set = set() + self.wall_clock_spawns: list = [] + self.workflow_run_dir = workflow_run_dir # For function arg templating. @@ -316,6 +321,8 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None: """ self.validate_xtrigger(label, fctx, fdir) self.functx_map[label] = fctx + if fctx.func_name == "wall_clock": + self.wall_clock_labels.add(label) def mutate_trig(self, label, kwargs): self.functx_map[label].func_kwargs.update(kwargs) @@ -380,7 +387,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: args = [] kwargs = {} - if ctx.func_name == "wall_clock": + if label in self.wall_clock_labels: if "trigger_time" in ctx.func_kwargs: # Internal (retry timer): trigger_time already set. kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"] @@ -420,10 +427,12 @@ def call_xtriggers_async(self, itask: TaskProxy): itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): - if sig.startswith("wall_clock"): + if label in self.wall_clock_labels: # Special case: quick synchronous clock check. if wall_clock(*ctx.func_args, **ctx.func_kwargs): itask.state.xtriggers[label] = True + if itask.tdef.is_parentless(itask.point): + self.wall_clock_spawns.append(itask) self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) LOG.info('xtrigger satisfied: %s = %s', label, sig)