Skip to content

Commit

Permalink
Fix usage of task_outputs DB table
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Jun 5, 2024
1 parent 6ecf0c8 commit 2a6b0ba
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def check_task_output(
self,
cycle: str,
task: str,
output: str,
output_msg: str,
flow_nums: 'FlowNums',
) -> Union[str, bool]:
"""Returns truthy if the specified output is satisfied in the DB."""
Expand All @@ -444,10 +444,20 @@ def check_task_output(
# loop through matching tasks
if flow_nums.intersection(task_flow_nums):
# this task is in the right flow
task_outputs = json.loads(task_outputs)
# BACK COMPAT: In Cylc >8.0.0,<8.3.0, only the task
# messages were stored in the DB as a list.
# from: 8.0.0
# to: 8.3.0
outputs: Union[
Dict[str, str], List[str]
] = json.loads(task_outputs)
messages = (
outputs.values() if isinstance(outputs, dict)
else outputs
)
return (
'satisfied from database'
if output in task_outputs
if output_msg in messages
else False
)
else:
Expand Down Expand Up @@ -539,14 +549,14 @@ def load_db_task_pool_for_restart(self, row_idx, row):

# Update prerequisite satisfaction status from DB
sat = {}
for prereq_name, prereq_cycle, prereq_output, satisfied in (
for prereq_name, prereq_cycle, prereq_output_msg, satisfied in (
self.workflow_db_mgr.pri_dao.select_task_prerequisites(
cycle, name, flow_nums,
)
):
# Prereq satisfaction as recorded in the DB.
sat[
(prereq_cycle, prereq_name, prereq_output)
(prereq_cycle, prereq_name, prereq_output_msg)
] = satisfied if satisfied != '0' else False

for itask_prereq in itask.state.prerequisites:
Expand All @@ -558,12 +568,12 @@ def load_db_task_pool_for_restart(self, row_idx, row):
# added to an already-spawned task before restart.
# Look through task outputs to see if is has been
# satisfied
prereq_cycle, prereq_task, prereq_output = key
prereq_cycle, prereq_task, prereq_output_msg = key
itask_prereq.satisfied[key] = (
self.check_task_output(
prereq_cycle,
prereq_task,
prereq_output,
prereq_output_msg,
itask.flow_nums,
)
)
Expand Down Expand Up @@ -1612,7 +1622,7 @@ def _get_task_history(

return never_spawned, submit_num, prev_status, prev_flow_wait

def _load_historical_outputs(self, itask):
def _load_historical_outputs(self, itask: 'TaskProxy') -> None:
"""Load a task's historical outputs from the DB."""
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
Expand All @@ -1622,7 +1632,18 @@ def _load_historical_outputs(self, itask):
else:
for outputs_str, fnums in info.items():
if itask.flow_nums.intersection(fnums):
for msg in json.loads(outputs_str):
# BACK COMPAT: In Cylc >8.0.0,<8.3.0, only the task
# messages were stored in the DB as a list.
# from: 8.0.0
# to: 8.3.0
outputs: Union[
Dict[str, str], List[str]
] = json.loads(outputs_str)
messages = (
outputs.values() if isinstance(outputs, dict)
else outputs
)
for msg in messages:
itask.state.outputs.set_message_complete(msg)

def spawn_task(
Expand Down Expand Up @@ -1771,15 +1792,7 @@ def _get_task_proxy_db_outputs(
return None

# Update it with outputs that were already completed.
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
if not info:
# (Note still need this if task not run before)
self.db_add_new_flow_rows(itask)
for outputs_str, fnums in info.items():
if flow_nums.intersection(fnums):
for msg in json.loads(outputs_str):
itask.state.outputs.set_message_complete(msg)
self._load_historical_outputs(itask)
return itask

def _standardise_prereqs(
Expand Down

0 comments on commit 2a6b0ba

Please sign in to comment.