Skip to content

Commit

Permalink
flow-nums data-store fix (cylc#6115)
Browse files Browse the repository at this point in the history
Replace flow nums for ghost tasks where necessary
  • Loading branch information
dwsutherland authored Jul 9, 2024
1 parent ae6a70d commit 29d26cf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
36 changes: 30 additions & 6 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,9 @@ def increment_graph_window(
source_tokens,
point,
flow_nums,
False,
itask
is_parent=False,
itask=itask,
replace_existing=True,
)

# Pre-populate from previous walks
Expand Down Expand Up @@ -1153,24 +1154,27 @@ def generate_ghost_task(
is_parent: bool = False,
itask: Optional['TaskProxy'] = None,
n_depth: int = 0,
replace_existing: bool = False,
) -> None:
"""Create task-point element populated with static data.
Args:
source_tokens
point
flow_nums
is_parent:
Used to determine whether to load DB state.
itask:
Update task-node from corresponding task proxy object.
is_parent: Used to determine whether to load DB state.
itask: Update task-node from corresponding task proxy object.
n_depth: n-window graph edge distance.
replace_existing: Replace any existing data for task as it may
be out of date (e.g. flow nums).
"""
tp_id = tokens.id
if (
tp_id in self.data[self.workflow_id][TASK_PROXIES]
or tp_id in self.added[TASK_PROXIES]
):
if replace_existing and itask is not None:
self.delta_from_task_proxy(itask)
return

name = tokens['task']
Expand Down Expand Up @@ -2525,6 +2529,26 @@ def delta_task_xtrigger(self, sig, satisfied):
xtrigger.time = update_time
self.updates_pending = True

def delta_from_task_proxy(self, itask: TaskProxy) -> None:
"""Create delta from existing pool task proxy.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
self._process_internal_task_proxy(itask, tp_delta)
self.updates_pending = True

# -----------
# Job Deltas
# -----------
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-show/06-past-present-future.t
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_NAME="${TEST_NAME_BASE}-show.present"
contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__
state: running
prerequisites: ('⨯': not satisfied)
1/b succeeded
1/b succeeded
__END__

TEST_NAME="${TEST_NAME_BASE}-show.future"
Expand Down

0 comments on commit 29d26cf

Please sign in to comment.