Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move offset always, also with invalid processed records #894

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,9 @@ def handle_messages(self) -> None:
assert message_key is not None
key = json_decode(message_key)
except JSONDecodeError:
LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset())
# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
self.offset = msg.offset()
LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
continue
except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
LOG.error(
Expand All @@ -380,7 +382,9 @@ def handle_messages(self) -> None:
try:
value = self._parse_message_value(message_value)
except JSONDecodeError:
LOG.exception("Invalid JSON in msg.value() at offset %s", msg.offset())
# Invalid entry shall also move the offset so Karapace makes progress towards ready state.
self.offset = msg.offset()
LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
continue

self.handle_msg(key, value)
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
"""

from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Message
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_reader import (
KafkaSchemaReader,
Expand All @@ -20,6 +23,7 @@
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

import json
import pytest
import random
import time
Expand Down Expand Up @@ -184,3 +188,58 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
schema_reader.handle_messages()
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP


def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic() -> None:
    """Reader must reach the ready state even when the topic contains undecodable records.

    Each handle_messages() call consumes one record; the offset must advance past
    records whose key or value is invalid JSON, so startup can still complete.
    """

    def make_message(key, value, offset):
        # Build a consumer-record stub carrying a fixed key/value/offset and no error.
        msg = Mock(spec=Message)
        msg.key.return_value = key
        msg.error.return_value = None
        msg.value.return_value = value
        msg.offset.return_value = offset
        return msg

    valid_schema = json.dumps(
        {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
    ).encode()

    good_record = make_message(b'{"keytype":"SCHEMA","subject1":"test","version":1,"magic":1}', valid_schema, 1)
    bad_key_record = make_message("invalid-key", valid_schema, 2)
    bad_value_record = make_message(b'{"keytype":"SCHEMA","subject3":"test","version":1,"magic":1}', "invalid-value", 3)

    consumer_stub = Mock(spec=KafkaConsumer)
    # One record per poll, then an empty poll so the reader can evaluate readiness.
    consumer_stub.consume.side_effect = [good_record], [bad_key_record], [bad_value_record], []
    # Return tuple (beginning, end), end offset is the next upcoming record offset
    consumer_stub.get_watermark_offsets.return_value = (0, 4)

    reader = KafkaSchemaReader(
        config=DEFAULTS,
        offset_watcher=OffsetWatcher(),
        key_formatter=Mock(spec=KeyFormatter),
        master_coordinator=None,
        database=InMemoryDatabase(),
    )
    reader.consumer = consumer_stub
    reader.offset = 0
    assert reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP

    # The offset must advance for the valid record AND for both invalid ones.
    for expected_offset in (1, 2, 3):
        reader.handle_messages()
        assert reader.offset == expected_offset
        assert reader.ready is False

    reader.handle_messages()  # call last time to call _is_ready()
    assert reader.offset == 3
    assert reader.ready is True
    assert reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
Loading