From 98b6ef6dfa8973f55a4b60ccce11c78a50e7972d Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Mon, 13 May 2024 15:10:53 +0300 Subject: [PATCH] test: store soft deleted schema When _schemas topic has been compacted and only soft deleted version of the schema is present the schema must be stored in the in memory database. Fixes #170. --- tests/unit/test_schema_reader.py | 49 ++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 39ac0dfab..ebbdf3418 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -20,9 +20,11 @@ OFFSET_EMPTY, OFFSET_UNINITIALIZED, ) +from karapace.typing import SchemaId from tests.base_testcase import BaseTestCase from unittest.mock import Mock +import confluent_kafka import json import pytest import random @@ -243,3 +245,50 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche assert schema_reader.offset == 3 assert schema_reader.ready is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP + + +def test_soft_deleted_schema_storing() -> None: + """This tests a case when _schemas has been compacted and only + the soft deleted version of the schema is present. + """ + key_formatter_mock = Mock(spec=KeyFormatter) + consumer_mock = Mock(spec=KafkaConsumer) + soft_deleted_schema_record = Mock(spec=confluent_kafka.Message) + soft_deleted_schema_record.error.return_value = None + soft_deleted_schema_record.key.return_value = json.dumps( + { + "keytype": "SCHEMA", + "subject": "soft-delete-test", + "version": 1, + "magic": 0, + } + ) + soft_deleted_schema_record.value.return_value = json.dumps( + { + "deleted": True, + "id": 1, + "schema": '"int"', + "subject": "test-soft-delete-test", + "version": 1, + } + ) + + consumer_mock.consume.return_value = [soft_deleted_schema_record] + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, 1) + + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=DEFAULTS, + offset_watcher=offset_watcher, + key_formatter=key_formatter_mock, + master_coordinator=None, + database=InMemoryDatabase(), + ) + schema_reader.consumer = consumer_mock + schema_reader.offset = 0 + + schema_reader.handle_messages() + + soft_deleted_stored_schema = schema_reader.database.find_schema(schema_id=SchemaId(1)) + assert soft_deleted_stored_schema is not None