Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: store soft deleted schema #873

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
OFFSET_EMPTY,
OFFSET_UNINITIALIZED,
)
from karapace.typing import SchemaId
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

import confluent_kafka
import json
import pytest
import random
Expand Down Expand Up @@ -243,3 +245,50 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
assert schema_reader.offset == 3
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP


def test_soft_deleted_schema_storing() -> None:
    """Schema reader must store a schema whose only surviving record is soft-deleted.

    This covers the case where the ``_schemas`` topic has been compacted and
    only the soft-deleted version of the schema is present: the reader must
    still register the schema in the database so it can be found by id.
    """
    subject = "soft-delete-test"
    key_formatter_mock = Mock(spec=KeyFormatter)
    consumer_mock = Mock(spec=KafkaConsumer)

    # Single consumed record representing the soft-deleted schema version.
    soft_deleted_schema_record = Mock(spec=confluent_kafka.Message)
    soft_deleted_schema_record.error.return_value = None
    soft_deleted_schema_record.key.return_value = json.dumps(
        {
            "keytype": "SCHEMA",
            "subject": subject,
            "version": 1,
            "magic": 0,
        }
    )
    soft_deleted_schema_record.value.return_value = json.dumps(
        {
            "deleted": True,
            "id": 1,
            "schema": '"int"',
            # NOTE(review): was "test-soft-delete-test", which did not match the
            # subject in the record key above; key and value now stay in sync.
            "subject": subject,
            "version": 1,
        }
    )

    consumer_mock.consume.return_value = [soft_deleted_schema_record]
    # Return tuple (beginning, end); the end offset is the next upcoming record offset.
    consumer_mock.get_watermark_offsets.return_value = (0, 1)

    offset_watcher = OffsetWatcher()
    schema_reader = KafkaSchemaReader(
        config=DEFAULTS,
        offset_watcher=offset_watcher,
        key_formatter=key_formatter_mock,
        master_coordinator=None,
        database=InMemoryDatabase(),
    )
    schema_reader.consumer = consumer_mock
    schema_reader.offset = 0

    schema_reader.handle_messages()

    # Despite being soft-deleted, the schema must be retrievable by its id.
    soft_deleted_stored_schema = schema_reader.database.find_schema(schema_id=SchemaId(1))
    assert soft_deleted_stored_schema is not None
Loading