From 29d26cf492c4db4c13242b845e6b1c3543a9aa08 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Tue, 9 Jul 2024 23:39:49 +1200 Subject: [PATCH] flow-nums data-store fix (#6115) Replace flow nums for ghost tasks where necessary --- cylc/flow/data_store_mgr.py | 36 +++++++++++++++---- .../cylc-show/06-past-present-future.t | 2 +- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 0befc1b4dad..46279cfb2bf 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -792,8 +792,9 @@ def increment_graph_window( source_tokens, point, flow_nums, - False, - itask + is_parent=False, + itask=itask, + replace_existing=True, ) # Pre-populate from previous walks @@ -1153,6 +1154,7 @@ def generate_ghost_task( is_parent: bool = False, itask: Optional['TaskProxy'] = None, n_depth: int = 0, + replace_existing: bool = False, ) -> None: """Create task-point element populated with static data. @@ -1160,17 +1162,19 @@ def generate_ghost_task( source_tokens point flow_nums - is_parent: - Used to determine whether to load DB state. - itask: - Update task-node from corresponding task proxy object. + is_parent: Used to determine whether to load DB state. + itask: Update task-node from corresponding task proxy object. n_depth: n-window graph edge distance. + replace_existing: Replace any existing data for task as it may + be out of date (e.g. flow nums). """ tp_id = tokens.id if ( tp_id in self.data[self.workflow_id][TASK_PROXIES] or tp_id in self.added[TASK_PROXIES] ): + if replace_existing and itask is not None: + self.delta_from_task_proxy(itask) return name = tokens['task'] @@ -2525,6 +2529,26 @@ def delta_task_xtrigger(self, sig, satisfied): xtrigger.time = update_time self.updates_pending = True + def delta_from_task_proxy(self, itask: TaskProxy) -> None: + """Create delta from existing pool task proxy. + + Args: + itask (cylc.flow.task_proxy.TaskProxy): + Update task-node from corresponding task proxy + objects from the workflow task pool. + + """ + tproxy: Optional[PbTaskProxy] + tp_id, tproxy = self.store_node_fetcher(itask.tokens) + if not tproxy: + return + update_time = time() + tp_delta = self.updated[TASK_PROXIES].setdefault( + tp_id, PbTaskProxy(id=tp_id)) + tp_delta.stamp = f'{tp_id}@{update_time}' + self._process_internal_task_proxy(itask, tp_delta) + self.updates_pending = True + # ----------- # Job Deltas # ----------- diff --git a/tests/functional/cylc-show/06-past-present-future.t b/tests/functional/cylc-show/06-past-present-future.t index 7ff9762212d..a67636bc613 100644 --- a/tests/functional/cylc-show/06-past-present-future.t +++ b/tests/functional/cylc-show/06-past-present-future.t @@ -46,7 +46,7 @@ TEST_NAME="${TEST_NAME_BASE}-show.present" contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__ state: running prerequisites: ('⨯': not satisfied) - ✓ 1/b succeeded + ⨯ 1/b succeeded __END__ TEST_NAME="${TEST_NAME_BASE}-show.future"