tests, avro: add test on invalid reference schema #965

Closed
Changes from all commits · 17 commits
1 change: 1 addition & 0 deletions karapace/compatibility/__init__.py
@@ -88,6 +88,7 @@ def check_compatibility(
    if old_schema.schema_type is SchemaType.AVRO:
        assert isinstance(old_schema.schema, AvroSchema)
        assert isinstance(new_schema.schema, AvroSchema)

        if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
            result = check_avro_compatibility(
                reader_schema=new_schema.schema,
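Note the direction here: in BACKWARD and BACKWARD_TRANSITIVE modes the new schema is the reader and the old schema the writer, i.e. consumers on the new schema must still be able to read data written under the old one. A sketch of the full call, assuming the writer_schema keyword that the diff truncates:

# Hedged sketch: writer_schema is assumed by symmetry with reader_schema.
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
    result = check_avro_compatibility(
        reader_schema=new_schema.schema,  # the new schema must read...
        writer_schema=old_schema.schema,  # ...data produced under the old schema
    )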
118 changes: 116 additions & 2 deletions karapace/in_memory_database.py
@@ -6,6 +6,7 @@
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema, Versioner
from karapace.schema_references import Reference, Referents
@@ -24,7 +25,120 @@ class SubjectData:
    compatibility: str | None = None


-class InMemoryDatabase:
class KarapaceDatabase(ABC):
    @abstractmethod
    def get_schema_id(self, new_schema: TypedSchema) -> SchemaId:
        pass

    @abstractmethod
    def get_schema_id_if_exists(
        self,
        *,
        subject: Subject,
        schema: TypedSchema,
        include_deleted: bool,
    ) -> SchemaId | None:
        pass

    @abstractmethod
    def get_next_version(self, *, subject: Subject) -> Version:
        pass

    @abstractmethod
    def insert_schema_version(
        self,
        *,
        subject: Subject,
        schema_id: SchemaId,
        version: Version,
        deleted: bool,
        schema: TypedSchema,
        references: Sequence[Reference] | None,
    ) -> None:
        pass

    @abstractmethod
    def insert_subject(self, *, subject: Subject) -> None:
        pass

    @abstractmethod
    def get_subject_compatibility(self, *, subject: Subject) -> str | None:
        pass

    @abstractmethod
    def delete_subject_compatibility(self, *, subject: Subject) -> None:
        pass

    @abstractmethod
    def set_subject_compatibility(self, *, subject: Subject, compatibility: str) -> None:
        pass

    @abstractmethod
    def find_schema(self, *, schema_id: SchemaId) -> TypedSchema | None:
        pass

    @abstractmethod
    def find_schemas(self, *, include_deleted: bool, latest_only: bool) -> dict[Subject, list[SchemaVersion]]:
        pass

    @abstractmethod
    def subjects_for_schema(self, schema_id: SchemaId) -> list[Subject]:
        pass

    @abstractmethod
    def find_schema_versions_by_schema_id(self, *, schema_id: SchemaId, include_deleted: bool) -> list[SchemaVersion]:
        pass

    @abstractmethod
    def find_subject(self, *, subject: Subject) -> Subject | None:
        pass

    @abstractmethod
    def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
        pass

    @abstractmethod
    def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]:
        pass

    @abstractmethod
    def delete_subject(self, *, subject: Subject, version: Version) -> None:
        pass

    @abstractmethod
    def delete_subject_hard(self, *, subject: Subject) -> None:
        pass

    @abstractmethod
    def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
        pass

    @abstractmethod
    def num_schemas(self) -> int:
        pass

    @abstractmethod
    def num_subjects(self) -> int:
        pass

    @abstractmethod
    def num_schema_versions(self) -> tuple[int, int]:
        pass

    @abstractmethod
    def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
        pass

    @abstractmethod
    def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
        pass

    @abstractmethod
    def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
        pass


+class InMemoryDatabase(KarapaceDatabase):
    def __init__(self) -> None:
        self.global_schema_id = SchemaId(0)
        self.id_lock_thread = Lock()
@@ -76,7 +190,7 @@ def get_schema_id_if_exists(
        *,
        subject: Subject,
        schema: TypedSchema,
-        include_deleted: bool,  # pylint: disable=unused-argument
+        include_deleted: bool,
    ) -> SchemaId | None:
        subject_fingerprints = self._hash_to_schema_id_on_subject.get(subject)
        if subject_fingerprints:
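Because the schema reader is now typed against this interface (see schema_reader.py below), any backend honoring the contract can stand in for InMemoryDatabase. A minimal, hypothetical sketch; only two methods are shown, and a real subclass must implement every abstract method:

# Hypothetical delegating backend for illustration; not part of this PR.
class CountingDatabase(KarapaceDatabase):
    """Wraps another KarapaceDatabase and counts schema-version inserts."""

    def __init__(self, inner: KarapaceDatabase) -> None:
        self.inner = inner
        self.inserts = 0

    def get_schema_id(self, new_schema: TypedSchema) -> SchemaId:
        return self.inner.get_schema_id(new_schema)

    def insert_schema_version(
        self,
        *,
        subject: Subject,
        schema_id: SchemaId,
        version: Version,
        deleted: bool,
        schema: TypedSchema,
        references: Sequence[Reference] | None,
    ) -> None:
        self.inserts += 1
        self.inner.insert_schema_version(
            subject=subject,
            schema_id=schema_id,
            version=version,
            deleted=deleted,
            schema=schema,
            references=references,
        )

    # ...the remaining abstract methods delegate to self.inner in the same way.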
70 changes: 66 additions & 4 deletions karapace/schema_models.py
@@ -27,8 +27,10 @@
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Dict, Final, final, Mapping, Sequence

import avro.schema
import hashlib
import logging
import re

LOG = logging.getLogger(__name__)

@@ -138,6 +140,7 @@ def normalize_schema_str(
        except JSONDecodeError as e:
            LOG.info("Schema is not valid JSON")
            raise e

    elif schema_type == SchemaType.PROTOBUF:
        if schema:
            schema_str = str(schema)
@@ -180,6 +183,44 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
        return parsed_typed_schema.schema


class AvroMerge:
    def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None):
        self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True)
        self.dependencies = dependencies
        self.unique_id = 0

    def union_safe_schema_str(self, schema_str: str) -> str:
        # If the schema is already a union, use it as-is; otherwise wrap it in a union.
        regex = re.compile(r"^\s*\[")
        base_schema = (
            f'{{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",'
            f'"type": "record", "fields": [{{"name": "name", "type":'
        )
        if regex.match(schema_str):
            return f"{base_schema} {schema_str}}}]}}"
        return f"{base_schema} [{schema_str}]}}]}}"

    def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str:
        """To support references in AVRO, iteratively merge all referenced schemas into the current schema."""
        stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)]
        merged_schemas = []

        while stack:
            current_schema_str, current_dependencies = stack.pop()
            if current_dependencies:
                # Re-visit the schema itself after its dependencies have been emitted.
                stack.append((current_schema_str, None))
                for dependency in reversed(current_dependencies.values()):
                    stack.append((dependency.schema.schema_str, dependency.schema.dependencies))
            else:
                self.unique_id += 1
                merged_schemas.append(self.union_safe_schema_str(current_schema_str))

        return ",\n".join(merged_schemas)

    def wrap(self) -> str:
        return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]"

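To make the wrapping concrete: for a top-level schema with one reference, builder() emits the dependency's wrapper before the dependent schema's wrapper, so every named type is defined before it is used when Avro parses the union in order. An illustrative sketch, with invented schema strings:

# Hypothetical dependency: {"type": "record", "name": "com.example.Address", "fields": [...]}
# Hypothetical top-level:  {"type": "record", "name": "com.example.User",
#                           "fields": [{"name": "address", "type": "com.example.Address"}]}
#
# AvroMerge(user_schema_str, {"com.example.Address": address_dependency}).wrap()
# then produces, roughly:
# [
# {"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___","type": "record", "fields":
#   [{"name": "name", "type": [ <Address schema> ]}]},
# {"name": "___RESERVED_KARAPACE_WRAPPER_NAME_2___","type": "record", "fields":
#   [{"name": "name", "type": [ <User schema> ]}]}
# ]
# The result parses as a single Avro union, so one parse_avro_schema_definition()
# call validates the schema together with all of its references.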

def parse(
    schema_type: SchemaType,
    schema_str: str,
@@ -188,21 +229,41 @@ def parse(
    references: Sequence[Reference] | None = None,
    dependencies: Mapping[str, Dependency] | None = None,
    normalize: bool = False,
    dependencies_compat: bool = False,
) -> ParsedTypedSchema:
    if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
        raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

    parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema
    parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
    if schema_type is SchemaType.AVRO:
        try:
            if dependencies or dependencies_compat:
                wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap()
            else:
                wrapped_schema_str = schema_str
            parsed_schema = parse_avro_schema_definition(
-                schema_str,
+                wrapped_schema_str,
                validate_enum_symbols=validate_avro_enum_symbols,
                validate_names=validate_avro_names,
            )
            if dependencies or dependencies_compat:
                if isinstance(parsed_schema, avro.schema.UnionSchema):
                    # Unwrap: the last wrapper record in the union holds the original schema.
                    parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]
                else:
                    raise InvalidSchema("Wrapped AVRO schema did not parse into a union")
            else:
                parsed_schema_result = parsed_schema
            return ParsedTypedSchema(
                schema_type=schema_type,
                schema_str=schema_str,
                schema=parsed_schema_result,
                references=references,
                dependencies=dependencies,
                schema_wrapped=parsed_schema,
            )
        except (SchemaParseException, JSONDecodeError, TypeError) as e:
            raise InvalidSchema from e

    elif schema_type is SchemaType.JSONSCHEMA:
        try:
            parsed_schema = parse_jsonschema_definition(schema_str)
@@ -264,9 +325,10 @@ def __init__(
        schema: Draft7Validator | AvroSchema | ProtobufSchema,
        references: Sequence[Reference] | None = None,
        dependencies: Mapping[str, Dependency] | None = None,
        schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None,
    ) -> None:
        self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema
        self.schema_wrapped = schema_wrapped
        super().__init__(
            schema_type=schema_type,
            schema_str=schema_str,
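A hedged usage sketch of the extended parse(): the validate_* flag names are taken from the calls above, the schema string is invented, and dependencies_compat=True is assumed to force the wrapping path even without dependencies, as the branch above suggests:

parsed = parse(
    schema_type=SchemaType.AVRO,
    schema_str='{"type": "record", "name": "User", "fields": [{"name": "id", "type": "long"}]}',
    validate_avro_enum_symbols=True,
    validate_avro_names=True,
    dependencies_compat=True,
)
assert isinstance(parsed.schema, AvroSchema)  # the unwrapped schema
assert parsed.schema_wrapped is not None      # the full wrapper union it was parsed from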
29 changes: 25 additions & 4 deletions karapace/schema_reader.py
@@ -29,7 +29,7 @@
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException
-from karapace.in_memory_database import InMemoryDatabase
+from karapace.in_memory_database import KarapaceDatabase
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import KafkaConsumer
@@ -127,7 +127,7 @@ def __init__(
        config: Config,
        offset_watcher: OffsetWatcher,
        key_formatter: KeyFormatter,
-        database: InMemoryDatabase,
+        database: KarapaceDatabase,
        master_coordinator: MasterCoordinator | None = None,
    ) -> None:
        Thread.__init__(self, name="schema-reader")
@@ -349,6 +349,9 @@ def handle_messages(self) -> None:
            if are_we_master is True:
                watch_offsets = True

        self.consume_messages(msgs, watch_offsets)

    def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None:
        schema_records_processed_keymode_canonical = 0
        schema_records_processed_keymode_deprecated_karapace = 0
        for msg in msgs:
@@ -360,8 +363,14 @@ def handle_messages(self) -> None:

                assert message_key is not None
                key = json_decode(message_key)
            except AssertionError as exc:
                LOG.warning("Empty msg.key() at offset %s", msg.offset())
                self.offset = msg.offset()  # Invalid entry shall also move the offset so Karapace makes progress.
                self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
                continue  # [non-strict mode]
            except JSONDecodeError as exc:
-                LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
+                non_bytes_key = msg.key().decode()  # type: ignore[union-attr]
+                LOG.warning("Invalid JSON in msg.key(): %s at offset %s", non_bytes_key, msg.offset())
                self.offset = msg.offset()  # Invalid entry shall also move the offset so Karapace makes progress.
                self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
                continue  # [non-strict mode]
@@ -540,7 +549,19 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

        parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None
        resolved_dependencies: dict[str, Dependency] | None = None
-        if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
+        if schema_type_parsed == SchemaType.AVRO:
            try:
                if schema_references:
                    candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
                    resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
                schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
            except json.JSONDecodeError as e:
                LOG.warning("Schema is not valid JSON")
                raise e
            except InvalidReferences as e:
                LOG.exception("Invalid AVRO references")
                raise e
        elif schema_type_parsed == SchemaType.JSONSCHEMA:
            try:
                schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
            except json.JSONDecodeError as exc:
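For orientation, each entry of schema_references here is a plain mapping in the Confluent-compatible reference shape, which reference_from_mapping is assumed to accept; a hedged sketch with invented values:

# Illustrative only: one reference entry as it arrives in a _schemas message.
# "name" is the type name the schema refers to; "subject"/"version" identify
# the registered schema that defines it.
reference_data = {"name": "com.example.Address", "subject": "address-value", "version": 1}
candidate_references = [reference_from_mapping(reference_data)]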
2 changes: 1 addition & 1 deletion karapace/schema_registry_apis.py
@@ -1055,7 +1055,7 @@ def _validate_references(
                content_type=content_type,
                status=HTTPStatus.BAD_REQUEST,
            )
-        if references and schema_type != SchemaType.PROTOBUF:
+        if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
            self.r(
                body={
                    "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value,
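With this guard relaxed, AVRO joins PROTOBUF in accepting references. A sketch of registering an Avro schema with a reference over the REST API; the endpoint shape follows the Confluent-compatible API Karapace exposes, while the host, subjects, and schemas are invented for the example:

# Illustrative registration of an Avro schema that references a previously
# registered subject. Assumes a local Karapace at port 8081.
import json
import requests

base = "http://localhost:8081"

address = {"type": "record", "name": "com.example.Address",
           "fields": [{"name": "street", "type": "string"}]}
requests.post(f"{base}/subjects/address-value/versions",
              json={"schema": json.dumps(address), "schemaType": "AVRO"})

user = {"type": "record", "name": "com.example.User",
        "fields": [{"name": "address", "type": "com.example.Address"}]}
requests.post(
    f"{base}/subjects/user-value/versions",
    json={
        "schema": json.dumps(user),
        "schemaType": "AVRO",
        "references": [{"name": "com.example.Address", "subject": "address-value", "version": 1}],
    },
)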
3 changes: 0 additions & 3 deletions tests/integration/backup/test_session_timeout.py
@@ -12,7 +12,6 @@
from tests.integration.conftest import create_kafka_server
from tests.integration.utils.config import KafkaDescription
from tests.integration.utils.kafka_server import KafkaServers
-from tests.integration.utils.network import PortRangeInclusive

import pytest

@@ -26,7 +25,6 @@
@pytest.fixture(scope="function", name="kafka_server_session_timeout")
def fixture_kafka_server(
    kafka_description: KafkaDescription,
-    port_range: PortRangeInclusive,
    tmp_path_factory: pytest.TempPathFactory,
):
    # Use custom data and log dirs to avoid conflicts with other Kafka servers.
@@ -40,7 +38,6 @@ def fixture_kafka_server(
        session_datadir,
        session_logdir,
        kafka_description,
-        port_range,
        kafka_config_extra,
    )
