Skip to content

Commit

Permalink
Fix wrong-outputs warning.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Dec 17, 2023
1 parent 06fbdea commit 3fddad0
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 25 deletions.
12 changes: 6 additions & 6 deletions cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import math
import re
from typing import Iterable, List, TYPE_CHECKING
from typing import Iterable, Set, TYPE_CHECKING

from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import TriggerExpressionError
Expand Down Expand Up @@ -202,25 +202,25 @@ def _conditional_is_satisfied(self):
'"%s":\n%s' % (self.get_raw_conditional_expression(), err_msg))
return res

def satisfy_me(self, outputs: Iterable['Tokens']) -> List['Tokens']:
def satisfy_me(self, outputs: Iterable['Tokens']) -> Set[str]:
"""Attempt to satisfy me with given outputs.
Updates cache with the result.
Return which outputs were not used.
Return outputs that match.
"""
not_used = []
valid = set()
for output in outputs:
prereq = output.to_prereq_tuple()
if prereq not in self.satisfied:
not_used.append(output)
continue
valid.add(output.relative_id_with_selectors)
self.satisfied[prereq] = self.DEP_STATE_SATISFIED
if self.conditional_expression is None:
self._all_satisfied = all(self.satisfied.values())
else:
self._all_satisfied = self._conditional_is_satisfied()
return not_used
return valid

def api_dump(self):
"""Return list of populated Protobuf data objects."""
Expand Down
10 changes: 5 additions & 5 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,17 +505,17 @@ def state_reset(
return False

def satisfy_me(self, outputs: Iterable[str]) -> None:
"""Attempt to satisfy my prerequisites with given outputs.
"""Try to satisfy my prerequisites with given outputs.
The output strings are of the form "cycle/task:message"
Log a warning for outputs that I don't depend on.
"""
tokens = [Tokens(p, relative=True) for p in outputs]
not_used = self.state.satisfy_me(tokens)
for output in not_used:
used = self.state.satisfy_me(tokens)
for output in set(outputs) - used:
LOG.warning(
f"{self.identity} does not depend on"
f" {output.relative_id_with_selectors}"
f"{self.identity} does not depend on {output}"
)

def clock_expire(self) -> bool:
Expand Down
28 changes: 15 additions & 13 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""Task state related logic."""


from typing import List, Iterable, TYPE_CHECKING
from typing import List, Iterable, Set, TYPE_CHECKING
from cylc.flow.prerequisite import Prerequisite
from cylc.flow.task_outputs import TaskOutputs
from cylc.flow.wallclock import get_current_time_string
Expand Down Expand Up @@ -313,18 +313,20 @@ def __call__(
def satisfy_me(
self,
outputs: Iterable['Tokens']
) -> List['Tokens']:
"""Attempt to get my prerequisites satisfied."""
not_used: List['Tokens'] = []
for prereqs in [self.prerequisites, self.suicide_prerequisites]:
for prereq in prereqs:
nope = prereq.satisfy_me(outputs)
if nope:
not_used += nope
continue
self._is_satisfied = None
self._suicide_is_satisfied = None
return not_used
) -> Set[str]:
"""Try to satisfy my prerequisites with given outputs.
Return which outputs I actually depend on.
"""
valid: Set[str] = set()
for prereq in (*self.prerequisites, *self.suicide_prerequisites):
yep = prereq.satisfy_me(outputs)
if yep:
valid = valid.union(yep)
continue
self._is_satisfied = None
self._suicide_is_satisfied = None
return valid

def xtriggers_all_satisfied(self):
"""Return True if all xtriggers are satisfied."""
Expand Down
54 changes: 53 additions & 1 deletion tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
Callable,
Iterable,
List,
Optional,
Tuple,
Union
)
Expand Down Expand Up @@ -1463,3 +1462,56 @@ async def test_set_outputs_future(
# try to set an invalid output
schd.pool.set(["1/b"], ["shrub"], None, ['all'])
assert log_filter(log, contains="output 1/b:shrub not found")


async def test_prereq_satisfaction(
flow,
scheduler,
start,
log_filter,
):
"""Check manual setting of future task outputs.
"""
id_ = flow(
{
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'graph': {
'R1': "a:x & a:y => b"
}
},
'runtime': {
'a': {
'outputs': {
'x': 'x',
'y': 'y'
}
}
}
}
)
schd = scheduler(id_)
async with start(schd) as log:
# it should start up with just 1/a
assert pool_get_task_ids(schd.pool) == ["1/a"]
# spawn b
schd.pool.set(["1/a"], ["x"], None, ['all'])
assert (
pool_get_task_ids(schd.pool) == ["1/a", "1/b"]
)

b = schd.pool.get_task(IntegerPoint("1"), "b")

assert not b.is_waiting_prereqs_done()

# set valid and invalid prerequisites, check log.
b.satisfy_me(["1/a:x", "1/a:y", "1/a:z", "1/a:w"])
assert log_filter(log, contains="1/b does not depend on 1/a:z")
assert log_filter(log, contains="1/b does not depend on 1/a:w")
assert not log_filter(log, contains="1/b does not depend on 1/a:x")
assert not log_filter(log, contains="1/b does not depend on 1/a:y")

assert b.is_waiting_prereqs_done()

0 comments on commit 3fddad0

Please sign in to comment.