Skip to content

Commit

Permalink
Expire broadcasts on task completion rather than cyclically.
Browse files Browse the repository at this point in the history
Expire broadcasts on task completion rather than cyclically.
  • Loading branch information
wxtim committed Sep 5, 2024
1 parent 293c3cc commit 2956a0a
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 6 deletions.
28 changes: 26 additions & 2 deletions cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ def check_ext_triggers(self, itask, ext_trigger_queue):
return self._match_ext_trigger(itask)

def clear_broadcast(
self, point_strings=None, namespaces=None, cancel_settings=None):
self,
point_strings=None,
namespaces=None,
cancel_settings=None,
is_housekeeping=False
):
"""Clear broadcasts globally, or for listed namespaces and/or points.
Return a tuple (modified_settings, bad_options), where:
Expand All @@ -98,6 +103,10 @@ def clear_broadcast(
* namespaces: a list of bad namespaces.
* cancel: a list of tuples. Each tuple contains the keys of a bad
setting.
Args:
task_completion: Tells logging to indicate that this clearance is
automated housekeeping.
"""
# If cancel_settings defined, only clear specific broadcasts
cancel_keys_list = self._settings_to_keys_list(cancel_settings)
Expand Down Expand Up @@ -138,7 +147,10 @@ def clear_broadcast(
# Log the broadcast
self.workflow_db_mgr.put_broadcast(modified_settings, is_cancel=True)
LOG.info(
get_broadcast_change_report(modified_settings, is_cancel=True))
get_broadcast_change_report(
modified_settings,
is_cancel=True,
is_housekeeping=is_housekeeping))
if bad_options:
LOG.error(get_broadcast_bad_options_report(bad_options))
if modified_settings:
Expand All @@ -161,6 +173,18 @@ def expire_broadcast(self, cutoff=None, **kwargs):
return (None, {"expire": [cutoff]})
return self.clear_broadcast(point_strings=point_strings, **kwargs)

def housekeep(self, cycle, task):
"""Clear broadcasts to a specific task at a specific point."""
with self.lock:
if (
self.broadcasts.get(str(cycle), '')
and self.broadcasts[str(cycle)].get(task, '')
):
self.clear_broadcast(
point_strings=[cycle],
namespaces=[task],
is_housekeeping=True)

def get_broadcast(self, tokens: 'Optional[Tokens]' = None) -> dict:
"""Retrieve all broadcast variables that target a given task ID."""
if tokens is None or tokens == 'None':
Expand Down
16 changes: 13 additions & 3 deletions cylc/flow/broadcast_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
CHANGE_PREFIX_CANCEL = "-"
CHANGE_PREFIX_SET = "+"
CHANGE_TITLE_CANCEL = "Broadcast cancelled:"
CHANGE_TITLE_CANCEL_ON_COMPLETION = (
"Broadcast cancelled (housekept on task completion):")
CHANGE_TITLE_SET = "Broadcast set:"


Expand Down Expand Up @@ -85,11 +87,19 @@ def get_broadcast_change_iter(modified_settings, is_cancel=False):
"value": str(value)}


def get_broadcast_change_report(modified_settings, is_cancel=False):
"""Return a string for reporting modification to broadcast settings."""
def get_broadcast_change_report(
modified_settings, is_cancel=False, is_housekeeping=False
):
"""Return a string for reporting modification to broadcast settings.
Args:
is_housekeeping: Note that this is an automatic cancellation.
"""
if not modified_settings:
return ""
if is_cancel:
if is_housekeeping:
msg = CHANGE_TITLE_CANCEL_ON_COMPLETION
elif is_cancel:
msg = CHANGE_TITLE_CANCEL
else:
msg = CHANGE_TITLE_SET
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,6 @@ async def _main_loop(self) -> None:
# A simulated task state change occurred.
self.reset_inactivity_timer()

self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()

self.process_queued_task_messages()
Expand Down
12 changes: 12 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,18 @@ def spawn_on_output(self, itask, output, forced=False):

def remove_if_complete(
self, itask: TaskProxy, output: Optional[str] = None
) -> bool:
"""Wraps _remove_if_complete, clears broadcasts targeted
at this task if it's complete.
"""
complete = self._remove_if_complete(itask, output)
if complete:
self.task_events_mgr.broadcast_mgr.housekeep(
**itask.tokens.task)
return complete

def _remove_if_complete(
self, itask: TaskProxy, output: Optional[str] = None
) -> bool:
"""Remove a finished task if required outputs are complete.
Expand Down
79 changes: 79 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
import logging
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -2152,3 +2153,81 @@ async def test_trigger_unqueued(flow, scheduler, start):
assert not schd.pool.task_queue_mgr.force_released, (
"Triggering an unqueued task should not affect the force_released list"
)


async def test_broadcast_to_succeeded_task_not_cleared(
flow,
scheduler,
run,
complete,
log_filter
):
"""Clear broadcast on task exit, but don't routinely houskeep broadcasts
by cycle (as was earlier Cylc behaviour).
Instead, clear broadcasts for a task on task completion.
https://github.com/cylc/cylc-flow/issues/6308
"""
# Add a completely irrelevent broadcast to check that it's
# left alone by the clearing logic.
control = {
'not_a_hippo': {'environment': {'irrelephant': '42'}}}
ignore_me = {'1000': control}

def get_expect(val):
"""Shortcut to writing out the full dictionary"""
return {
'1000': {
'foo': {'environment': {'testvar': val}},
**control}}

# Setup scheduler:
wid = flow({
'scheduler': {'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': 1000,
'final cycle point': 1002,
'graph': {
'P1Y': 'foo[-P1Y] => foo',
'R1/$': 'not_a_hippo'
}},
'runtime': {'foo': {'environment': {'testvar': 1}}}
})
schd = scheduler(wid, paused_start=False)

async with run(schd) as log:
# Control broadcast - should be left alone:
schd.broadcast_mgr.put_broadcast(
["1000"], ['not_a_hippo'], [control['not_a_hippo']])

# Add a broadcast relating to a non-expired
# task and check that it's there:
schd.broadcast_mgr.put_broadcast(
["1000"], ['foo'], [{'environment': {'testvar': "2"}}])
assert schd.broadcast_mgr.broadcasts == get_expect('2')

# Allow the 1000/foo to complete, check that
# broadcasts have been cleared:
await complete(schd, '1000/foo')
assert schd.broadcast_mgr.broadcasts == ignore_me
assert log_filter(log, regex='testvar=2')[-1][2] == (
'Broadcast cancelled (housekept on task completion):'
'\n- [1000/foo] [environment]testvar=2')

# Update the broadcast and check that the main loop
# doesn't clear it:
schd.broadcast_mgr.put_broadcast(
["1000"], ['foo'], [{'environment': {'testvar': "3"}}])
await asyncio.sleep(1)
assert schd.broadcast_mgr.broadcasts == get_expect("3")

# Check that the broadcast _is_ consumed by the finishing
# of the task to which it applies a second time:
schd.pool.force_trigger_tasks(['1000/foo'], [2])
await complete(schd, '1000/foo')
assert schd.broadcast_mgr.broadcasts == ignore_me
assert log_filter(log, regex='testvar=3')[-1][2] == (
'Broadcast cancelled (housekept on task completion):'
'\n- [1000/foo] [environment]testvar=3'
)

0 comments on commit 2956a0a

Please sign in to comment.