diff --git a/.github/workflows/test_fast.yml b/.github/workflows/test_fast.yml index c7682f9458a..66f08ce61ac 100644 --- a/.github/workflows/test_fast.yml +++ b/.github/workflows/test_fast.yml @@ -32,7 +32,6 @@ jobs: time-zone: 'XXX-09:35' env: - # Use non-UTC time zone TZ: ${{ matrix.time-zone }} PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes diff --git a/changes.d/5893.fix b/changes.d/5893.fix new file mode 100644 index 00000000000..504cd6a649e --- /dev/null +++ b/changes.d/5893.fix @@ -0,0 +1 @@ +Fixed bug in computing a time interval-based runahead limit when future triggers are present. diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py index e3d8a2fe64a..c47d242b5c5 100644 --- a/cylc/flow/cycling/__init__.py +++ b/cylc/flow/cycling/__init__.py @@ -415,22 +415,6 @@ def get_stop_point(self): """Return the last point of this sequence, or None if unbounded.""" pass - def get_first_n_points(self, n, point=None): - """Return a list of first n points of this sequence.""" - if point is None: - p1 = self.get_start_point() - else: - p1 = self.get_first_point(point) - if p1 is None: - return [] - result = [p1] - for _ in range(1, n): - p1 = self.get_next_point_on_sequence(p1) - if p1 is None: - break - result.append(p1) - return result - @abstractmethod def __eq__(self, other) -> bool: # Return True if other (sequence) is equal to self. diff --git a/cylc/flow/cycling/iso8601.py b/cylc/flow/cycling/iso8601.py index 553b63e4b2f..a66ce3f5ba0 100644 --- a/cylc/flow/cycling/iso8601.py +++ b/cylc/flow/cycling/iso8601.py @@ -947,12 +947,22 @@ def _interval_parse(interval_string): def point_parse(point_string: str) -> 'TimePoint': """Parse a point_string into a proper TimePoint object.""" - return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT) + return _point_parse( + point_string, + WorkflowSpecifics.DUMP_FORMAT, + WorkflowSpecifics.ASSUMED_TIME_ZONE + ) @lru_cache(10000) -def _point_parse(point_string, _dump_fmt): - """Parse a point_string into a proper TimePoint object.""" +def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint': + """Parse a point_string into a proper TimePoint object. + + Args: + point_string: The string to parse. + _dump_fmt: Dump format (only used to avoid invalid cache hits). + _tz: Cycle point time zone (only used to avoid invalid cache hits). + """ if "%" in WorkflowSpecifics.DUMP_FORMAT: # May be a custom not-quite ISO 8601 dump format. with contextlib.suppress(IsodatetimeError): diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 851ee1e39b7..bce46ffb6ff 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -307,8 +307,8 @@ def compute_runahead(self, force=False) -> bool: With force=True we recompute the limit even if the base point has not changed (needed if max_future_offset changed, or on reload). - """ + """ limit = self.config.runahead_limit # e.g. P2 or P2D count_cycles = False with suppress(TypeError): @@ -316,73 +316,39 @@ def compute_runahead(self, force=False) -> bool: ilimit = int(limit) # type: ignore count_cycles = True - base_point: 'PointBase' - points: List['PointBase'] = [] + base_point: Optional['PointBase'] = None + # First get the runahead base point. if not self.main_pool: - # No tasks yet, just consider sequence points. - if count_cycles: - # Get the first ilimit points in each sequence. - # (After workflow start point - sequence may begin earlier). 
- points = [ - point - for plist in [ - seq.get_first_n_points( - ilimit, self.config.start_point) - for seq in self.config.sequences - ] - for point in plist - ] - # Drop points beyond the limit. - points = sorted(points)[:ilimit + 1] - base_point = min(points) - - else: - # Start at first point in each sequence. - # (After workflow start point - sequence may begin earlier). - points = [ - point - for point in { - seq.get_first_point(self.config.start_point) - for seq in self.config.sequences - } - if point is not None - ] - base_point = min(points) - # Drop points beyond the limit. - points = [ - point - for point in points - if point <= base_point + limit - ] - + # Find the earliest sequence point beyond the workflow start point. + base_point = min( + point + for point in { + seq.get_first_point(self.config.start_point) + for seq in self.config.sequences + } + if point is not None + ) else: - # Find the earliest point with unfinished tasks. + # Find the earliest point with incomplete tasks. for point, itasks in sorted(self.get_tasks_by_point().items()): + # All n=0 tasks are incomplete by definition, but Cylc 7 + # ignores failed ones (it does not ignore submit-failed!). if ( - points # got the limit already so this point too - or any( - not itask.state( - TASK_STATUS_FAILED, - TASK_STATUS_SUCCEEDED, - TASK_STATUS_EXPIRED - ) - or ( - # For Cylc 7 back-compat, ignore incomplete tasks. - # (Success is required in back-compat mode, so - # failedtasks end up as incomplete; and Cylc 7 - # ignores failed tasks in computing the limit). - itask.state.outputs.is_incomplete() - and not cylc.flow.flags.cylc7_back_compat - ) + cylc.flow.flags.cylc7_back_compat and + all( + itask.state(TASK_STATUS_FAILED) for itask in itasks ) ): - points.append(point) + continue + base_point = point + break - if not points: - return False - base_point = min(points) + if base_point is None: + return False + + LOG.debug(f"Runahead: base point {base_point}") if self._prev_runahead_base_point is None: self._prev_runahead_base_point = base_point @@ -399,8 +365,10 @@ def compute_runahead(self, force=False) -> bool: # change or the runahead limit is already at stop point. return False - # Get all cycle points possible after the base point. - sequence_points: Set['PointBase'] + # Now generate all possible cycle points from the base point and stop + # at the runahead limit point. Note both cycle count and time interval + # limits involve all possible cycles, not just active cycles. + sequence_points: Set['PointBase'] = set() if ( not force and self._prev_runahead_sequence_points @@ -410,18 +378,19 @@ def compute_runahead(self, force=False) -> bool: sequence_points = self._prev_runahead_sequence_points else: # Recompute possible points. - sequence_points = set() for sequence in self.config.sequences: - seq_point = sequence.get_next_point(base_point) + seq_point = sequence.get_first_point(base_point) count = 1 while seq_point is not None: if count_cycles: # P0 allows only the base cycle point to run. if count > 1 + ilimit: + # this point may be beyond the runahead limit break else: # PT0H allows only the base cycle point to run. 
if seq_point > base_point + limit: + # this point can not be beyond the runahead limit break count += 1 sequence_points.add(seq_point) @@ -429,25 +398,28 @@ def compute_runahead(self, force=False) -> bool: self._prev_runahead_sequence_points = sequence_points self._prev_runahead_base_point = base_point - points = set(points).union(sequence_points) - if count_cycles: - # Some sequences may have different intervals. - limit_point = sorted(points)[:(ilimit + 1)][-1] + # (len(list) may be less than ilimit due to sequence end) + limit_point = sorted(sequence_points)[:ilimit + 1][-1] else: - # We already stopped at the runahead limit. - limit_point = sorted(points)[-1] + limit_point = max(sequence_points) - # Adjust for future offset and stop point, if necessary. + # Adjust for future offset and stop point. pre_adj_limit = limit_point if self.max_future_offset is not None: limit_point += self.max_future_offset - LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)") + LOG.debug( + "Runahead (future trigger adjust):" + f" {pre_adj_limit} -> {limit_point}" + ) if self.stop_point and limit_point > self.stop_point: limit_point = self.stop_point - LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)") - LOG.debug(f"Runahead limit: {limit_point}") + LOG.debug( + "Runahead (stop point adjust):" + f" {pre_adj_limit} -> {limit_point} (stop point)" + ) + LOG.debug(f"Runahead limit: {limit_point}") self.runahead_limit_point = limit_point return True diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 97526088660..18057a86339 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -38,6 +38,7 @@ TASK_STATUS_FAILED, TASK_STATUS_EXPIRED, TASK_STATUS_SUBMIT_FAILED, + TASK_STATUSES_ALL, ) # NOTE: foo and bar have no parents so at start-up (even with the workflow @@ -1008,7 +1009,8 @@ async def test_runahead_limit_for_sequence_before_start_cycle( ): """It should obey the runahead limit. - Ensure the runahead limit is computed correctly for sequences before the start cycle + Ensure the runahead limit is computed correctly for sequences that begin + before the start cycle. See https://github.com/cylc/cylc-flow/issues/5603 """ @@ -1154,11 +1156,12 @@ async def test_no_flow_tasks_dont_spawn( for itask in pool ] == pool + async def test_task_proxy_remove_from_queues( flow, one_conf, scheduler, start, ): """TaskPool.remove should delete task proxies from queues. - + See https://github.com/cylc/cylc-flow/pull/5573 """ # Set up a scheduler with a non-default queue: @@ -1243,6 +1246,218 @@ async def test_detect_incomplete_tasks( # ensure that it is correctly identified as incomplete assert itask.state.outputs.get_incomplete() assert itask.state.outputs.is_incomplete() - assert log_filter(log, contains=f"[{itask}] did not complete required outputs:") + assert log_filter( + log, contains=f"[{itask}] did not complete required outputs:") # the task should not have been removed assert itask in schd.pool.get_tasks() + + +@pytest.mark.parametrize('compat_mode', ['compat-mode', 'normal-mode']) +@pytest.mark.parametrize('cycling_mode', ['integer', 'datetime']) +@pytest.mark.parametrize('runahead_format', ['P3Y', 'P3']) +async def test_compute_runahead( + cycling_mode, + compat_mode, + runahead_format, + flow, + scheduler, + start, + monkeypatch, +): + """Test the calculation of the runahead limit. 
+
+    This test ensures that:
+    * Runahead tasks are excluded from computations
+      (see https://github.com/cylc/cylc-flow/issues/5825)
+    * Tasks are initiated with the correct is_runahead status on startup.
+    * Behaviour in compat/regular modes is the same unless failed tasks are present
+    * Behaviour is the same for integer/datetime cycling modes.
+
+    """
+    if cycling_mode == 'integer':
+        config = {
+            'scheduler': {
+                'allow implicit tasks': 'True',
+            },
+            'scheduling': {
+                'initial cycle point': '1',
+                'cycling mode': 'integer',
+                'runahead limit': 'P3',
+                'graph': {
+                    'P1': 'a'
+                },
+            }
+        }
+        point = lambda point: IntegerPoint(str(int(point)))
+    else:
+        config = {
+            'scheduler': {
+                'allow implicit tasks': 'True',
+                'cycle point format': 'CCYY',
+            },
+            'scheduling': {
+                'initial cycle point': '0001',
+                'runahead limit': runahead_format,
+                'graph': {
+                    'P1Y': 'a'
+                },
+            }
+        }
+        point = ISO8601Point
+
+    monkeypatch.setattr(
+        'cylc.flow.flags.cylc7_back_compat',
+        compat_mode == 'compat-mode',
+    )
+
+    id_ = flow(config)
+    schd = scheduler(id_)
+    async with start(schd):
+        schd.pool.compute_runahead(force=True)
+        assert int(str(schd.pool.runahead_limit_point)) == 4
+
+        # ensure task states are initiated with is_runahead status
+        assert schd.pool.get_task(point('0001'), 'a').state(is_runahead=False)
+        assert schd.pool.get_task(point('0005'), 'a').state(is_runahead=True)
+
+        # mark the first three cycles as running
+        for cycle in range(1, 4):
+            schd.pool.get_task(point(f'{cycle:04}'), 'a').state.reset(
+                TASK_STATUS_RUNNING
+            )
+
+        schd.pool.compute_runahead(force=True)
+        assert int(str(schd.pool.runahead_limit_point)) == 4  # no change
+
+        # In Cylc 8 all incomplete tasks hold back runahead.
+
+        # In Cylc 7, submit-failed tasks hold back runahead...
+        schd.pool.get_task(point('0001'), 'a').state.reset(
+            TASK_STATUS_SUBMIT_FAILED
+        )
+        schd.pool.compute_runahead(force=True)
+        assert int(str(schd.pool.runahead_limit_point)) == 4
+
+        # ... but failed ones don't. Go figure.
+        schd.pool.get_task(point('0001'), 'a').state.reset(
+            TASK_STATUS_FAILED
+        )
+        schd.pool.compute_runahead(force=True)
+        if compat_mode == 'compat-mode':
+            assert int(str(schd.pool.runahead_limit_point)) == 5
+        else:
+            assert int(str(schd.pool.runahead_limit_point)) == 4  # no change
+
+        # mark cycle 1 as complete
+        # (via task message so the task gets removed before runahead compute)
+        schd.task_events_mgr.process_message(
+            schd.pool.get_task(point('0001'), 'a'),
+            logging.INFO,
+            TASK_OUTPUT_SUCCEEDED
+        )
+        schd.pool.compute_runahead(force=True)
+        assert int(str(schd.pool.runahead_limit_point)) == 5  # +1
+
+
+@pytest.mark.parametrize('rhlimit', ['P2D', 'P2'])
+@pytest.mark.parametrize('compat_mode', ['compat-mode', 'normal-mode'])
+async def test_runahead_future_trigger(
+    flow,
+    scheduler,
+    start,
+    monkeypatch,
+    rhlimit,
+    compat_mode,
+):
+    """Equivalent time interval and cycle count runahead limits should yield
+    the same limit point, even if there is a future trigger.
+ + See https://github.com/cylc/cylc-flow/pull/5893 + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': 'CCYYMMDD', + }, + 'scheduling': { + 'initial cycle point': '2001', + 'runahead limit': rhlimit, + 'graph': { + 'P1D': ''' + a + a[+P1D] => b + ''', + }, + } + }) + + monkeypatch.setattr( + 'cylc.flow.flags.cylc7_back_compat', + compat_mode == 'compat-mode', + ) + schd = scheduler(id_,) + async with start(schd, level=logging.DEBUG): + assert str(schd.pool.runahead_limit_point) == '20010103' + schd.pool.release_runahead_tasks() + for itask in schd.pool.get_all_tasks(): + schd.pool.spawn_on_output(itask, 'succeeded') + # future trigger raises the limit by one cycle point + assert str(schd.pool.runahead_limit_point) == '20010104' + + +async def test_compute_runahead_against_task_state( + flow, + scheduler, + start, + monkeypatch, +): + """For each task status check whether changing the oldest task + to that status will cause compute_runahead to make a change. + """ + states = [ + # (Status, Are we expecting an update?) + (TASK_STATUS_WAITING, False), + (TASK_STATUS_EXPIRED, True), + (TASK_STATUS_PREPARING, False), + (TASK_STATUS_SUBMIT_FAILED, True), + (TASK_STATUS_SUBMITTED, False), + (TASK_STATUS_RUNNING, False), + (TASK_STATUS_FAILED, True), + (TASK_STATUS_SUCCEEDED, True) + ] + config = { + 'scheduler': { + 'allow implicit tasks': 'True', + 'cycle point format': '%Y', + }, + 'scheduling': { + 'initial cycle point': '0001', + 'runahead limit': 'P1Y', + 'graph': { + 'P1Y': 'a' + }, + } + } + + def max_cycle(tasks): + return max([int(t.tokens.get("cycle")) for t in tasks]) + + monkeypatch.setattr( + 'cylc.flow.flags.cylc7_back_compat', + True) + monkeypatch.setattr( + 'cylc.flow.task_events_mgr.TaskEventsManager._insert_task_job', + lambda *_: True) + + schd = scheduler(flow(config)) + async with start(schd): + for task_status, new_runahead in states: + before = max_cycle(schd.pool.get_tasks()) + itask = schd.pool.get_task(ISO8601Point(f'{before - 2:04}'), 'a') + schd.task_events_mgr.process_message( + itask, + logging.INFO, + task_status, + ) + after = max_cycle(schd.pool.get_tasks()) + assert bool(before != after) == new_runahead diff --git a/tests/unit/cycling/test_cycling.py b/tests/unit/cycling/test_cycling.py index ae90b68e62f..eaf48aa1e35 100644 --- a/tests/unit/cycling/test_cycling.py +++ b/tests/unit/cycling/test_cycling.py @@ -88,86 +88,3 @@ def test_parse_bad_exclusion(expression): """Tests incorrectly formatted exclusions""" with pytest.raises(Exception): parse_exclusion(expression) - - -@pytest.mark.parametrize( - 'sequence, wf_start_point, expected', - ( - ( - ('R/2/P2', 1), - None, - [2,4,6,8,10] - ), - ( - ('R/2/P2', 1), - 3, - [4,6,8,10,12] - ), - ), -) -def test_get_first_n_points_integer( - set_cycling_type, - sequence, wf_start_point, expected -): - """Test sequence get_first_n_points method. - - (The method is implemented in the base class). 
- """ - set_cycling_type(INTEGER_CYCLING_TYPE) - sequence = IntegerSequence(*sequence) - if wf_start_point is not None: - wf_start_point = IntegerPoint(wf_start_point) - expected = [ - IntegerPoint(p) - for p in expected - ] - assert ( - expected == ( - sequence.get_first_n_points( - len(expected), - wf_start_point - ) - ) - ) - - -@pytest.mark.parametrize( - 'sequence, wf_start_point, expected', - ( - ( - ('R/2008/P2Y', '2001'), - None, - ['2008', '2010', '2012', '2014', '2016'] - ), - ( - ('R/2008/P2Y', '2001'), - '2009', - ['2010', '2012', '2014', '2016', '2018'] - ), - ), -) -def test_get_first_n_points_iso8601( - set_cycling_type, - sequence, wf_start_point, expected -): - """Test sequence get_first_n_points method. - - (The method is implemented in the base class). - """ - set_cycling_type(ISO8601_CYCLING_TYPE, 'Z') - sequence = ISO8601Sequence(*sequence) - if wf_start_point is not None: - wf_start_point = ISO8601Point(wf_start_point) - expected = [ - ISO8601Point(p) - for p in expected - ] - - assert ( - expected == ( - sequence.get_first_n_points( - len(expected), - wf_start_point - ) - ) - )