diff --git a/cylc/flow/broadcast_mgr.py b/cylc/flow/broadcast_mgr.py index b9329114302..a96e7956197 100644 --- a/cylc/flow/broadcast_mgr.py +++ b/cylc/flow/broadcast_mgr.py @@ -84,7 +84,12 @@ def check_ext_triggers(self, itask, ext_trigger_queue): return self._match_ext_trigger(itask) def clear_broadcast( - self, point_strings=None, namespaces=None, cancel_settings=None): + self, + point_strings=None, + namespaces=None, + cancel_settings=None, + is_housekeeping=False + ): """Clear broadcasts globally, or for listed namespaces and/or points. Return a tuple (modified_settings, bad_options), where: @@ -98,6 +103,10 @@ def clear_broadcast( * namespaces: a list of bad namespaces. * cancel: a list of tuples. Each tuple contains the keys of a bad setting. + + Args: + is_housekeeping: Tells logging to indicate that this clearance is + automated housekeeping. """ # If cancel_settings defined, only clear specific broadcasts cancel_keys_list = self._settings_to_keys_list(cancel_settings) @@ -138,7 +147,10 @@ def clear_broadcast( # Log the broadcast self.workflow_db_mgr.put_broadcast(modified_settings, is_cancel=True) LOG.info( - get_broadcast_change_report(modified_settings, is_cancel=True)) + get_broadcast_change_report( + modified_settings, + is_cancel=True, + is_housekeeping=is_housekeeping)) if bad_options: LOG.error(get_broadcast_bad_options_report(bad_options)) if modified_settings: @@ -161,6 +173,18 @@ def expire_broadcast(self, cutoff=None, **kwargs): return (None, {"expire": [cutoff]}) return self.clear_broadcast(point_strings=point_strings, **kwargs) + def housekeep(self, cycle, task): + """Clear broadcasts to a specific task at a specific point.""" + with self.lock: + if ( + self.broadcasts.get(str(cycle), '') + and self.broadcasts[str(cycle)].get(task, '') + ): + self.clear_broadcast( + point_strings=[cycle], + namespaces=[task], + is_housekeeping=True) + def get_broadcast(self, tokens: 'Optional[Tokens]' = None) -> dict: """Retrieve all broadcast variables that 
target a given task ID.""" if tokens is None or tokens == 'None': diff --git a/cylc/flow/broadcast_report.py b/cylc/flow/broadcast_report.py index cb44f2212f3..562ddbb5c96 100644 --- a/cylc/flow/broadcast_report.py +++ b/cylc/flow/broadcast_report.py @@ -24,6 +24,8 @@ CHANGE_PREFIX_CANCEL = "-" CHANGE_PREFIX_SET = "+" CHANGE_TITLE_CANCEL = "Broadcast cancelled:" +CHANGE_TITLE_CANCEL_ON_COMPLETION = ( + "Broadcast cancelled (housekept on task completion):") CHANGE_TITLE_SET = "Broadcast set:" @@ -85,11 +87,19 @@ def get_broadcast_change_iter(modified_settings, is_cancel=False): "value": str(value)} -def get_broadcast_change_report(modified_settings, is_cancel=False): - """Return a string for reporting modification to broadcast settings.""" +def get_broadcast_change_report( + modified_settings, is_cancel=False, is_housekeeping=False +): + """Return a string for reporting modification to broadcast settings. + + Args: + is_housekeeping: Note that this is an automatic cancellation. + """ if not modified_settings: return "" - if is_cancel: + if is_housekeeping: + msg = CHANGE_TITLE_CANCEL_ON_COMPLETION + elif is_cancel: msg = CHANGE_TITLE_CANCEL else: msg = CHANGE_TITLE_SET diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 92702b0b55e..39d3e799845 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1531,7 +1531,6 @@ async def _main_loop(self) -> None: # A simulated task state change occurred. 
self.reset_inactivity_timer() - self.broadcast_mgr.expire_broadcast(self.pool.get_min_point()) self.late_tasks_check() self.process_queued_task_messages() diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index aeee7505e66..7c16c84579e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1440,6 +1440,18 @@ def spawn_on_output(self, itask, output, forced=False): def remove_if_complete( self, itask: TaskProxy, output: Optional[str] = None + ) -> bool: + """Wraps _remove_if_complete, clears broadcasts targeted + at this task if it's complete. + """ + complete = self._remove_if_complete(itask, output) + if complete: + self.task_events_mgr.broadcast_mgr.housekeep( + **itask.tokens.task) + return complete + + def _remove_if_complete( + self, itask: TaskProxy, output: Optional[str] = None ) -> bool: """Remove a finished task if required outputs are complete. diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 0428e3af6c1..4885e9879a0 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import asyncio import logging from typing import ( TYPE_CHECKING, @@ -2152,3 +2153,81 @@ async def test_trigger_unqueued(flow, scheduler, start): assert not schd.pool.task_queue_mgr.force_released, ( "Triggering an unqueued task should not affect the force_released list" ) + + +async def test_broadcast_to_succeeded_task_not_cleared( + flow, + scheduler, + run, + complete, + log_filter +): + """Clear broadcast on task exit, but don't routinely housekeep broadcasts + by cycle (as was earlier Cylc behaviour). + + Instead, clear broadcasts for a task on task completion. + + https://github.com/cylc/cylc-flow/issues/6308 + """ + # Add a completely irrelevant broadcast to check that it's + # left alone by the clearing logic. 
+ control = { + 'not_a_hippo': {'environment': {'irrelephant': '42'}}} + ignore_me = {'1000': control} + + def get_expect(val): + """Shortcut to writing out the full dictionary""" + return { + '1000': { + 'foo': {'environment': {'testvar': val}}, + **control}} + + # Setup scheduler: + wid = flow({ + 'scheduler': {'cycle point format': '%Y'}, + 'scheduling': { + 'initial cycle point': 1000, + 'final cycle point': 1002, + 'graph': { + 'P1Y': 'foo[-P1Y] => foo', + 'R1/$': 'not_a_hippo' + }}, + 'runtime': {'foo': {'environment': {'testvar': 1}}} + }) + schd = scheduler(wid, paused_start=False) + + async with run(schd) as log: + # Control broadcast - should be left alone: + schd.broadcast_mgr.put_broadcast( + ["1000"], ['not_a_hippo'], [control['not_a_hippo']]) + + # Add a broadcast relating to a non-expired + # task and check that it's there: + schd.broadcast_mgr.put_broadcast( + ["1000"], ['foo'], [{'environment': {'testvar': "2"}}]) + assert schd.broadcast_mgr.broadcasts == get_expect('2') + + # Allow the 1000/foo to complete, check that + # broadcasts have been cleared: + await complete(schd, '1000/foo') + assert schd.broadcast_mgr.broadcasts == ignore_me + assert log_filter(log, regex='testvar=2')[-1][2] == ( + 'Broadcast cancelled (housekept on task completion):' + '\n- [1000/foo] [environment]testvar=2') + + # Update the broadcast and check that the main loop + # doesn't clear it: + schd.broadcast_mgr.put_broadcast( + ["1000"], ['foo'], [{'environment': {'testvar': "3"}}]) + await asyncio.sleep(1) + assert schd.broadcast_mgr.broadcasts == get_expect("3") + + # Check that the broadcast _is_ consumed by the finishing + # of the task to which it applies a second time: + schd.pool.force_trigger_tasks(['1000/foo'], [2]) + await complete(schd, '1000/foo') + assert schd.broadcast_mgr.broadcasts == ignore_me + assert log_filter(log, regex='testvar=3')[-1][2] == ( + 'Broadcast cancelled (housekept on task completion):' + '\n- [1000/foo] [environment]testvar=3' + )