Skip to content

Commit

Permalink
Merge branch 'cylc-set-task' into cylc-set-task.fix.bad-pre
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver authored Mar 10, 2024
2 parents 3dfb29f + 59c92b6 commit aa2c91a
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 162 deletions.
7 changes: 3 additions & 4 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ class CylcWorkflowDAO:
["status"],
["flow_wait", {"datatype": "INTEGER"}],
["is_manual_submit", {"datatype": "INTEGER"}],
["is_complete", {"datatype": "INTEGER"}]
],
TABLE_TASK_TIMEOUT_TIMERS: [
["cycle", {"is_primary_key": True}],
Expand Down Expand Up @@ -796,19 +795,19 @@ def select_prev_instances(
# Not an injection, simply putting the table name in the SQL query
# expression as a string constant local to this module.
stmt = ( # nosec
r"SELECT flow_nums,submit_num,flow_wait,is_complete FROM %(name)s"
r"SELECT flow_nums,submit_num,flow_wait,status FROM %(name)s"
r" WHERE name==? AND cycle==?"
) % {"name": self.TABLE_TASK_STATES}
ret = []
for flow_nums_str, submit_num, flow_wait, is_complete in (
for flow_nums_str, submit_num, flow_wait, status in (
self.connect().execute(stmt, (name, point,))
):
ret.append(
(
submit_num,
flow_wait == 1,
deserialise(flow_nums_str),
is_complete
status
)
)
return ret
Expand Down
203 changes: 94 additions & 109 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def db_add_new_flow_rows(self, itask: TaskProxy) -> None:

def add_to_pool(self, itask) -> None:
"""Add a task to the pool."""

self.active_tasks.setdefault(itask.point, {})
self.active_tasks[itask.point][itask.identity] = itask
self.active_tasks_changed = True
Expand Down Expand Up @@ -1480,36 +1481,36 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

return True

def _get_task_history(self, name, point, flow_nums):
"""Get history of previous submits for this task.
def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
) -> Tuple[int, str, bool]:
"""Get history of previous submits for this task."""

"""
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num = max(s[0] for s in info)
submit_num: int = max(s[0] for s in info)
except ValueError:
# never spawned before in any flow
submit_num = 0

prev_completed = 0 # did not complete in the flow
prev_flow_wait = False # did not wait in the flow
prev_status = TASK_STATUS_WAITING
prev_flow_wait = False

for _snum, f_wait, old_fnums, is_complete in info:
# is_complete: 0: False, 1: True, 2: unknown (8.2 back compat)
for _snum, f_wait, old_fnums, status in info:
if set.intersection(flow_nums, old_fnums):
# matching flows
prev_completed = is_complete
prev_status = status
prev_flow_wait = f_wait
if prev_completed:
if prev_status in TASK_STATUSES_FINAL:
# task finished
break

# Else continue: there may be multiple entries with flow
# overlap due to merges (they'll have the same snum and
# f_wait); keep going to find the complete one, if any.
# f_wait); keep going to find the finished one, if any.

return submit_num, prev_completed, prev_flow_wait
return submit_num, prev_status, prev_flow_wait

def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
Expand All @@ -1532,75 +1533,57 @@ def spawn_task(
force: bool = False,
flow_wait: bool = False,
) -> Optional[TaskProxy]:
"""Spawn a task if not already completed for this flow, or if forced.
"""Return task proxy if not completed in this flow, or if forced.
If finished previously with flow wait, just try to spawn children.
This creates the task proxy but does not add it to the pool.
Note finished tasks may be incomplete, but we don't automatically
re-run incomplete tasks in the same flow.
If completed previously with flow wait, just try to spawn children.
For every task spawned, we need a DB lookup for submit number,
and flow-wait.
"""
if not self.can_be_spawned(name, point):
return None

submit_num, prev_completion, prev_flow_wait = (
submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

if prev_completion == 2:
# BACK COMPAT - completion not recorded before 8.3.0
# This code block is for a very niche case: it's only
# used if a flow-wait task is encountered after restarting
# an 8.2 workflow with 8.3.

itask = self._get_task_proxy(
point,
self.config.get_taskdef(name),
flow_nums,
submit_num=submit_num,
flow_wait=flow_wait,
transient=True
)
if not itask:
return None

# update completed outputs from the DB
self._load_historical_outputs(itask)

prev_completed = itask.is_complete()
else:
prev_completed = prev_completion == 1

# If previously completed and children spawned there is nothing
# to do, unless forced.
if (
prev_completed and not prev_flow_wait
and not force
):
LOG.warning(
f"({point}/{name} already completed"
f" in {stringify_flow_nums(flow_nums, full=True)})"
)
return None

# If previously completed we just create a transient task proxy to use
# for spawning children, else (or if forced) run it again.
if force:
transient = False
else:
transient = prev_completed

itask = self._get_task_proxy(
itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
flow_nums,
status=prev_status,
submit_num=submit_num,
flow_wait=flow_wait,
transient=transient
)
if not itask:
if itask is None:
return None

if not transient:
if prev_status in TASK_STATUSES_FINAL:
# Task finished previously.
msg = f"[{point}/{name}:{prev_status}] already finished"
if itask.is_complete():
msg += " and completed"
itask.transient = True
else:
# revive as incomplete.
msg += " incomplete"

LOG.info(
f"{msg} {stringify_flow_nums(flow_nums, full=True)})"
)
if prev_flow_wait:
self._spawn_after_flow_wait(itask)

if itask.transient and not force:
return None

# (else not previously finished, so run it)

if not itask.transient:
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
self.hold_active_task(itask)
Expand Down Expand Up @@ -1635,39 +1618,30 @@ def spawn_task(
for cycle, task, output in self.abs_outputs_done
])

if prev_flow_wait and prev_completed:
self._spawn_after_flow_wait(itask)
LOG.warning(
f"{itask.identity} already completed for flow"
f" {stringify_flow_nums(flow_nums, full=True)}"
)
return None

self.db_add_new_flow_rows(itask)
return itask

def _spawn_after_flow_wait(self, itask: TaskProxy) -> None:
LOG.info(
f"spawning children of {itask.identity} after flow wait"
)
LOG.info(f"[{itask}] spawning outputs after flow-wait")
self.spawn_on_all_outputs(itask, completed_only=True)
# update flow wait status in the DB
itask.flow_wait = False
# itask.flow_nums = orig_fnums
self.workflow_db_mgr.put_update_task_flow_wait(itask)
return None

def _get_task_proxy(
def _get_task_proxy_db_outputs(
self,
point: 'PointBase',
taskdef: 'TaskDef',
flow_nums: 'FlowNums',
status: str = TASK_STATUS_WAITING,
flow_wait: bool = False,
transient: bool = False,
is_manual_submit: bool = False,
submit_num: int = 0
submit_num: int = 0,
) -> Optional['TaskProxy']:
"""Spawn a task proxy and update its outputs from the DB. """
"""Spawn a task, update outputs from DB."""

if not self.can_be_spawned(taskdef.name, point):
return None
Expand All @@ -1677,34 +1651,36 @@ def _get_task_proxy(
taskdef,
point,
flow_nums,
status=status,
flow_wait=flow_wait,
submit_num=submit_num,
transient=transient,
is_manual_submit=is_manual_submit
)
if itask is None:
return None

if itask is not None:
# Update it with outputs that were already completed.
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
if not info:
self.db_add_new_flow_rows(itask)
spawn_kids = False
for outputs_str, fnums in info.items():
if flow_nums.intersection(fnums):
if itask.flow_wait:
spawn_kids = True
for msg in json.loads(outputs_str):
itask.state.outputs.set_completed_by_msg(msg)
# Update it with outputs that were already completed.
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
if not info:
# (Note still need this if task not run before)
self.db_add_new_flow_rows(itask)
for outputs_str, fnums in info.items():
if flow_nums.intersection(fnums):
for msg in json.loads(outputs_str):
itask.state.outputs.set_completed_by_msg(msg)
return itask

if spawn_kids:
self._spawn_after_flow_wait(itask)
def _standardise_prereqs(
self, prereqs: 'List[str]'
) -> 'Dict[Tokens, str]':
"""Convert prerequisites to a map of task messages: outputs.
return itask
(So satisfy_me logs failures)
def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]':
"""Convert prerequisites to task output messages."""
_prereqs = []
"""
_prereqs = {}
for prereq in prereqs:
pre = Tokens(prereq, relative=True)
# add implicit "succeeded"; convert "succeed" to "succeeded" etc.
Expand All @@ -1728,12 +1704,12 @@ def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]':
# The CP from --pre is invalid:
LOG.warning(f'Invalid pre cycle point set:\n {exc.args[0]}')
else:
_prereqs.append(
pre.duplicate(
task_sel=msg,
cycle=cycle,
)
_prereqs[
pre.duplicate(
task_sel=msg,
cycle=standardise_point_string(pre['cycle'])
)
] = prereq
return _prereqs

def _standardise_outputs(
Expand Down Expand Up @@ -1825,8 +1801,10 @@ def set_prereqs_and_outputs(
self._set_prereqs_tdef(
point, tdef, prereqs, flow_nums, flow_wait)
else:
trans = self._get_task_proxy(
point, tdef, flow_nums, flow_wait, transient=True)
trans = self._get_task_proxy_db_outputs(
point, tdef, flow_nums,
flow_wait=flow_wait, transient=True
)
if trans is not None:
self._set_outputs_itask(trans, outputs)

Expand Down Expand Up @@ -1875,11 +1853,18 @@ def _set_prereqs_itask(
if prereqs == ["all"]:
itask.state.set_all_satisfied()
else:
if not itask.satisfy_me(
self._standardise_prereqs(prereqs)
):
# Attempt to set the given prerequisites.
# Log any that aren't valid for the task.
presus = self._standardise_prereqs(prereqs)
unmatched = itask.satisfy_me(list(presus.keys()))
for task_msg in unmatched:
LOG.warning(
f"{itask.identity} does not depend on"
f' "{presus[task_msg]}"'
)
if len(unmatched) == len(prereqs):
# No prereqs matched.
return False

if (
self.runahead_limit_point is not None
and itask.point <= self.runahead_limit_point
Expand Down Expand Up @@ -2023,7 +2008,7 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue

submit_num, _, prev_fwait = self._get_task_history(
submit_num, _prev_status, prev_fwait = self._get_task_history(
name, point, flow_nums)

itask = TaskProxy(
Expand Down
Loading

0 comments on commit aa2c91a

Please sign in to comment.