Skip to content

Commit

Permalink
Merge pull request #167 from Aiven-Open/ngilles/resume-from-read-curs…
Browse files Browse the repository at this point in the history
…or-when-no-senders

fix: resume from reader cursor when no senders configured
  • Loading branch information
aiven-amartin authored Mar 15, 2024
2 parents 3f80062 + 7d01dac commit 5cf281e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
6 changes: 5 additions & 1 deletion journalpump/journalpump.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,11 @@ def configure_readers(self):

for reader_name, reader_config in new_config.items():
reader_state = state.get("readers", {}).get(reader_name, {})
resume_cursor = None

# Default to reader's cursor, in case we have no senders
resume_cursor = reader_state.get("cursor")

# Check sender cursors
for sender_name, sender in reader_state.get("senders", {}).items():
sender_cursor = sender["sent"]["cursor"]
if sender_cursor is None:
Expand Down
46 changes: 46 additions & 0 deletions test/test_journalpump.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,52 @@ def test_log_level_filtering(self):
assert len(sender_d_msgs) == expected_results


def test_journalpump_resume_cursor(tmpdir) -> None:
journalpump_path = str(tmpdir.join("journalpump.json"))
statefile_path = str(tmpdir.join("journalpump_state.json"))

config = {
"json_state_file_path": statefile_path,
"readers": {
"without_senders": {"senders": {}},
"with_sender": {
"senders": {
"fake_syslog": {
"output_type": "rsyslog",
"rsyslog_server": "127.0.0.1",
"rsyslog_port": 514,
},
}
},
},
}

state = {
"readers": {
"without_senders": {
"cursor": "reader_cursor",
},
"with_sender": {
"cursor": "reader_cursor",
"senders": {"fake_syslog": {"sent": {"cursor": "sender_cursor"}}},
},
},
"start_time": datetime.now().isoformat(),
}

with open(journalpump_path, "w") as fp:
fp.write(json.dumps(config))

with open(statefile_path, "w") as fp:
fp.write(json.dumps(state))

pump = JournalPump(journalpump_path)

# Check that readers are initialized with the right cursors
assert pump.readers["without_senders"].cursor == "reader_cursor"
assert pump.readers["with_sender"].cursor == "sender_cursor"


def test_journalpump_state_file(tmpdir):
journalpump_path = str(tmpdir.join("journalpump.json"))
statefile_path = str(tmpdir.join("journalpump_state.json"))
Expand Down

0 comments on commit 5cf281e

Please sign in to comment.