Skip to content

Commit

Permalink
Merge pull request #516 from aiven/jjaakola-aiven-disable-avro-name-validation-when-parsing-existing-schemas
Browse files Browse the repository at this point in the history

Disable Avro name and enum validation when parsing existing schemas
  • Loading branch information
tvainika authored Dec 29, 2022
2 parents c540f3c + da765ae commit 6a0d7d1
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 70 deletions.
4 changes: 2 additions & 2 deletions karapace/compatibility/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import ValidatedTypedSchema
from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema
from karapace.schema_reader import SchemaType
from karapace.utils import assert_never

Expand Down Expand Up @@ -76,7 +76,7 @@ def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema)


def check_compatibility(
old_schema: ValidatedTypedSchema, new_schema: ValidatedTypedSchema, compatibility_mode: CompatibilityModes
old_schema: ParsedTypedSchema, new_schema: ValidatedTypedSchema, compatibility_mode: CompatibilityModes
) -> SchemaCompatibilityResult:
"""Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`."""
if compatibility_mode is CompatibilityModes.NONE:
Expand Down
146 changes: 105 additions & 41 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
)
from karapace.protobuf.schema import ProtobufSchema
from karapace.utils import json_encode
from typing import Any, Dict, Optional, Union
from typing import Any, cast, Dict, Optional, Union

import json


def parse_avro_schema_definition(s: str) -> AvroSchema:
def parse_avro_schema_definition(s: str, validate_enum_symbols: bool = True, validate_names: bool = True) -> AvroSchema:
"""Compatibility function with Avro which ignores trailing data in JSON
strings.
Expand All @@ -35,7 +35,7 @@ def parse_avro_schema_definition(s: str) -> AvroSchema:

json_data = json.loads(s[: e.pos])

return avro_parse(json.dumps(json_data))
return avro_parse(json.dumps(json_data), validate_enum_symbols=validate_enum_symbols, validate_names=validate_names)


def parse_jsonschema_definition(schema_definition: str) -> Draft7Validator:
Expand Down Expand Up @@ -101,51 +101,115 @@ def __eq__(self, other: Any) -> bool:
return isinstance(other, TypedSchema) and self.__str__() == other.__str__() and self.schema_type is other.schema_type


class ValidatedTypedSchema(TypedSchema):
def parse(
    schema_type: SchemaType,
    schema_str: str,
    validate_avro_enum_symbols: bool,
    validate_avro_names: bool,
) -> "ParsedTypedSchema":
    """Parse ``schema_str`` as the given ``schema_type``.

    Raises InvalidSchema if the type is unsupported or the definition cannot be parsed.
    The two boolean flags relax Avro enum-symbol and name/namespace validation so that
    previously stored schemas, produced by SDKs with looser rules, can still be parsed.
    """
    parsed_schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]

    if schema_type is SchemaType.AVRO:
        try:
            parsed_schema = parse_avro_schema_definition(
                schema_str,
                validate_enum_symbols=validate_avro_enum_symbols,
                validate_names=validate_avro_names,
            )
        except (SchemaParseException, json.JSONDecodeError, TypeError) as exc:
            raise InvalidSchema from exc
        return ParsedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema)

    if schema_type is SchemaType.JSONSCHEMA:
        try:
            # TypeError is raised when the user forgets to encode the schema as a string.
            parsed_schema = parse_jsonschema_definition(schema_str)
        except (TypeError, json.JSONDecodeError, SchemaError, AssertionError) as exc:
            raise InvalidSchema from exc
        return ParsedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema)

    if schema_type is SchemaType.PROTOBUF:
        try:
            parsed_schema = parse_protobuf_schema_definition(schema_str)
        except (
            TypeError,
            SchemaError,
            AssertionError,
            ProtobufParserRuntimeException,
            IllegalStateException,
            IllegalArgumentException,
            ProtobufError,
            ProtobufException,
            ProtobufSchemaParseException,
        ) as exc:
            raise InvalidSchema from exc
        return ParsedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema)

    # Any other schema type has no parser available.
    raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")


class ParsedTypedSchema(TypedSchema):
"""Parsed but unvalidated schema resource.
This class is used when reading and parsing existing schemas from data store. The intent of this class is to provide
representation of the schema which can be used to compare existing versions with new version in compatibility check
and when storing new version.
This class shall not be used for new schemas received through the public API.
The intent of this class is not to bypass validation of the syntax of the schema.
Assumption is that schema is syntactically correct.
Validations that are bypassed:
* AVRO: enumeration symbols, namespace and name validity.
Existing schemas may have been produced with backing schema SDKs that may have passed validation on schemas that
are considered by the current version of the SDK invalid.
"""

def __init__(self, schema_type: SchemaType, schema_str: str, schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]):
super().__init__(schema_type=schema_type, schema_str=schema_str)
self.schema = schema

@staticmethod
def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema":
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

parsed_schema: Union[Draft7Validator, AvroSchema, ProtobufSchema]
if schema_type is SchemaType.AVRO:
try:
parsed_schema = parse_avro_schema_definition(schema_str)
except (SchemaParseException, json.JSONDecodeError, TypeError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.JSONSCHEMA:
try:
parsed_schema = parse_jsonschema_definition(schema_str)
# TypeError - Raised when the user forgets to encode the schema as a string.
except (TypeError, json.JSONDecodeError, SchemaError, AssertionError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.PROTOBUF:
try:
parsed_schema = parse_protobuf_schema_definition(schema_str)
except (
TypeError,
SchemaError,
AssertionError,
ProtobufParserRuntimeException,
IllegalStateException,
IllegalArgumentException,
ProtobufError,
ProtobufException,
ProtobufSchemaParseException,
) as e:
raise InvalidSchema from e
else:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

return ValidatedTypedSchema(schema_type=schema_type, schema_str=schema_str, schema=parsed_schema)
def parse(schema_type: SchemaType, schema_str: str) -> "ParsedTypedSchema":
return parse(
schema_type=schema_type,
schema_str=schema_str,
validate_avro_enum_symbols=False,
validate_avro_names=False,
)

def __str__(self) -> str:
if self.schema_type == SchemaType.PROTOBUF:
return str(self.schema)
return super().__str__()


class ValidatedTypedSchema(ParsedTypedSchema):
    """Fully validated schema resource.

    This class is used when receiving a new schema through the public API. In addition to
    parsing, it applies full validation (for Avro: enum symbols and name/namespace rules).

    This class shall not be used when reading and parsing existing schemas from the data
    store; use ParsedTypedSchema for those, because previously registered schemas may have
    been produced by SDK versions whose validation rules differ from the current one.
    """

    @staticmethod
    def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema":
        """Parse and fully validate ``schema_str``; raises InvalidSchema on failure."""
        parsed_schema = parse(
            schema_type=schema_type,
            schema_str=schema_str,
            validate_avro_enum_symbols=True,
            validate_avro_names=True,
        )
        # Build a real ValidatedTypedSchema instead of `cast`-ing: `cast` is a
        # typing-only no-op, so the object would remain a ParsedTypedSchema at runtime
        # and `isinstance(x, ValidatedTypedSchema)` checks would fail.
        return ValidatedTypedSchema(
            schema_type=schema_type,
            schema_str=schema_str,
            schema=parsed_schema.schema,
        )
6 changes: 3 additions & 3 deletions karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from karapace.key_format import KeyFormatter
from karapace.master_coordinator import MasterCoordinator
from karapace.schema_models import SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_models import ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.typing import ResolvedVersion, Subject, SubjectData, Version
from karapace.utils import json_encode, KarapaceKafkaClient
Expand Down Expand Up @@ -347,11 +347,11 @@ async def write_new_schema_local(

for old_version in check_against:
old_schema = subject_data["schemas"][old_version]["schema"]
validated_old_schema = ValidatedTypedSchema.parse(
parsed_old_schema = ParsedTypedSchema.parse(
schema_type=old_schema.schema_type, schema_str=old_schema.schema_str
)
result = check_compatibility(
old_schema=validated_old_schema,
old_schema=parsed_old_schema,
new_schema=new_schema,
compatibility_mode=compatibility_mode,
)
Expand Down
36 changes: 27 additions & 9 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from karapace.karapace import KarapaceBase
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
from karapace.schema_models import SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_models import ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
from karapace.typing import JsonData
from typing import Any, Dict, Optional, Union
Expand Down Expand Up @@ -348,7 +348,7 @@ async def compatibility_check(

old_schema_type = self._validate_schema_type(content_type=content_type, data=old)
try:
old_schema = ValidatedTypedSchema.parse(old_schema_type, old["schema"])
old_schema = ParsedTypedSchema.parse(old_schema_type, old["schema"])
except InvalidSchema:
self.r(
body={
Expand Down Expand Up @@ -901,7 +901,9 @@ async def subjects_schema_post(
schema_str = body["schema"]
schema_type = self._validate_schema_type(content_type=content_type, data=body)
try:
new_schema = ValidatedTypedSchema.parse(schema_type, schema_str)
# When checking if schema is already registered, allow unvalidated schema in as
# there might be stored schemas that are non-compliant from the past.
new_schema = ParsedTypedSchema.parse(schema_type, schema_str)
except InvalidSchema:
self.log.exception("No proper parser found")
self.r(
Expand All @@ -915,23 +917,39 @@ async def subjects_schema_post(

# Match schemas based on version from latest to oldest
for schema in sorted(subject_data["schemas"].values(), key=lambda item: item["version"], reverse=True):
validated_typed_schema = ValidatedTypedSchema.parse(schema["schema"].schema_type, schema["schema"].schema_str)
try:
parsed_typed_schema = ParsedTypedSchema.parse(schema["schema"].schema_type, schema["schema"].schema_str)
except InvalidSchema as e:
failed_schema_id = schema["id"]
self.log.exception("Existing schema failed to parse. Id: %s", failed_schema_id)
self.stats.unexpected_exception(
ex=e, where="Matching existing schemas to posted. Failed schema id: {failed_schema_id}"
)
self.r(
body={
"error_code": SchemaErrorCodes.HTTP_INTERNAL_SERVER_ERROR.value,
"message": f"Error while looking up schema under subject {subject}",
},
content_type=content_type,
status=HTTPStatus.INTERNAL_SERVER_ERROR,
)

if schema_type is SchemaType.JSONSCHEMA:
schema_valid = validated_typed_schema.to_dict() == new_schema.to_dict()
schema_valid = parsed_typed_schema.to_dict() == new_schema.to_dict()
else:
schema_valid = validated_typed_schema.schema == new_schema.schema
if validated_typed_schema.schema_type == new_schema.schema_type and schema_valid:
schema_valid = parsed_typed_schema.schema == new_schema.schema
if parsed_typed_schema.schema_type == new_schema.schema_type and schema_valid:
ret = {
"subject": subject,
"version": schema["version"],
"id": schema["id"],
"schema": validated_typed_schema.schema_str,
"schema": parsed_typed_schema.schema_str,
}
if schema_type is not SchemaType.AVRO:
ret["schemaType"] = schema_type
self.r(ret, content_type)
else:
self.log.debug("Schema %r did not match %r", schema, validated_typed_schema)
self.log.debug("Schema %r did not match %r", schema, parsed_typed_schema)
self.r(
body={
"error_code": SchemaErrorCodes.SCHEMA_NOT_FOUND.value,
Expand Down
16 changes: 8 additions & 8 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from karapace.client import Client
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.schema_models import InvalidSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.utils import json_encode
from typing import Any, Dict, Optional, Tuple
from urllib.parse import quote
Expand Down Expand Up @@ -85,7 +85,7 @@ async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema) -> i
raise SchemaRetrievalError(result.json())
return result.json()["id"]

async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema]:
async def get_latest_schema(self, subject: str) -> Tuple[int, ParsedTypedSchema]:
result = await self.client.get(f"subjects/{quote(subject)}/versions/latest")
if not result.ok:
raise SchemaRetrievalError(result.json())
Expand All @@ -94,11 +94,11 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche
raise SchemaRetrievalError(f"Invalid result format: {json_result}")
try:
schema_type = SchemaType(json_result.get("schemaType", "AVRO"))
return json_result["id"], ValidatedTypedSchema.parse(schema_type, json_result["schema"])
return json_result["id"], ParsedTypedSchema.parse(schema_type, json_result["schema"])
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e

async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema:
async def get_schema_for_id(self, schema_id: int) -> ParsedTypedSchema:
result = await self.client.get(f"schemas/ids/{schema_id}")
if not result.ok:
raise SchemaRetrievalError(result.json()["message"])
Expand All @@ -107,7 +107,7 @@ async def get_schema_for_id(self, schema_id: int) -> ValidatedTypedSchema:
raise SchemaRetrievalError(f"Invalid result format: {json_result}")
try:
schema_type = SchemaType(json_result.get("schemaType", "AVRO"))
return ValidatedTypedSchema.parse(schema_type, json_result["schema"])
return ParsedTypedSchema.parse(schema_type, json_result["schema"])
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e

Expand Down Expand Up @@ -146,7 +146,7 @@ async def close(self) -> None:
self.registry_client = None

def get_subject_name(self, topic_name: str, schema: str, subject_type: str, schema_type: SchemaType) -> str:
schema_typed = ValidatedTypedSchema.parse(schema_type, schema)
schema_typed = ParsedTypedSchema.parse(schema_type, schema)
namespace = "dummy"
if schema_type is SchemaType.AVRO:
if isinstance(schema_typed.schema, avro.schema.NamedSchema):
Expand All @@ -170,7 +170,7 @@ async def get_schema_for_subject(self, subject: str) -> TypedSchema:
async def get_id_for_schema(self, schema: str, subject: str, schema_type: SchemaType) -> int:
assert self.registry_client, "must not call this method after the object is closed."
try:
schema_typed = ValidatedTypedSchema.parse(schema_type, schema)
schema_typed = ParsedTypedSchema.parse(schema_type, schema)
except InvalidSchema as e:
raise InvalidPayload(f"Schema string {schema} is invalid") from e
schema_ser = schema_typed.__str__()
Expand Down Expand Up @@ -218,7 +218,7 @@ async def deserialize(self, bytes_: bytes) -> dict:
raise InvalidPayload("No schema with ID from payload")
ret_val = read_value(self.config, schema, bio)
return ret_val
except (UnicodeDecodeError, TypeError) as e:
except (UnicodeDecodeError, TypeError, avro.errors.InvalidAvroBinaryEncoding) as e:
raise InvalidPayload("Data does not contain a valid message") from e
except avro.errors.SchemaResolutionException as e:
raise InvalidPayload("Data cannot be decoded with provided schema") from e
Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
accept-types==0.4.1
aiohttp==3.8.3
aiokafka==0.7.2
avro==1.11.0
jsonschema==3.2.0
networkx==2.5
protobuf==3.19.5
Expand All @@ -22,6 +21,9 @@ watchfiles==0.15.0
#
git+https://github.com/aiven/kafka-python.git@1b95333c9628152066fb8b1092de9da0433401fd

git+https://github.com/aiven/avro.git@f37a4ea69560d798cc718fccd7f75223527f5ebb#subdirectory=lang/py


# Indirect dependencies
aiosignal==1.2.0 # aiohttp
async-timeout==4.0.2 # aiohttp
Expand Down
Loading

0 comments on commit 6a0d7d1

Please sign in to comment.