From 14d8214a764afed711fb06b5ad2261962116e4c8 Mon Sep 17 00:00:00 2001 From: Nicolas Gilles Date: Thu, 14 Mar 2024 20:13:10 +0100 Subject: [PATCH] fix: resume from reader cursor when no senders configured In cases where a reader was configured with no senders (eg. only configured with searches), then in case of reloads/restarts the resume cursor was assumed `None`, and the default 'initial_position' of 'head' is fallen back to. This would end up causing all matching searchg to resend metric events immediately. --- journalpump/journalpump.py | 6 ++++- test/test_journalpump.py | 46 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/journalpump/journalpump.py b/journalpump/journalpump.py index 7f20cdf..ad85b69 100644 --- a/journalpump/journalpump.py +++ b/journalpump/journalpump.py @@ -838,7 +838,11 @@ def configure_readers(self): for reader_name, reader_config in new_config.items(): reader_state = state.get("readers", {}).get(reader_name, {}) - resume_cursor = None + + # Default to reader's cursor, in case we have no senders + resume_cursor = reader_state.get("cursor") + + # Check sender cursors for sender_name, sender in reader_state.get("senders", {}).items(): sender_cursor = sender["sent"]["cursor"] if sender_cursor is None: diff --git a/test/test_journalpump.py b/test/test_journalpump.py index dbcd63f..661ef05 100644 --- a/test/test_journalpump.py +++ b/test/test_journalpump.py @@ -982,6 +982,52 @@ def test_log_level_filtering(self): assert len(sender_d_msgs) == expected_results +def test_journalpump_resume_cursor(tmpdir) -> None: + journalpump_path = str(tmpdir.join("journalpump.json")) + statefile_path = str(tmpdir.join("journalpump_state.json")) + + config = { + "json_state_file_path": statefile_path, + "readers": { + "without_senders": {"senders": {}}, + "with_sender": { + "senders": { + "fake_syslog": { + "output_type": "rsyslog", + "rsyslog_server": "127.0.0.1", + "rsyslog_port": 514, + }, + } + }, + }, + } + + state = { + "readers": { + "without_senders": { + "cursor": "reader_cursor", + }, + "with_sender": { + "cursor": "reader_cursor", + "senders": {"fake_syslog": {"sent": {"cursor": "sender_cursor"}}}, + }, + }, + "start_time": datetime.now().isoformat() + } + + with open(journalpump_path, "w") as fp: + fp.write(json.dumps(config)) + + with open(statefile_path, "w") as fp: + fp.write(json.dumps(state)) + + pump = JournalPump(journalpump_path) + + # Check that readers are initialized with the right cursors + assert pump.readers["without_senders"].cursor == "reader_cursor" + assert pump.readers["with_sender"].cursor == "sender_cursor" + + def test_journalpump_state_file(tmpdir): journalpump_path = str(tmpdir.join("journalpump.json")) statefile_path = str(tmpdir.join("journalpump_state.json"))