From b64fcd6f5f35238c8b335cf6bcb8c04deddb37da Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 10 Jan 2024 00:05:45 +1300 Subject: [PATCH] Simplify and fix runahead computation. (#5893) ## Simplify and fix runahead computation. * In back-compat mode the cycle point time zone is assumed to be local, whereas in normal mode it is assumed to be UTC. There was contamination of the point parse caching where the time zone would carry over from tests of back-compat vs normal mode. ### Testing * New runahead integration test. * Reload: test reloading doesn't nudge the runahead limit * We were using the pytest-env plugin to run the tests in a non-UTC time zone: The pytest-env plugin doesn't work with pytest-xdist so this was being ignored. * Add both runahead formats to existing tests for compute runahead. Add compat mode and not compat mode versions of the future triggers bug test. * Wrote a test to check for changes of runahead limit based on changing task statuses in compat mode. --------- Co-authored-by: Oliver Sanders Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Co-authored-by: Tim Pillinger <26465611+wxtim@users.noreply.github.com> --- .github/workflows/test_fast.yml | 10 +- changes.d/5893.fix | 1 + cylc/flow/cycling/__init__.py | 16 -- cylc/flow/cycling/iso8601.py | 16 +- cylc/flow/task_pool.py | 118 ++++++--------- pytest.ini | 5 +- setup.cfg | 1 - tests/integration/test_task_pool.py | 221 +++++++++++++++++++++++++++- tests/unit/cycling/test_cycling.py | 83 ----------- 9 files changed, 286 insertions(+), 185 deletions(-) create mode 100644 changes.d/5893.fix diff --git a/.github/workflows/test_fast.yml b/.github/workflows/test_fast.yml index 31bda3578e4..53d6c475bc3 100644 --- a/.github/workflows/test_fast.yml +++ b/.github/workflows/test_fast.yml @@ -20,11 +20,17 @@ jobs: fail-fast: false # Don't let a failed MacOS run stop the Ubuntu runs matrix: os: ['ubuntu-latest'] - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] + 
python-version: ['3.7', '3.8', '3.10', '3.11'] include: + # mac os test - os: 'macos-11' - python-version: '3.7' + python-version: '3.7' # oldest supported version + # non-utc timezone test + - os: 'ubuntu-latest' + python-version: '3.9' # not the oldest, not the most recent version + time-zone: 'XXX-09:35' env: + TZ: ${{ matrix.time-zone }} PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes steps: - name: Checkout diff --git a/changes.d/5893.fix b/changes.d/5893.fix new file mode 100644 index 00000000000..504cd6a649e --- /dev/null +++ b/changes.d/5893.fix @@ -0,0 +1 @@ +Fixed bug in computing a time interval-based runahead limit when future triggers are present. diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py index e3d8a2fe64a..c47d242b5c5 100644 --- a/cylc/flow/cycling/__init__.py +++ b/cylc/flow/cycling/__init__.py @@ -415,22 +415,6 @@ def get_stop_point(self): """Return the last point of this sequence, or None if unbounded.""" pass - def get_first_n_points(self, n, point=None): - """Return a list of first n points of this sequence.""" - if point is None: - p1 = self.get_start_point() - else: - p1 = self.get_first_point(point) - if p1 is None: - return [] - result = [p1] - for _ in range(1, n): - p1 = self.get_next_point_on_sequence(p1) - if p1 is None: - break - result.append(p1) - return result - @abstractmethod def __eq__(self, other) -> bool: # Return True if other (sequence) is equal to self. 
diff --git a/cylc/flow/cycling/iso8601.py b/cylc/flow/cycling/iso8601.py index 3c8bd3955fa..655c06c2881 100644 --- a/cylc/flow/cycling/iso8601.py +++ b/cylc/flow/cycling/iso8601.py @@ -943,12 +943,22 @@ def _interval_parse(interval_string): def point_parse(point_string: str) -> 'TimePoint': """Parse a point_string into a proper TimePoint object.""" - return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT) + return _point_parse( + point_string, + WorkflowSpecifics.DUMP_FORMAT, + WorkflowSpecifics.ASSUMED_TIME_ZONE + ) @lru_cache(10000) -def _point_parse(point_string, _dump_fmt): - """Parse a point_string into a proper TimePoint object.""" +def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint': + """Parse a point_string into a proper TimePoint object. + + Args: + point_string: The string to parse. + _dump_fmt: Dump format (only used to avoid invalid cache hits). + _tz: Cycle point time zone (only used to avoid invalid cache hits). + """ if "%" in WorkflowSpecifics.DUMP_FORMAT: # May be a custom not-quite ISO 8601 dump format. with contextlib.suppress(IsodatetimeError): diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 6ddc62d39f2..2885bacba7b 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -309,8 +309,8 @@ def compute_runahead(self, force=False) -> bool: With force=True we recompute the limit even if the base point has not changed (needed if max_future_offset changed, or on reload). - """ + """ limit = self.config.runahead_limit # e.g. P2 or P2D count_cycles = False with suppress(TypeError): @@ -318,73 +318,39 @@ def compute_runahead(self, force=False) -> bool: ilimit = int(limit) # type: ignore count_cycles = True - base_point: 'PointBase' - points: List['PointBase'] = [] + base_point: Optional['PointBase'] = None + # First get the runahead base point. if not self.main_pool: - # No tasks yet, just consider sequence points. - if count_cycles: - # Get the first ilimit points in each sequence. 
- # (After workflow start point - sequence may begin earlier). - points = [ - point - for plist in [ - seq.get_first_n_points( - ilimit, self.config.start_point) - for seq in self.config.sequences - ] - for point in plist - ] - # Drop points beyond the limit. - points = sorted(points)[:ilimit + 1] - base_point = min(points) - - else: - # Start at first point in each sequence. - # (After workflow start point - sequence may begin earlier). - points = [ - point - for point in { - seq.get_first_point(self.config.start_point) - for seq in self.config.sequences - } - if point is not None - ] - base_point = min(points) - # Drop points beyond the limit. - points = [ - point - for point in points - if point <= base_point + limit - ] - + # Find the earliest sequence point beyond the workflow start point. + base_point = min( + point + for point in { + seq.get_first_point(self.config.start_point) + for seq in self.config.sequences + } + if point is not None + ) else: - # Find the earliest point with unfinished tasks. + # Find the earliest point with incomplete tasks. for point, itasks in sorted(self.get_tasks_by_point().items()): + # All n=0 tasks are incomplete by definition, but Cylc 7 + # ignores failed ones (it does not ignore submit-failed!). if ( - points # got the limit already so this point too - or any( - not itask.state( - TASK_STATUS_FAILED, - TASK_STATUS_SUCCEEDED, - TASK_STATUS_EXPIRED - ) - or ( - # For Cylc 7 back-compat, ignore incomplete tasks. - # (Success is required in back-compat mode, so - # failedtasks end up as incomplete; and Cylc 7 - # ignores failed tasks in computing the limit). 
- itask.state.outputs.is_incomplete() - and not cylc.flow.flags.cylc7_back_compat - ) + cylc.flow.flags.cylc7_back_compat and + all( + itask.state(TASK_STATUS_FAILED) for itask in itasks ) ): - points.append(point) + continue + base_point = point + break - if not points: - return False - base_point = min(points) + if base_point is None: + return False + + LOG.debug(f"Runahead: base point {base_point}") if self._prev_runahead_base_point is None: self._prev_runahead_base_point = base_point @@ -401,8 +367,10 @@ def compute_runahead(self, force=False) -> bool: # change or the runahead limit is already at stop point. return False - # Get all cycle points possible after the base point. - sequence_points: Set['PointBase'] + # Now generate all possible cycle points from the base point and stop + # at the runahead limit point. Note both cycle count and time interval + # limits involve all possible cycles, not just active cycles. + sequence_points: Set['PointBase'] = set() if ( not force and self._prev_runahead_sequence_points @@ -412,18 +380,19 @@ def compute_runahead(self, force=False) -> bool: sequence_points = self._prev_runahead_sequence_points else: # Recompute possible points. - sequence_points = set() for sequence in self.config.sequences: - seq_point = sequence.get_next_point(base_point) + seq_point = sequence.get_first_point(base_point) count = 1 while seq_point is not None: if count_cycles: # P0 allows only the base cycle point to run. if count > 1 + ilimit: + # this point may be beyond the runahead limit break else: # PT0H allows only the base cycle point to run. 
if seq_point > base_point + limit: + # this point can not be beyond the runahead limit break count += 1 sequence_points.add(seq_point) @@ -431,25 +400,28 @@ def compute_runahead(self, force=False) -> bool: self._prev_runahead_sequence_points = sequence_points self._prev_runahead_base_point = base_point - points = set(points).union(sequence_points) - if count_cycles: - # Some sequences may have different intervals. - limit_point = sorted(points)[:(ilimit + 1)][-1] + # (len(list) may be less than ilimit due to sequence end) + limit_point = sorted(sequence_points)[:ilimit + 1][-1] else: - # We already stopped at the runahead limit. - limit_point = sorted(points)[-1] + limit_point = max(sequence_points) - # Adjust for future offset and stop point, if necessary. + # Adjust for future offset and stop point. pre_adj_limit = limit_point if self.max_future_offset is not None: limit_point += self.max_future_offset - LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)") + LOG.debug( + "Runahead (future trigger adjust):" + f" {pre_adj_limit} -> {limit_point}" + ) if self.stop_point and limit_point > self.stop_point: limit_point = self.stop_point - LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)") - LOG.debug(f"Runahead limit: {limit_point}") + LOG.debug( + "Runahead (stop point adjust):" + f" {pre_adj_limit} -> {limit_point} (stop point)" + ) + LOG.debug(f"Runahead limit: {limit_point}") self.runahead_limit_point = limit_point return True diff --git a/pytest.ini b/pytest.ini index 81df3785cec..9be86cb507c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -32,13 +32,10 @@ testpaths = cylc/flow/ tests/unit/ tests/integration/ -env = - # a weird timezone to check that tests aren't assuming the local timezone - TZ=XXX-09:35 doctest_optionflags = NORMALIZE_WHITESPACE IGNORE_EXCEPTION_DETAIL ELLIPSIS asyncio_mode = auto markers= - linkcheck: Test links \ No newline at end of file + linkcheck: Test links diff --git a/setup.cfg b/setup.cfg index 25479541eaf..67ed7ac71c3 
100644 --- a/setup.cfg +++ b/setup.cfg @@ -119,7 +119,6 @@ tests = pytest-asyncio>=0.17,!=0.23.* pytest-cov>=2.8.0 pytest-xdist>=2 - pytest-env>=0.6.2 pytest>=6 testfixtures>=6.11.0 towncrier>=23 diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 97526088660..18057a86339 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -38,6 +38,7 @@ TASK_STATUS_FAILED, TASK_STATUS_EXPIRED, TASK_STATUS_SUBMIT_FAILED, + TASK_STATUSES_ALL, ) # NOTE: foo and bar have no parents so at start-up (even with the workflow @@ -1008,7 +1009,8 @@ async def test_runahead_limit_for_sequence_before_start_cycle( ): """It should obey the runahead limit. - Ensure the runahead limit is computed correctly for sequences before the start cycle + Ensure the runahead limit is computed correctly for sequences that begin + before the start cycle. See https://github.com/cylc/cylc-flow/issues/5603 """ @@ -1154,11 +1156,12 @@ async def test_no_flow_tasks_dont_spawn( for itask in pool ] == pool + async def test_task_proxy_remove_from_queues( flow, one_conf, scheduler, start, ): """TaskPool.remove should delete task proxies from queues. 
- + See https://github.com/cylc/cylc-flow/pull/5573 """ # Set up a scheduler with a non-default queue: @@ -1243,6 +1246,218 @@ async def test_detect_incomplete_tasks( # ensure that it is correctly identified as incomplete assert itask.state.outputs.get_incomplete() assert itask.state.outputs.is_incomplete() - assert log_filter(log, contains=f"[{itask}] did not complete required outputs:") + assert log_filter( + log, contains=f"[{itask}] did not complete required outputs:") # the task should not have been removed assert itask in schd.pool.get_tasks() + + +@pytest.mark.parametrize('compat_mode', ['compat-mode', 'normal-mode']) +@pytest.mark.parametrize('cycling_mode', ['integer', 'datetime']) +@pytest.mark.parametrize('runahead_format', ['P3Y', 'P3']) +async def test_compute_runahead( + cycling_mode, + compat_mode, + runahead_format, + flow, + scheduler, + start, + monkeypatch, +): + """Test the calculation of the runahead limit. + + This test ensures that: + * Runahead tasks are excluded from computations + see https://github.com/cylc/cylc-flow/issues/5825 + * Tasks are initiated with the correct is_runahead status on startup. + * Behaviour in compat/regular modes is same unless failed tasks are present + * Behaviour is the same for integer/datetime cycling modes. 
+ + """ + if cycling_mode == 'integer': + config = { + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'initial cycle point': '1', + 'cycling mode': 'integer', + 'runahead limit': 'P3', + 'graph': { + 'P1': 'a' + }, + } + } + point = lambda point: IntegerPoint(str(int(point))) + else: + config = { + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': '0001', + 'runahead limit': runahead_format, + 'graph': { + 'P1Y': 'a' + }, + } + } + point = ISO8601Point + + monkeypatch.setattr( + 'cylc.flow.flags.cylc7_back_compat', + compat_mode == 'compat-mode', + ) + + id_ = flow(config) + schd = scheduler(id_) + async with start(schd): + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 4 + + # ensure task states are initiated with is_runahead status + assert schd.pool.get_task(point('0001'), 'a').state(is_runahead=False) + assert schd.pool.get_task(point('0005'), 'a').state(is_runahead=True) + + # mark the first three cycles as running + for cycle in range(1, 4): + schd.pool.get_task(point(f'{cycle:04}'), 'a').state.reset( + TASK_STATUS_RUNNING + ) + + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 4 # no change + + # In Cylc 8 all incomplete tasks hold back runahead. + + # In Cylc 7, submit-failed tasks hold back runahead.. + schd.pool.get_task(point('0001'), 'a').state.reset( + TASK_STATUS_SUBMIT_FAILED + ) + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 4 + + # ... but failed ones don't. Go figure. 
+ schd.pool.get_task(point('0001'), 'a').state.reset( + TASK_STATUS_FAILED + ) + schd.pool.compute_runahead(force=True) + if compat_mode == 'compat-mode': + assert int(str(schd.pool.runahead_limit_point)) == 5 + else: + assert int(str(schd.pool.runahead_limit_point)) == 4 # no change + + # mark cycle 1 as complete + # (via task message so the task gets removed before runahead compute) + schd.task_events_mgr.process_message( + schd.pool.get_task(point('0001'), 'a'), + logging.INFO, + TASK_OUTPUT_SUCCEEDED + ) + schd.pool.compute_runahead(force=True) + assert int(str(schd.pool.runahead_limit_point)) == 5 # +1 + + +@pytest.mark.parametrize('rhlimit', ['P2D', 'P2']) +@pytest.mark.parametrize('compat_mode', ['compat-mode', 'normal-mode']) +async def test_runahead_future_trigger( + flow, + scheduler, + start, + monkeypatch, + rhlimit, + compat_mode, +): + """Equivalent time interval and cycle count runahead limits should yield + the same limit point, even if there is a future trigger. + + See https://github.com/cylc/cylc-flow/pull/5893 + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': 'CCYYMMDD', + }, + 'scheduling': { + 'initial cycle point': '2001', + 'runahead limit': rhlimit, + 'graph': { + 'P1D': ''' + a + a[+P1D] => b + ''', + }, + } + }) + + monkeypatch.setattr( + 'cylc.flow.flags.cylc7_back_compat', + compat_mode == 'compat-mode', + ) + schd = scheduler(id_,) + async with start(schd, level=logging.DEBUG): + assert str(schd.pool.runahead_limit_point) == '20010103' + schd.pool.release_runahead_tasks() + for itask in schd.pool.get_all_tasks(): + schd.pool.spawn_on_output(itask, 'succeeded') + # future trigger raises the limit by one cycle point + assert str(schd.pool.runahead_limit_point) == '20010104' + + +async def test_compute_runahead_against_task_state( + flow, + scheduler, + start, + monkeypatch, +): + """For each task status check whether changing the oldest task + to that status will cause compute_runahead to 
make a change. + """ + states = [ + # (Status, Are we expecting an update?) + (TASK_STATUS_WAITING, False), + (TASK_STATUS_EXPIRED, True), + (TASK_STATUS_PREPARING, False), + (TASK_STATUS_SUBMIT_FAILED, True), + (TASK_STATUS_SUBMITTED, False), + (TASK_STATUS_RUNNING, False), + (TASK_STATUS_FAILED, True), + (TASK_STATUS_SUCCEEDED, True) + ] + config = { + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': '%Y', + }, + 'scheduling': { + 'initial cycle point': '0001', + 'runahead limit': 'P1Y', + 'graph': { + 'P1Y': 'a' + }, + } + } + + def max_cycle(tasks): + return max([int(t.tokens.get("cycle")) for t in tasks]) + + monkeypatch.setattr( + 'cylc.flow.flags.cylc7_back_compat', + True) + monkeypatch.setattr( + 'cylc.flow.task_events_mgr.TaskEventsManager._insert_task_job', + lambda *_: True) + + schd = scheduler(flow(config)) + async with start(schd): + for task_status, new_runahead in states: + before = max_cycle(schd.pool.get_tasks()) + itask = schd.pool.get_task(ISO8601Point(f'{before - 2:04}'), 'a') + schd.task_events_mgr.process_message( + itask, + logging.INFO, + task_status, + ) + after = max_cycle(schd.pool.get_tasks()) + assert bool(before != after) == new_runahead diff --git a/tests/unit/cycling/test_cycling.py b/tests/unit/cycling/test_cycling.py index ae90b68e62f..eaf48aa1e35 100644 --- a/tests/unit/cycling/test_cycling.py +++ b/tests/unit/cycling/test_cycling.py @@ -88,86 +88,3 @@ def test_parse_bad_exclusion(expression): """Tests incorrectly formatted exclusions""" with pytest.raises(Exception): parse_exclusion(expression) - - -@pytest.mark.parametrize( - 'sequence, wf_start_point, expected', - ( - ( - ('R/2/P2', 1), - None, - [2,4,6,8,10] - ), - ( - ('R/2/P2', 1), - 3, - [4,6,8,10,12] - ), - ), -) -def test_get_first_n_points_integer( - set_cycling_type, - sequence, wf_start_point, expected -): - """Test sequence get_first_n_points method. - - (The method is implemented in the base class). 
- """ - set_cycling_type(INTEGER_CYCLING_TYPE) - sequence = IntegerSequence(*sequence) - if wf_start_point is not None: - wf_start_point = IntegerPoint(wf_start_point) - expected = [ - IntegerPoint(p) - for p in expected - ] - assert ( - expected == ( - sequence.get_first_n_points( - len(expected), - wf_start_point - ) - ) - ) - - -@pytest.mark.parametrize( - 'sequence, wf_start_point, expected', - ( - ( - ('R/2008/P2Y', '2001'), - None, - ['2008', '2010', '2012', '2014', '2016'] - ), - ( - ('R/2008/P2Y', '2001'), - '2009', - ['2010', '2012', '2014', '2016', '2018'] - ), - ), -) -def test_get_first_n_points_iso8601( - set_cycling_type, - sequence, wf_start_point, expected -): - """Test sequence get_first_n_points method. - - (The method is implemented in the base class). - """ - set_cycling_type(ISO8601_CYCLING_TYPE, 'Z') - sequence = ISO8601Sequence(*sequence) - if wf_start_point is not None: - wf_start_point = ISO8601Point(wf_start_point) - expected = [ - ISO8601Point(p) - for p in expected - ] - - assert ( - expected == ( - sequence.get_first_n_points( - len(expected), - wf_start_point - ) - ) - )