From cc5d5a3d017f2fd3be2f3c2ebd6ee0e60d08c85c Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Wed, 7 Dec 2022 12:48:12 +0200 Subject: [PATCH 1/4] feat: Avro disable enum and name validation When reading and parsing existing schemas the enum and name validation is disabled for Avro schemas. There can be existing schemas where enum or name is not compliant with Avro specification and have passed by previous Avro SDK versions. Current Java SDK does allow creating non-compliant names, for example having hyphen in the name "test-name". --- karapace/compatibility/__init__.py | 4 +- karapace/schema_models.py | 146 ++++++++++++++------ karapace/schema_registry.py | 6 +- karapace/schema_registry_apis.py | 36 +++-- karapace/serialization.py | 14 +- requirements.txt | 4 +- tests/integration/test_schema.py | 183 ++++++++++++++++++++++++++ tests/unit/test_avro_compatibility.py | 3 +- 8 files changed, 332 insertions(+), 64 deletions(-) diff --git a/karapace/compatibility/__init__.py b/karapace/compatibility/__init__.py index 70d4d2bfe..23ff437ad 100644 --- a/karapace/compatibility/__init__.py +++ b/karapace/compatibility/__init__.py @@ -17,7 +17,7 @@ from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility from karapace.protobuf.schema import ProtobufSchema -from karapace.schema_models import ValidatedTypedSchema +from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema from karapace.schema_reader import SchemaType from karapace.utils import assert_never @@ -76,7 +76,7 @@ def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) def check_compatibility( - old_schema: ValidatedTypedSchema, new_schema: ValidatedTypedSchema, compatibility_mode: CompatibilityModes + old_schema: ParsedTypedSchema, new_schema: ValidatedTypedSchema, compatibility_mode: CompatibilityModes ) -> 
SchemaCompatibilityResult: """Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`.""" if compatibility_mode is CompatibilityModes.NONE: diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 4ac732d02..57abf0792 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -14,12 +14,12 @@ ) from karapace.protobuf.schema import ProtobufSchema from karapace.utils import json_encode -from typing import Any, Dict, Optional, Union +from typing import Any, cast, Dict, Optional, Union import json -def parse_avro_schema_definition(s: str) -> AvroSchema: +def parse_avro_schema_definition(s: str, validate_enum_symbols: bool = True, validate_names: bool = True) -> AvroSchema: """Compatibility function with Avro which ignores trailing data in JSON strings. @@ -35,7 +35,7 @@ def parse_avro_schema_definition(s: str) -> AvroSchema: json_data = json.loads(s[: e.pos]) - return avro_parse(json.dumps(json_data)) + return avro_parse(json.dumps(json_data), validate_enum_symbols=validate_enum_symbols, validate_names=validate_names) def parse_jsonschema_definition(schema_definition: str) -> Draft7Validator: @@ -101,51 +101,115 @@ def __eq__(self, other: Any) -> bool: return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type -class ValidatedTypedSchema(TypedSchema): +def parse( + schema_type: SchemaType, + schema_str: str, + validate_avro_enum_symbols: bool, + validate_avro_names: bool, +) -> "ParsedTypedSchema": + if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: + raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") + + parsed_schema: Union[Draft7Validator, AvroSchema, ProtobufSchema] + if schema_type is SchemaType.AVRO: + try: + parsed_schema = parse_avro_schema_definition( + schema_str, + validate_enum_symbols=validate_avro_enum_symbols, + validate_names=validate_avro_names, + ) + except (SchemaParseException, 
json.JSONDecodeError, TypeError) as e: + raise InvalidSchema from e + + elif schema_type is SchemaType.JSONSCHEMA: + try: + parsed_schema = parse_jsonschema_definition(schema_str) + # TypeError - Raised when the user forgets to encode the schema as a string. + except (TypeError, json.JSONDecodeError, SchemaError, AssertionError) as e: + raise InvalidSchema from e + + elif schema_type is SchemaType.PROTOBUF: + try: + parsed_schema = parse_protobuf_schema_definition(schema_str) + except ( + TypeError, + SchemaError, + AssertionError, + ProtobufParserRuntimeException, + IllegalStateException, + IllegalArgumentException, + ProtobufError, + ProtobufException, + ProtobufSchemaParseException, + ) as e: + raise InvalidSchema from e + else: + raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") + + return ParsedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema) + + +class ParsedTypedSchema(TypedSchema): + """Parsed but unvalidated schema resource. + + This class is used when reading and parsing existing schemas from data store. The intent of this class is to provide + representation of the schema which can be used to compare existing versions with new version in compatibility check + and when storing new version. + + This class shall not be used for new schemas received through the public API. + + The intent of this class is not to bypass validation of the syntax of the schema. + Assumption is that schema is syntactically correct. + + Validations that are bypassed: + * AVRO: enumeration symbols, namespace and name validity. + + Existing schemas may have been produced with backing schema SDKs that may have passed validation on schemas that + are considered by the current version of the SDK invalid. 
+ """ + def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): super().__init__(schema_type=schema_type, schema_str=schema_str) self.schema = schema @staticmethod - def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": - if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: - raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") - - parsed_schema: Union[Draft7Validator, AvroSchema, ProtobufSchema] - if schema_type is SchemaType.AVRO: - try: - parsed_schema = parse_avro_schema_definition(schema_str) - except (SchemaParseException, json.JSONDecodeError, TypeError) as e: - raise InvalidSchema from e - - elif schema_type is SchemaType.JSONSCHEMA: - try: - parsed_schema = parse_jsonschema_definition(schema_str) - # TypeError - Raised when the user forgets to encode the schema as a string. - except (TypeError, json.JSONDecodeError, SchemaError, AssertionError) as e: - raise InvalidSchema from e - - elif schema_type is SchemaType.PROTOBUF: - try: - parsed_schema = parse_protobuf_schema_definition(schema_str) - except ( - TypeError, - SchemaError, - AssertionError, - ProtobufParserRuntimeException, - IllegalStateException, - IllegalArgumentException, - ProtobufError, - ProtobufException, - ProtobufSchemaParseException, - ) as e: - raise InvalidSchema from e - else: - raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") - - return ValidatedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema) + def parse(schema_type: SchemaType, schema_str: str) -> "ParsedTypedSchema": + return parse( + schema_type=schema_type, + schema_str=schema_str, + validate_avro_enum_symbols=False, + validate_avro_names=False, + ) def __str__(self) -> str: if self.schema_type == SchemaType.PROTOBUF: return str(self.schema) return super().__str__() + + +class ValidatedTypedSchema(ParsedTypedSchema): + """Validated schema 
resource. + + This class is used when receiving a new schema from through the public API. The intent of this class is to + provide validation of the schema. + This class shall not be used when reading and parsing existing schemas. + + The intent of this class is not to validate the syntax of the schema. + Assumption is that schema is syntactically correct. + + Existing schemas may have been produced with backing schema SDKs that may have passed validation on schemas that + are considered by the current version of the SDK invalid. + """ + + def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]): + super().__init__(schema_type=schema_type, schema_str=schema_str, schema=schema) + + @staticmethod + def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": + parsed_schema = parse( + schema_type=schema_type, + schema_str=schema_str, + validate_avro_enum_symbols=True, + validate_avro_names=True, + ) + return cast(ValidatedTypedSchema, parsed_schema) diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 935a57f0f..fecbbccfd 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -18,7 +18,7 @@ ) from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator -from karapace.schema_models import SchemaType, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader from karapace.typing import ResolvedVersion, Subject, SubjectData, Version from karapace.utils import json_encode, KarapaceKafkaClient @@ -347,11 +347,11 @@ async def write_new_schema_local( for old_version in check_against: old_schema = subject_data["schemas"][old_version]["schema"] - validated_old_schema = ValidatedTypedSchema.parse( + parsed_old_schema = ParsedTypedSchema.parse( schema_type=old_schema.schema_type, 
schema_str=old_schema.schema_str ) result = check_compatibility( - old_schema=validated_old_schema, + old_schema=parsed_old_schema, new_schema=new_schema, compatibility_mode=compatibility_mode, ) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 22c07f70c..05d79dfe7 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -22,7 +22,7 @@ ) from karapace.karapace import KarapaceBase from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import SchemaType, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_registry import KarapaceSchemaRegistry, validate_version from karapace.typing import JsonData from typing import Any, Dict, Optional, Union @@ -348,7 +348,7 @@ async def compatibility_check( old_schema_type = self._validate_schema_type(content_type=content_type, data=old) try: - old_schema = ValidatedTypedSchema.parse(old_schema_type, old["schema"]) + old_schema = ParsedTypedSchema.parse(old_schema_type, old["schema"]) except InvalidSchema: self.r( body={ @@ -901,7 +901,9 @@ async def subjects_schema_post( schema_str = body["schema"] schema_type = self._validate_schema_type(content_type=content_type, data=body) try: - new_schema = ValidatedTypedSchema.parse(schema_type, schema_str) + # When checking if schema is already registered, allow unvalidated schema in as + # there might be stored schemas that are non-compliant from the past. 
+ new_schema = ParsedTypedSchema.parse(schema_type, schema_str) except InvalidSchema: self.log.exception("No proper parser found") self.r( @@ -915,23 +917,39 @@ # Match schemas based on version from latest to oldest for schema in sorted(subject_data["schemas"].values(), key=lambda item: item["version"], reverse=True): - validated_typed_schema = ValidatedTypedSchema.parse(schema["schema"].schema_type, schema["schema"].schema_str) + try: + parsed_typed_schema = ParsedTypedSchema.parse(schema["schema"].schema_type, schema["schema"].schema_str) + except InvalidSchema as e: + failed_schema_id = schema["id"] + self.log.exception("Existing schema failed to parse. Id: %s", failed_schema_id) + self.stats.unexpected_exception( + ex=e, where=f"Matching existing schemas to posted. Failed schema id: {failed_schema_id}" + ) + self.r( + body={ + "error_code": SchemaErrorCodes.HTTP_INTERNAL_SERVER_ERROR.value, + "message": f"Error while looking up schema under subject {subject}", + }, + content_type=content_type, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + if schema_type is SchemaType.JSONSCHEMA: - schema_valid = validated_typed_schema.to_dict() == new_schema.to_dict() + schema_valid = parsed_typed_schema.to_dict() == new_schema.to_dict() else: - schema_valid = validated_typed_schema.schema == new_schema.schema - if validated_typed_schema.schema_type == new_schema.schema_type and schema_valid: + schema_valid = parsed_typed_schema.schema == new_schema.schema + if parsed_typed_schema.schema_type == new_schema.schema_type and schema_valid: ret = { "subject": subject, "version": schema["version"], "id": schema["id"], - "schema": validated_typed_schema.schema_str, + "schema": parsed_typed_schema.schema_str, } if schema_type is not SchemaType.AVRO: ret["schemaType"] = schema_type self.r(ret, content_type) else: - self.log.debug("Schema %r did not match %r", schema, 
parsed_typed_schema) self.r( body={ "error_code": SchemaErrorCodes.SCHEMA_NOT_FOUND.value, diff --git a/karapace/serialization.py b/karapace/serialization.py index fa5c16fa1..44165600a 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -5,7 +5,7 @@ from karapace.client import Client from karapace.protobuf.exception import ProtobufTypeException from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter -from karapace.schema_models import InvalidSchema, SchemaType, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.utils import json_encode from typing import Any, Dict, Optional, Tuple from urllib.parse import quote @@ -85,7 +85,7 @@ async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema) -> i raise SchemaRetrievalError(result.json()) return result.json()["id"] - async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema]: + async def get_latest_schema(self, subject: str) -> Tuple[int, ParsedTypedSchema]: result = await self.client.get(f"subjects/{quote(subject)}/versions/latest") if not result.ok: raise SchemaRetrievalError(result.json()) @@ -94,11 +94,11 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - return json_result["id"], ValidatedTypedSchema.parse(schema_type, json_result["schema"]) + return json_result["id"], ParsedTypedSchema.parse(schema_type, json_result["schema"]) except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e - async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema: + async def get_schema_for_id(self, schema_id: int) -> ParsedTypedSchema: result = await self.client.get(f"schemas/ids/{schema_id}") 
if not result.ok: raise SchemaRetrievalError(result.json()["message"]) @@ -107,7 +107,7 @@ async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema: raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - return ValidatedTypedSchema.parse(schema_type, json_result["schema"]) + return ParsedTypedSchema.parse(schema_type, json_result["schema"]) except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e @@ -146,7 +146,7 @@ async def close(self) -> None: self.registry_client = None def get_subject_name(self, topic_name: str, schema: str, subject_type: str, schema_type: SchemaType) -> str: - schema_typed = ValidatedTypedSchema.parse(schema_type, schema) + schema_typed = ParsedTypedSchema.parse(schema_type, schema) namespace = "dummy" if schema_type is SchemaType.AVRO: if isinstance(schema_typed.schema, avro.schema.NamedSchema): @@ -170,7 +170,7 @@ async def get_schema_for_subject(self, subject: str) -> TypedSchema: async def get_id_for_schema(self, schema: str, subject: str, schema_type: SchemaType) -> int: assert self.registry_client, "must not call this method after the object is closed." 
try: - schema_typed = ValidatedTypedSchema.parse(schema_type, schema) + schema_typed = ParsedTypedSchema.parse(schema_type, schema) except InvalidSchema as e: raise InvalidPayload(f"Schema string {schema} is invalid") from e schema_ser = schema_typed.__str__() diff --git a/requirements.txt b/requirements.txt index db2da1d6e..5de481366 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,6 @@ accept-types==0.4.1 aiohttp==3.8.3 aiokafka==0.7.2 -avro==1.11.0 jsonschema==3.2.0 networkx==2.5 protobuf==3.19.5 @@ -22,6 +21,9 @@ watchfiles==0.15.0 # git+https://github.com/aiven/kafka-python.git@1b95333c9628152066fb8b1092de9da0433401fd +git+https://github.com/aiven/avro.git@f37a4ea69560d798cc718fccd7f75223527f5ebb#subdirectory=lang/py + + # Indirect dependencies aiosignal==1.2.0 # aiohttp async-timeout==4.0.2 # aiohttp diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 3779ea88d..7a8669d7c 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -20,10 +20,12 @@ ) from typing import List, Tuple +import asyncio import json import os import pytest import requests +import time baseurl = "http://localhost:8081" @@ -2903,3 +2905,184 @@ async def test_invalid_schema_should_provide_good_error_messages(registry_async_ json={"schema": schema_str}, ) assert res.json()["message"] == "Invalid AVRO schema. Error: error is a reserved type name." + + +async def test_schema_non_compliant_namespace_in_existing( + kafka_servers: KafkaServers, + registry_cluster: RegistryDescription, + registry_async_client: Client, +) -> None: + """Test non compliant namespace in existing schema + This test starts with a state where existing schemas have invalid names per Avro specification. + Schemas that have e.g. a dash character in the are accepted by Avro Java SDK although it does + not comply with the Avro specification. + Karapace shall read the data and disable validation when parsing existing schemas. 
+ """ + + subject = create_subject_name_factory("test_schema_non_compliant_name_in_existing")() + + schema = { + "type": "record", + "namespace": "compliant-namespace-test", + "name": "test_schema", + "fields": [ + { + "type": "string", + "name": "test-field", + } + ], + } + + producer = KafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers) + message_key = json_encode( + {"keytype": "SCHEMA", "subject": subject, "version": 1, "magic": 1}, sort_keys=False, compact=True, binary=True + ) + message_value = {"deleted": False, "id": 1, "subject": subject, "version": 1, "schema": json.dumps(schema)} + producer.send( + registry_cluster.schemas_topic, + key=message_key, + value=json_encode(message_value, sort_keys=False, compact=True, binary=True), + ).get() + + evolved_schema = { + "type": "record", + "namespace": "compliant_namespace_test", + "name": "test_schema", + "fields": [ + { + "type": "string", + "name": "test-field", + }, + {"type": "string", "name": "test-field-2", "default": "default-value"}, + ], + } + + # Wait until the schema is available + do_until_time = time.monotonic() + 5 + while do_until_time > time.monotonic(): + res = await registry_async_client.get(f"subjects/{subject}/versions/latest") + if res.status_code == 200: + break + await asyncio.sleep(0.5) + + # Compatibility check + res = await registry_async_client.post( + f"compatibility/subjects/{subject}/versions/latest", + json={"schema": json.dumps(evolved_schema)}, + ) + assert res.status_code == 200 + + # Check non-compliant schema is registered + res = await registry_async_client.post( + f"subjects/{subject}", + json={"schema": json.dumps(schema)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + schema_id = res.json()["id"] + assert schema_id == 1 + + # Post new schema + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": json.dumps(evolved_schema)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + schema_id = 
res.json()["id"] + assert schema_id == 2 + + +async def test_schema_non_compliant_name_in_existing( + kafka_servers: KafkaServers, + registry_cluster: RegistryDescription, + registry_async_client: Client, +) -> None: + """Test non compliant name in existing schema + This test starts with a state where existing schemas have invalid names per Avro specification. + Schemas that have e.g. a dash character in the are accepted by Avro Java SDK although it does + not comply with the Avro specification. + Karapace shall read the data and disable validation when parsing existing schemas. + """ + + subject = create_subject_name_factory("test_schema_non_compliant_name_in_existing")() + + schema = { + "type": "record", + "namespace": "compliant_name_test", + "name": "test-schema", + "fields": [ + { + "type": "string", + "name": "test-field", + } + ], + } + + producer = KafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers) + message_key = json_encode( + {"keytype": "SCHEMA", "subject": subject, "version": 1, "magic": 1}, sort_keys=False, compact=True, binary=True + ) + message_value = {"deleted": False, "id": 1, "subject": subject, "version": 1, "schema": json.dumps(schema)} + producer.send( + registry_cluster.schemas_topic, + key=message_key, + value=json_encode(message_value, sort_keys=False, compact=True, binary=True), + ).get() + + evolved_schema = { + "type": "record", + "namespace": "compliant_name_test", + "name": "test_schema", + "fields": [ + { + "type": "string", + "name": "test-field", + }, + {"type": "string", "name": "test-field-2", "default": "default-value"}, + ], + } + + # Wait until the schema is available + do_until_time = time.monotonic() + 5 + while do_until_time > time.monotonic(): + res = await registry_async_client.get(f"subjects/{subject}/versions/latest") + if res.status_code == 200: + break + await asyncio.sleep(0.5) + + # Compatibility check, should not be compatible. 
+ # Test that no parsing error is given as name in the existing schema is non-compliant. + res = await registry_async_client.post( + f"compatibility/subjects/{subject}/versions/latest", + json={"schema": json.dumps(evolved_schema)}, + ) + assert res.status_code == 200 + + # Send compatibility configuration for subject that disabled backwards compatibility. + # The name cannot be changed if backward compatibility is required. + producer.send( + registry_cluster.schemas_topic, + key=json_encode({"keytype": "CONFIG", "subject": subject, "magic": 0}, sort_keys=False, compact=True, binary=True), + value=json_encode({"compatibilityLevel": "NONE"}, sort_keys=False, compact=True, binary=True), + ).get() + + # Check non-compliant schema is registered + res = await registry_async_client.post( + f"subjects/{subject}", + json={"schema": json.dumps(schema)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + schema_id = res.json()["id"] + assert schema_id == 1 + + # Post new schema + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": json.dumps(evolved_schema)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + schema_id = res.json()["id"] + assert schema_id == 2 diff --git a/tests/unit/test_avro_compatibility.py b/tests/unit/test_avro_compatibility.py index c3bc5b8fd..4c89d3bc7 100644 --- a/tests/unit/test_avro_compatibility.py +++ b/tests/unit/test_avro_compatibility.py @@ -37,7 +37,8 @@ '"null"},{"type":"string","name":"f2","default":"foo"},{"type":"string","name":"f3","default":"bar"}]}' ) invalidEnumDefaultValue = parse_avro_schema_definition( - '{"type": "enum", "name": "test_default", "symbols": ["A"], "default": "B"}' + '{"type": "enum", "name": "test_default", "symbols": ["A"], "default": "B"}', + validate_enum_symbols=False, ) correctEnumDefaultValue = parse_avro_schema_definition( '{"type": "enum", "name": "test_default", "symbols": ["A"], "default": "A"}' From 
2a094d44766f3418c672909405b44dba03ca9992 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Fri, 16 Dec 2022 15:09:00 +0200 Subject: [PATCH 2/4] feat: Avro disable enum and name validation, test improvement --- tests/integration/test_schema.py | 51 +++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 7a8669d7c..5f3ef6cd1 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -2965,13 +2965,23 @@ async def test_schema_non_compliant_namespace_in_existing( break await asyncio.sleep(0.5) - # Compatibility check + # Compatibility check, is expected to be compatible, namespace is not important. res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest", json={"schema": json.dumps(evolved_schema)}, ) assert res.status_code == 200 + # Post evolved new schema + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": json.dumps(evolved_schema)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + schema_id = res.json()["id"] + assert schema_id == 2 + # Check non-compliant schema is registered res = await registry_async_client.post( f"subjects/{subject}", @@ -2982,9 +2992,9 @@ async def test_schema_non_compliant_namespace_in_existing( schema_id = res.json()["id"] assert schema_id == 1 - # Post new schema + # Check evolved schema is registered res = await registry_async_client.post( - f"subjects/{subject}/versions", + f"subjects/{subject}", json={"schema": json.dumps(evolved_schema)}, ) assert res.status_code == 200 @@ -3051,21 +3061,40 @@ async def test_schema_non_compliant_name_in_existing( break await asyncio.sleep(0.5) - # Compatibility check, should not be compatible. + # Compatibility check, should not be compatible, name is important. # Test that no parsing error is given as name in the existing schema is non-compliant. 
res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest", json={"schema": json.dumps(evolved_schema)}, ) assert res.status_code == 200 + assert not res.json().get("is_compatible") + + # Post evolved schema, should not be compatible and rejected. + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": json.dumps(evolved_schema)}, + ) + assert res.status_code == 409 + assert res.json() == { + "error_code": 409, + "message": "Incompatible schema, compatibility_mode=BACKWARD expected: compliant_name_test.test-schema", + } # Send compatibility configuration for subject that disabled backwards compatibility. # The name cannot be changed if backward compatibility is required. - producer.send( - registry_cluster.schemas_topic, - key=json_encode({"keytype": "CONFIG", "subject": subject, "magic": 0}, sort_keys=False, compact=True, binary=True), - value=json_encode({"compatibilityLevel": "NONE"}, sort_keys=False, compact=True, binary=True), - ).get() + res = await registry_async_client.put(f"/config/{subject}", json={"compatibility": "NONE"}) + assert res.status_code == 200 + + # Post evolved schema and expectation is that it gets registered as no compatibility is enforced. 
+ res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": json.dumps(evolved_schema)}, + ) + assert res.status_code == 200 + assert "id" in res.json() + schema_id = res.json()["id"] + assert schema_id == 2 # Check non-compliant schema is registered res = await registry_async_client.post( @@ -3077,9 +3106,9 @@ async def test_schema_non_compliant_name_in_existing( schema_id = res.json()["id"] assert schema_id == 1 - # Post new schema + # Check evolved schema is registered res = await registry_async_client.post( - f"subjects/{subject}/versions", + f"subjects/{subject}", json={"schema": json.dumps(evolved_schema)}, ) assert res.status_code == 200 From d5b9e371000818d802c6f14fbc798df04e7b7b94 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Fri, 16 Dec 2022 21:11:42 +0200 Subject: [PATCH 3/4] test: good error message tst change for Avro 1.12.0 compatibility Removed a test for good error message when Avro schema has reserved name in the name element of schema. See https://issues.apache.org/jira/browse/AVRO-3370 --- tests/integration/test_schema.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 5f3ef6cd1..56d2590ec 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -2898,14 +2898,6 @@ async def test_invalid_schema_should_provide_good_error_messages(registry_async_ == "Invalid AVRO schema. Error: Enum symbols must be a sequence of strings, but it is " ) - # This is an upstream bug in the python AVRO library, until the bug is fixed we should at least have a nice error message - schema_str = json.dumps({"type": "enum", "name": "error", "symbols": ["A", "B"]}) - res = await registry_async_client.post( - f"subjects/{test_subject}/versions", - json={"schema": schema_str}, - ) - assert res.json()["message"] == "Invalid AVRO schema. Error: error is a reserved type name." 
- async def test_schema_non_compliant_namespace_in_existing( kafka_servers: KafkaServers, From da765ae77d2379b1d4ac4f73f9650eeeb4e0e56f Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Fri, 16 Dec 2022 21:12:32 +0200 Subject: [PATCH 4/4] refactor: serialization catches new avro.errors.InvalidAvroBinaryEncoding Serialization is changed to catch avro.errors.InvalidAvroBinaryEncoding when data record payload is invalid. See https://issues.apache.org/jira/browse/AVRO-3380 --- karapace/serialization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/serialization.py b/karapace/serialization.py index 44165600a..ea824f42a 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -218,7 +218,7 @@ async def deserialize(self, bytes_: bytes) -> dict: raise InvalidPayload("No schema with ID from payload") ret_val = read_value(self.config, schema, bio) return ret_val - except (UnicodeDecodeError, TypeError) as e: + except (UnicodeDecodeError, TypeError, avro.errors.InvalidAvroBinaryEncoding) as e: raise InvalidPayload("Data does not contain a valid message") from e except avro.errors.SchemaResolutionException as e: raise InvalidPayload("Data cannot be decoded with provided schema") from e