Skip to content

Commit

Permalink
fix: move offset always, also with invalid processed records
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Jun 6, 2024
1 parent a189931 commit 00ba719
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
4 changes: 4 additions & 0 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ def handle_messages(self) -> None:
assert message_key is not None
key = json_decode(message_key)
except JSONDecodeError:
# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
self.offset = msg.offset()
LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset())
continue
except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
Expand Down Expand Up @@ -380,6 +382,8 @@ def handle_messages(self) -> None:
try:
value = self._parse_message_value(message_value)
except JSONDecodeError:
# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
self.offset = msg.offset()
LOG.exception("Invalid JSON in msg.value() at offset %s", msg.offset())
continue

Expand Down
51 changes: 51 additions & 0 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
"""

from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Message
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_reader import (
KafkaSchemaReader,
Expand All @@ -20,6 +23,7 @@
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

import json
import pytest
import random
import time
Expand Down Expand Up @@ -184,3 +188,50 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
schema_reader.handle_messages()
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP


def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic() -> None:
key_formatter_mock = Mock(spec=KeyFormatter)
consumer_mock = Mock(spec=KafkaConsumer)

schema_str = json.dumps(
{"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
).encode()

ok1_message = Mock(spec=Message)
ok1_message.key.return_value = b'{"keytype":"SCHEMA","subject1":"test","version":1,"magic":1}'
ok1_message.error.return_value = None
ok1_message.value.return_value = schema_str
ok1_message.offset.return_value = 0
invalid_key_message = Mock(spec=Message)
invalid_key_message.key.return_value = "invalid-key"
invalid_key_message.error.return_value = None
invalid_key_message.value.return_value = schema_str
invalid_key_message.offset.return_value = 1
invalid_value_message = Mock(spec=Message)
invalid_value_message.key.return_value = b'{"keytype":"SCHEMA","subject3":"test","version":1,"magic":1}'
invalid_value_message.error.return_value = None
invalid_value_message.value.return_value = "invalid-value"
invalid_value_message.offset.return_value = 2

consumer_mock.consume.side_effect = [ok1_message, invalid_key_message, invalid_value_message], []
# Return tuple (beginning, end), end offset is the next upcoming record offset
consumer_mock.get_watermark_offsets.return_value = (0, 3)

offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
config=DEFAULTS,
offset_watcher=offset_watcher,
key_formatter=key_formatter_mock,
master_coordinator=None,
database=InMemoryDatabase(),
)
schema_reader.consumer = consumer_mock
schema_reader.offset = 0
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP

schema_reader.handle_messages()
assert schema_reader.offset == 2
schema_reader.handle_messages() # call again to call _is_ready()
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP

0 comments on commit 00ba719

Please sign in to comment.