diff --git a/tests/unit/compatibility/test_compatibility.py b/tests/unit/compatibility/test_compatibility.py index 76f0e22b9..af41aae99 100644 --- a/tests/unit/compatibility/test_compatibility.py +++ b/tests/unit/compatibility/test_compatibility.py @@ -20,3 +20,27 @@ def test_schema_type_can_change_when_mode_none() -> None: old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE ) assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_compatible_in_transitive_mode() -> None: + old_json = '{"type": "array", "name": "name_old"}' + new_json = '{"type": "array", "name": "name_new"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_incompatible_in_transitive_mode() -> None: + old_json = '{"type": "array"}' + new_json = '{"type": "integer"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.incompatible diff --git a/tests/unit/test_schema_registry.py b/tests/unit/test_schema_registry.py new file mode 100644 index 000000000..c4a25864b --- /dev/null +++ b/tests/unit/test_schema_registry.py @@ -0,0 +1,257 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from avro.compatibility import SchemaCompatibilityResult, SchemaCompatibilityType +from karapace.compatibility import CompatibilityModes +from karapace.compatibility.schema_compatibility import SchemaCompatibility +from karapace.config import DEFAULTS +from karapace.in_memory_database import InMemoryDatabase +from karapace.schema_models import ParsedTypedSchema, SchemaVersion, TypedSchema, ValidatedTypedSchema +from karapace.schema_registry import KarapaceSchemaRegistry +from karapace.schema_type import SchemaType +from karapace.typing import Subject, Version +from unittest import mock +from unittest.mock import MagicMock, Mock + +import logging + +LOG = logging.getLogger(__name__) + + +class MockedSchemaRegistry: + def __init__(self, compatibility_mode: CompatibilityModes, has_deleted_schema: bool = False): + self.schema_registry = KarapaceSchemaRegistry(DEFAULTS) + self.compatibility_mode = compatibility_mode + self.schema_registry.get_compatibility_mode = Mock(return_value=self.compatibility_mode) + + schema_version1 = Mock(spec=SchemaVersion) + schema_version1.deleted = False + typed_schema1 = Mock(spec=TypedSchema) + schema_version1.schema = typed_schema1 + + schema_version2 = Mock(spec=SchemaVersion) + schema_version2.deleted = has_deleted_schema + typed_schema2 = Mock(spec=TypedSchema) + schema_version2.schema = typed_schema2 + + schema_version3 = Mock(spec=SchemaVersion) + typed_schema3 = Mock(spec=TypedSchema) + schema_version3.deleted = False + schema_version3.schema = typed_schema3 + + self.schema_registry.database = Mock(spec=InMemoryDatabase) + + self.schema_registry.database.find_subject_schemas = Mock( + return_value={ + Version(1): schema_version1, + Version(2): schema_version2, + Version(3): schema_version3, + } + ) + + self.parsed_schema1 = Mock(spec=ParsedTypedSchema) + self.parsed_schema2 = Mock(spec=ParsedTypedSchema) + self.parsed_schema3 = Mock(spec=ParsedTypedSchema) + + def resolve_and_parse_mock(schema: TypedSchema) -> ParsedTypedSchema: + if schema == typed_schema1: + return self.parsed_schema1 + if schema == typed_schema2: + return self.parsed_schema2 + if schema == typed_schema3: + return self.parsed_schema3 + raise ValueError(f"Unexpected object {schema}") + + self.schema_registry.resolve_and_parse = MagicMock(side_effect=resolve_and_parse_mock) + + def check_schema_compatibility( + self, + new_schema: ValidatedTypedSchema, + subject: Subject, + ) -> SchemaCompatibilityResult: + return self.schema_registry.check_schema_compatibility(new_schema, subject) + + +async def test_schema_compatible_in_transitive_mode() -> None: + # Given + schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL_TRANSITIVE) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + subject = Subject("subject") + + # Don't test the actual compatibility checks + result = Mock(spec=SchemaCompatibilityResult) + result.compatibility = SchemaCompatibilityType.compatible + SchemaCompatibility.check_compatibility = Mock(return_value=result) + + # When + schema_registry.check_schema_compatibility(new_schema, subject) + + # Then + assert result.compatibility is SchemaCompatibilityType.compatible + + # All 3 schemas are checked against + SchemaCompatibility.check_compatibility.assert_has_calls( + [ + mock.call( + old_schema=schema_registry.parsed_schema1, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ), + mock.call( + old_schema=schema_registry.parsed_schema2, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ), + mock.call( + old_schema=schema_registry.parsed_schema3, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ), + ] + ) + + +async def test_schema_incompatible_in_transitive_mode() -> None: + # Given + schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL_TRANSITIVE) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + subject = Subject("subject") + + # Don't test the actual compatibility checks + result = Mock(spec=SchemaCompatibilityResult) + result.compatibility = SchemaCompatibilityType.incompatible + SchemaCompatibility.check_compatibility = Mock(return_value=result) + + # When + schema_registry.check_schema_compatibility(new_schema, subject) + + # Then + assert result.compatibility is SchemaCompatibilityType.incompatible + + # Only one schema is checked against (first fail stops all checks) + SchemaCompatibility.check_compatibility.assert_has_calls( + [ + mock.call( + old_schema=schema_registry.parsed_schema1, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ), + ] + ) + + +async def test_schema_compatible_in_not_transitive_mode() -> None: + # Given + schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + subject = Subject("subject") + + # Don't test the actual compatibility checks + result = Mock(spec=SchemaCompatibilityResult) + result.compatibility = SchemaCompatibilityType.compatible + SchemaCompatibility.check_compatibility = Mock(return_value=result) + + # When + schema_registry.check_schema_compatibility(new_schema, subject) + + # Then + assert result.compatibility is SchemaCompatibilityType.compatible + + # Only the last schema is checked against (not transitive) + SchemaCompatibility.check_compatibility.assert_has_calls( + [ + mock.call( + old_schema=schema_registry.parsed_schema3, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ) + ] + ) + + +async def test_schema_incompatible_in_not_transitive_mode() -> None: + # Given + schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + subject = Subject("subject") + + # Don't test the actual compatibility checks + result = Mock(spec=SchemaCompatibilityResult) + result.compatibility = SchemaCompatibilityType.incompatible + SchemaCompatibility.check_compatibility = Mock(return_value=result) + + # When + schema_registry.check_schema_compatibility(new_schema, subject) + + # Then + assert result.compatibility is SchemaCompatibilityType.incompatible + + # Only the last schema is checked against (not transitive) + SchemaCompatibility.check_compatibility.assert_has_calls( + [ + mock.call( + old_schema=schema_registry.parsed_schema3, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ) + ] + ) + + +async def test_schema_compatible_with_no_live_schemas() -> None: + # Given + schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + subject = Subject("subject") + + # No schemas in registry + schema_registry.schema_registry.database.find_subject_schemas = Mock(return_value={}) + + # Don't test the actual compatibility checks + result = Mock(spec=SchemaCompatibilityResult) + result.compatibility = SchemaCompatibilityType.compatible + SchemaCompatibility.check_compatibility = Mock(return_value=result) + + # When + schema_registry.check_schema_compatibility(new_schema, subject) + + # Then + assert result.compatibility is SchemaCompatibilityType.compatible + + # No check is done (no existing schemas) + SchemaCompatibility.check_compatibility.assert_not_called() + + +async def test_schema_compatible_in_transitive_mode_with_deleted_schema() -> None: + # Given + schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL_TRANSITIVE, has_deleted_schema=True) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + subject = Subject("subject") + + # Don't test the actual compatibility checks + result = Mock(spec=SchemaCompatibilityResult) + result.compatibility = SchemaCompatibilityType.compatible + SchemaCompatibility.check_compatibility = Mock(return_value=result) + + # When + schema_registry.check_schema_compatibility(new_schema, subject) + + # Then + assert result.compatibility is SchemaCompatibilityType.compatible + + # Only 2 schemas are checked against (the 3rd one is deleted) + SchemaCompatibility.check_compatibility.assert_has_calls( + [ + mock.call( + old_schema=schema_registry.parsed_schema1, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ), + mock.call( + old_schema=schema_registry.parsed_schema3, + new_schema=new_schema, + compatibility_mode=schema_registry.compatibility_mode, + ), + ] + ) diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 6d850f5fc..0dab348ac 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -3,18 +3,24 @@ See LICENSE for details """ from aiohttp.test_utils import TestClient, TestServer +from avro.compatibility import SchemaCompatibilityResult, SchemaCompatibilityType +from karapace.compatibility import CompatibilityModes +from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.config import DEFAULTS, set_config_defaults -from karapace.rapu import HTTPResponse +from karapace.rapu import HTTPRequest, HTTPResponse +from karapace.schema_models import ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader from karapace.schema_registry import KarapaceSchemaRegistry from karapace.schema_registry_apis import KarapaceSchemaRegistryController +from karapace.schema_type import SchemaType +from karapace.typing import Subject from unittest.mock import ANY, AsyncMock, Mock, patch, PropertyMock import asyncio import pytest -async def test_validate_schema_request_body(): +async def test_validate_schema_request_body() -> None: controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS)) controller._validate_schema_request_body( # pylint: disable=W0212 @@ -30,7 +36,7 @@ async def test_validate_schema_request_body(): assert str(exc_info.value) == "HTTPResponse 422" -async def test_forward_when_not_ready(): +async def test_forward_when_not_ready() -> None: with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) @@ -63,3 +69,42 @@ async def test_forward_when_not_ready(): mock_forward_func.assert_called_once_with( request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", method="GET" ) + + +async def test_compatibility_check_in_not_transitive_mode() -> None: + # Given + config = set_config_defaults(DEFAULTS) + config["compatibility"] = "FULL_TRANSITIVE" + controller = KarapaceSchemaRegistryController(config=config) + + result = Mock(spec=SchemaCompatibilityResult) + result.compatibility = SchemaCompatibilityType.compatible + + compatibility_mode = CompatibilityModes.FULL + controller.schema_registry = Mock(spec=KarapaceSchemaRegistry) + controller.schema_registry.get_compatibility_mode = Mock(return_value=compatibility_mode) + SchemaCompatibility.check_compatibility = Mock(return_value=result) + + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + controller.get_new_schema = Mock(return_value=new_schema) + + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}') + controller.get_old_schema = Mock(return_value=old_schema) + + request_mock = Mock(HTTPRequest) + request_mock.json = '{"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}}' + + # When + with pytest.raises(HTTPResponse) as exc_info: + await controller.compatibility_check( + "application/json", + subject=Subject("subject1"), + version="1", + request=request_mock, + ) + + # Then only check for compatibility against the provided schema version + SchemaCompatibility.check_compatibility.assert_called_once_with(old_schema, new_schema, compatibility_mode) + + assert exc_info.type is HTTPResponse + assert str(exc_info.value) == "HTTPResponse 200"