Skip to content

Commit

Permalink
Simplify and fix runahead computation. (cylc#5893)
Browse files Browse the repository at this point in the history
## Simplify and fix runahead computation.

* In back-compat mode the cycle point time zone is assumed to be local, whereas in normal mode it is assumed to be UTC. The point-parse cache was being contaminated: the time zone could carry over between tests of back-compat mode and tests of normal mode.

### Testing
* New runahead integration test.
* Reload: test reloading doesn't nudge the runahead limit
* We were using the pytest-env plugin to run the tests in a non-UTC time zone, but pytest-env doesn't work with pytest-xdist, so the setting was being ignored.
* Add both runahead formats to the existing tests for compute runahead. Add compat-mode and non-compat-mode versions of the future triggers bug test.
* Wrote a test to check for changes of runahead limit based on changing task statuses in compat mode.

---------

Co-authored-by: Oliver Sanders <[email protected]>
Co-authored-by: Ronnie Dutta <[email protected]>
Co-authored-by: Tim Pillinger <[email protected]>
  • Loading branch information
4 people authored Jan 9, 2024
1 parent dd41648 commit b64fcd6
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 185 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ jobs:
fail-fast: false # Don't let a failed MacOS run stop the Ubuntu runs
matrix:
os: ['ubuntu-latest']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
python-version: ['3.7', '3.8', '3.10', '3.11']
include:
# mac os test
- os: 'macos-11'
python-version: '3.7'
python-version: '3.7' # oldest supported version
# non-utc timezone test
- os: 'ubuntu-latest'
python-version: '3.9' # not the oldest, not the most recent version
time-zone: 'XXX-09:35'
env:
TZ: ${{ matrix.time-zone }}
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes
steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions changes.d/5893.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug in computing a time interval-based runahead limit when future triggers are present.
16 changes: 0 additions & 16 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,22 +415,6 @@ def get_stop_point(self):
"""Return the last point of this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n, point=None):
    """Return a list of the first n points of this sequence.

    Args:
        n: The number of points wanted.
        point: If given, the list starts at ``self.get_first_point(point)``;
            otherwise it starts at ``self.get_start_point()``.

    Returns:
        The points in the order they were generated. The list is empty if
        there is no starting point, and shorter than ``n`` if
        ``get_next_point_on_sequence`` returns None before ``n`` points
        have been collected. Note that the starting point, when there is
        one, is always included - even if ``n`` is less than one.
    """
    current = (
        self.get_start_point()
        if point is None
        else self.get_first_point(point)
    )
    collected = []
    if current is None:
        return collected
    collected.append(current)
    # The first point is already collected, so at most n - 1 more follow.
    for _ in range(n - 1):
        current = self.get_next_point_on_sequence(current)
        if current is None:
            # Ran off the end of the sequence.
            return collected
        collected.append(current)
    return collected

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
16 changes: 13 additions & 3 deletions cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,12 +943,22 @@ def _interval_parse(interval_string):

def point_parse(point_string: str) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object."""
return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT)
return _point_parse(
point_string,
WorkflowSpecifics.DUMP_FORMAT,
WorkflowSpecifics.ASSUMED_TIME_ZONE
)


@lru_cache(10000)
def _point_parse(point_string, _dump_fmt):
"""Parse a point_string into a proper TimePoint object."""
def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object.
Args:
point_string: The string to parse.
_dump_fmt: Dump format (only used to avoid invalid cache hits).
_tz: Cycle point time zone (only used to avoid invalid cache hits).
"""
if "%" in WorkflowSpecifics.DUMP_FORMAT:
# May be a custom not-quite ISO 8601 dump format.
with contextlib.suppress(IsodatetimeError):
Expand Down
118 changes: 45 additions & 73 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,82 +309,48 @@ def compute_runahead(self, force=False) -> bool:
With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""
"""
limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
base_point: Optional['PointBase'] = None

# First get the runahead base point.
if not self.main_pool:
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

# Find the earliest sequence point beyond the workflow start point.
base_point = min(
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
)
else:
# Find the earliest point with unfinished tasks.
# Find the earliest point with incomplete tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
# All n=0 tasks are incomplete by definition, but Cylc 7
# ignores failed ones (it does not ignore submit-failed!).
if (
points # got the limit already so this point too
or any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failed tasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
)
cylc.flow.flags.cylc7_back_compat and
all(
itask.state(TASK_STATUS_FAILED)
for itask in itasks
)
):
points.append(point)
continue
base_point = point
break

if not points:
return False
base_point = min(points)
if base_point is None:
return False

LOG.debug(f"Runahead: base point {base_point}")

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -401,8 +367,10 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
# Now generate all possible cycle points from the base point and stop
# at the runahead limit point. Note both cycle count and time interval
# limits involve all possible cycles, not just active cycles.
sequence_points: Set['PointBase'] = set()
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -412,44 +380,48 @@ def compute_runahead(self, force=False) -> bool:
sequence_points = self._prev_runahead_sequence_points
else:
# Recompute possible points.
sequence_points = set()
for sequence in self.config.sequences:
seq_point = sequence.get_next_point(base_point)
seq_point = sequence.get_first_point(base_point)
count = 1
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + ilimit:
# this point may be beyond the runahead limit
break
else:
# PT0H allows only the base cycle point to run.
if seq_point > base_point + limit:
# this point is beyond the runahead limit
break
count += 1
sequence_points.add(seq_point)
seq_point = sequence.get_next_point(seq_point)
self._prev_runahead_sequence_points = sequence_points
self._prev_runahead_base_point = base_point

points = set(points).union(sequence_points)

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(ilimit + 1)][-1]
# (len(list) may be less than ilimit due to sequence end)
limit_point = sorted(sequence_points)[:ilimit + 1][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
limit_point = max(sequence_points)

# Adjust for future offset and stop point, if necessary.
# Adjust for future offset and stop point.
pre_adj_limit = limit_point
if self.max_future_offset is not None:
limit_point += self.max_future_offset
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
LOG.debug(
"Runahead (future trigger adjust):"
f" {pre_adj_limit} -> {limit_point}"
)
if self.stop_point and limit_point > self.stop_point:
limit_point = self.stop_point
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
LOG.debug(f"Runahead limit: {limit_point}")
LOG.debug(
"Runahead (stop point adjust):"
f" {pre_adj_limit} -> {limit_point} (stop point)"
)

LOG.debug(f"Runahead limit: {limit_point}")
self.runahead_limit_point = limit_point
return True

Expand Down
5 changes: 1 addition & 4 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@ testpaths =
cylc/flow/
tests/unit/
tests/integration/
env =
# a weird timezone to check that tests aren't assuming the local timezone
TZ=XXX-09:35
doctest_optionflags =
NORMALIZE_WHITESPACE
IGNORE_EXCEPTION_DETAIL
ELLIPSIS
asyncio_mode = auto
markers=
linkcheck: Test links
linkcheck: Test links
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ tests =
pytest-asyncio>=0.17,!=0.23.*
pytest-cov>=2.8.0
pytest-xdist>=2
pytest-env>=0.6.2
pytest>=6
testfixtures>=6.11.0
towncrier>=23
Expand Down
Loading

0 comments on commit b64fcd6

Please sign in to comment.