From 45b24ccfa943561881716af50b490d8ec6ccce57 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:32:14 +0000 Subject: [PATCH 1/2] Make set trigger outputs in order Prevent warning messages caused by succeeded being triggered before custom outputs. --- cylc/flow/task_outputs.py | 17 +++++++++++++++ cylc/flow/task_pool.py | 1 + tests/integration/test_task_pool.py | 32 +++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index 6c0bd8f8ea2..e7755ca5922 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -299,3 +299,20 @@ def msg_sort_key(item): except ValueError: ind = 999 return (ind, item[_MESSAGE] or '') + + @staticmethod + def output_sort_key(item): + """Compare by output order. + + Examples: + + >>> this = TaskOutputs.output_sort_key + >>> sorted(['finished', 'started', 'custom'], key=this) + ['started', 'custom', 'finished'] + """ + task_outputs = {o: i for i, o in enumerate(TASK_OUTPUTS)} + if item in TASK_OUTPUTS: + return task_outputs[item] + else: + # Sorts custom outputs after started. + return TASK_OUTPUTS.index(TASK_OUTPUT_STARTED) + .5 diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index dbe60fb7292..d9c418e7137 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1820,6 +1820,7 @@ def _set_outputs_itask( itask.point, itask.tdef, outputs) changed = False + outputs = sorted(outputs, key=itask.state.outputs.output_sort_key) for output in outputs: if itask.state.outputs.is_completed(output): LOG.info(f"output {itask.identity}:{output} completed already") diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index be1f335d45b..e076e6a4495 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1499,6 +1499,38 @@ async def test_set_outputs_live( ) +async def test_set_outputs_live2( + flow, + scheduler, + start, + log_filter, +): + """Assert that optional outputs are satisfied before completion + outputs to prevent incomplete task warnings. + """ + id_ = flow( + { + 'scheduler': {'allow implicit tasks': 'True'}, + 'scheduling': {'graph': { + 'R1': """ + foo:a => apple + foo:b => boat + """}}, + 'runtime': {'foo': {'outputs': { + 'a': 'xylophone', + 'b': 'yacht'}}} + } + ) + schd = scheduler(id_) + + async with start(schd) as log: + schd.pool.set_prereqs_and_outputs(["1/foo"], None, None, ['all']) + assert not log_filter( + log, + contains="did not complete required outputs: ['a', 'b']" + ) + + async def test_set_outputs_future( flow, scheduler, From a286483bbe7078ca0a75f473d39f65c6d127f4a7 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:51:32 +0000 Subject: [PATCH 2/2] Update cylc/flow/task_outputs.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/task_outputs.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index e7755ca5922..644b7b0dd3c 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -310,9 +310,7 @@ def output_sort_key(item): >>> sorted(['finished', 'started', 'custom'], key=this) ['started', 'custom', 'finished'] """ - task_outputs = {o: i for i, o in enumerate(TASK_OUTPUTS)} if item in TASK_OUTPUTS: - return task_outputs[item] - else: - # Sorts custom outputs after started. - return TASK_OUTPUTS.index(TASK_OUTPUT_STARTED) + .5 + return TASK_OUTPUTS.index(item) + # Sort custom outputs after started. + return TASK_OUTPUTS.index(TASK_OUTPUT_STARTED) + .5