diff --git a/cylc/flow/prerequisite.py b/cylc/flow/prerequisite.py index a2d13fbe4c9..b4abfe737dc 100644 --- a/cylc/flow/prerequisite.py +++ b/cylc/flow/prerequisite.py @@ -18,7 +18,7 @@ import math import re -from typing import Iterable, List, TYPE_CHECKING +from typing import Iterable, Set, TYPE_CHECKING from cylc.flow.cycling.loader import get_point from cylc.flow.exceptions import TriggerExpressionError @@ -202,25 +202,25 @@ def _conditional_is_satisfied(self): '"%s":\n%s' % (self.get_raw_conditional_expression(), err_msg)) return res - def satisfy_me(self, outputs: Iterable['Tokens']) -> List['Tokens']: + def satisfy_me(self, outputs: Iterable['Tokens']) -> Set[str]: """Attempt to satisfy me with given outputs. Updates cache with the result. - Return which outputs were not used. + Return outputs that match. """ - not_used = [] + valid = set() for output in outputs: prereq = output.to_prereq_tuple() if prereq not in self.satisfied: - not_used.append(output) continue + valid.add(output.relative_id_with_selectors) self.satisfied[prereq] = self.DEP_STATE_SATISFIED if self.conditional_expression is None: self._all_satisfied = all(self.satisfied.values()) else: self._all_satisfied = self._conditional_is_satisfied() - return not_used + return valid def api_dump(self): """Return list of populated Protobuf data objects.""" diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index c5be53cab41..6a47f3a6d59 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -505,17 +505,17 @@ def state_reset( return False def satisfy_me(self, outputs: Iterable[str]) -> None: - """Attempt to satisfy my prerequisites with given outputs. + """Try to satisfy my prerequisites with given outputs. The output strings are of the form "cycle/task:message" + Log a warning for outputs that I don't depend on. """ tokens = [Tokens(p, relative=True) for p in outputs] - not_used = self.state.satisfy_me(tokens) - for output in not_used: + used = self.state.satisfy_me(tokens) + for output in set(outputs) - used: LOG.warning( - f"{self.identity} does not depend on" - f" {output.relative_id_with_selectors}" + f"{self.identity} does not depend on {output}" ) def clock_expire(self) -> bool: diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 3f1fea4198a..26b651fe613 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -17,7 +17,7 @@ """Task state related logic.""" -from typing import List, Iterable, TYPE_CHECKING +from typing import List, Iterable, Set, TYPE_CHECKING from cylc.flow.prerequisite import Prerequisite from cylc.flow.task_outputs import TaskOutputs from cylc.flow.wallclock import get_current_time_string @@ -313,18 +313,20 @@ def __call__( def satisfy_me( self, outputs: Iterable['Tokens'] - ) -> List['Tokens']: - """Attempt to get my prerequisites satisfied.""" - not_used: List['Tokens'] = [] - for prereqs in [self.prerequisites, self.suicide_prerequisites]: - for prereq in prereqs: - nope = prereq.satisfy_me(outputs) - if nope: - not_used += nope - continue - self._is_satisfied = None - self._suicide_is_satisfied = None - return not_used + ) -> Set[str]: + """Try to satisfy my prerequisites with given outputs. + + Return which outputs I actually depend on. + """ + valid: Set[str] = set() + for prereq in (*self.prerequisites, *self.suicide_prerequisites): + yep = prereq.satisfy_me(outputs) + if yep: + valid = valid.union(yep) + continue + self._is_satisfied = None + self._suicide_is_satisfied = None + return valid def xtriggers_all_satisfied(self): """Return True if all xtriggers are satisfied.""" diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index a453fd4a82e..3a6942750bd 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -21,7 +21,6 @@ Callable, Iterable, List, - Optional, Tuple, Union ) @@ -1463,3 +1462,56 @@ async def test_set_outputs_future( # try to set an invalid output schd.pool.set(["1/b"], ["shrub"], None, ['all']) assert log_filter(log, contains="output 1/b:shrub not found") + + +async def test_prereq_satisfaction( + flow, + scheduler, + start, + log_filter, +): + """Check manual setting of future task outputs. + + """ + id_ = flow( + { + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'graph': { + 'R1': "a:x & a:y => b" + } + }, + 'runtime': { + 'a': { + 'outputs': { + 'x': 'x', + 'y': 'y' + } + } + } + } + ) + schd = scheduler(id_) + async with start(schd) as log: + # it should start up with just 1/a + assert pool_get_task_ids(schd.pool) == ["1/a"] + # spawn b + schd.pool.set(["1/a"], ["x"], None, ['all']) + assert ( + pool_get_task_ids(schd.pool) == ["1/a", "1/b"] + ) + + b = schd.pool.get_task(IntegerPoint("1"), "b") + + assert not b.is_waiting_prereqs_done() + + # set valid and invalid prerequisites, check log. + b.satisfy_me(["1/a:x", "1/a:y", "1/a:z", "1/a:w"]) + assert log_filter(log, contains="1/b does not depend on 1/a:z") + assert log_filter(log, contains="1/b does not depend on 1/a:w") + assert not log_filter(log, contains="1/b does not depend on 1/a:x") + assert not log_filter(log, contains="1/b does not depend on 1/a:y") + + assert b.is_waiting_prereqs_done()