From fb59e411bd48909288da3585b2593719ae9318b7 Mon Sep 17 00:00:00 2001 From: Elia Migliore Date: Mon, 18 Sep 2023 13:33:52 +0200 Subject: [PATCH 1/2] fix: prevent accepting schemas that are not uniquely identifiable by the current parser --- karapace/protobuf/dependency.py | 11 ++++++- karapace/protobuf/schema.py | 7 +++-- tests/unit/protobuf/test_protobuf_schema.py | 34 +++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/karapace/protobuf/dependency.py b/karapace/protobuf/dependency.py index c2694ba03..c0b04d2bf 100644 --- a/karapace/protobuf/dependency.py +++ b/karapace/protobuf/dependency.py @@ -11,13 +11,22 @@ from typing import List, Optional, Set +class FieldNotUniquelyIdentifiableException(Exception): + pass + + class ProtobufDependencyVerifier: def __init__(self) -> None: self.declared_types: List[str] = [] self.used_types: List[str] = [] self.import_path: List[str] = [] - def add_declared_type(self, full_name: str) -> None: + def add_declared_type(self, full_name: str, uniquely: bool = False) -> None: + if uniquely and full_name in self.declared_types: + raise FieldNotUniquelyIdentifiableException( + f"{full_name} is not currently identifiable from the parser, " + f"validating this message lead to break the schema evolution!" + ) self.declared_types.append(full_name) def add_used_type(self, parent: str, element_type: str) -> None: diff --git a/karapace/protobuf/schema.py b/karapace/protobuf/schema.py index 3829f9056..9eef89952 100644 --- a/karapace/protobuf/schema.py +++ b/karapace/protobuf/schema.py @@ -161,9 +161,10 @@ def _process_nested_type( ): verifier.add_declared_type(package_name + "." + parent_name + "." + element_type.name) verifier.add_declared_type(parent_name + "." + element_type.name) - ancestor_only = parent_name.find(".") - if ancestor_only != -1: - verifier.add_declared_type(parent_name[:ancestor_only] + "." 
+ element_type.name) + anchestor_only = parent_name.find(".") + if anchestor_only != -1: + # adding the first father and the type name, this should be unique to identify which is which. + verifier.add_declared_type(parent_name[:anchestor_only] + "." + element_type.name, uniquely=True) if isinstance(element_type, MessageElement): for one_of in element_type.one_ofs: diff --git a/tests/unit/protobuf/test_protobuf_schema.py b/tests/unit/protobuf/test_protobuf_schema.py index 6021b45c3..1956411d4 100644 --- a/tests/unit/protobuf/test_protobuf_schema.py +++ b/tests/unit/protobuf/test_protobuf_schema.py @@ -2,7 +2,9 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from _pytest.python_api import raises from karapace.protobuf.compare_result import CompareResult +from karapace.protobuf.dependency import FieldNotUniquelyIdentifiableException from karapace.protobuf.kotlin_wrapper import trim_margin from karapace.protobuf.location import Location from karapace.protobuf.schema import ProtobufSchema @@ -376,3 +378,35 @@ def test_protobuf_self_referencing_schema(): """ assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto4).schema, ProtobufSchema) + + +def test_illegal_redefine_objects_in_same_scope(): + proto1 = """\ + syntax = "proto3"; + + package fancy.company.in.party.v1; + message AnotherMessage { + enum BamFancyEnum { + // Hei! This is a comment! + MY_AWESOME_FIELD = 0; + } + message WowANestedMessage { + message DeeplyNestedMsg { + enum BamFancyEnum { + // Hei! This is a comment! 
+ MY_AWESOME_FIELD = 0; + } + message AnotherLevelOfNesting { + BamFancyEnum im_tricky_im_referring_to_the_previous_enum = 1; + } + } + } + } + """ + with raises(FieldNotUniquelyIdentifiableException) as e: + assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto1).schema, ProtobufSchema) + + assert ( + e.value.args[0] == "AnotherMessage.BamFancyEnum is not currently identifiable from the parser, " + "validating this message lead to break the schema evolution!" + ) From 0aef6f209b2b1f14260b5e903656d4e260048975 Mon Sep 17 00:00:00 2001 From: Elia Migliore Date: Mon, 25 Sep 2023 14:57:44 +0200 Subject: [PATCH 2/2] test: adding a couple of basic tests in our test suite --- karapace/protobuf/dependency.py | 11 +- karapace/protobuf/schema.py | 7 +- tests/integration/test_rest.py | 53 ++++++ .../test_rest_consumer_protobuf.py | 179 +++++++++++++++++- tests/unit/protobuf/test_protobuf_schema.py | 34 ---- 5 files changed, 232 insertions(+), 52 deletions(-) diff --git a/karapace/protobuf/dependency.py b/karapace/protobuf/dependency.py index c0b04d2bf..c2694ba03 100644 --- a/karapace/protobuf/dependency.py +++ b/karapace/protobuf/dependency.py @@ -11,22 +11,13 @@ from typing import List, Optional, Set -class FieldNotUniquelyIdentifiableException(Exception): - pass - - class ProtobufDependencyVerifier: def __init__(self) -> None: self.declared_types: List[str] = [] self.used_types: List[str] = [] self.import_path: List[str] = [] - def add_declared_type(self, full_name: str, uniquely: bool = False) -> None: - if uniquely and full_name in self.declared_types: - raise FieldNotUniquelyIdentifiableException( - f"{full_name} is not currently identifiable from the parser, " - f"validating this message lead to break the schema evolution!" 
- ) + def add_declared_type(self, full_name: str) -> None: self.declared_types.append(full_name) def add_used_type(self, parent: str, element_type: str) -> None: diff --git a/karapace/protobuf/schema.py b/karapace/protobuf/schema.py index 9eef89952..3829f9056 100644 --- a/karapace/protobuf/schema.py +++ b/karapace/protobuf/schema.py @@ -161,10 +161,9 @@ def _process_nested_type( ): verifier.add_declared_type(package_name + "." + parent_name + "." + element_type.name) verifier.add_declared_type(parent_name + "." + element_type.name) - anchestor_only = parent_name.find(".") - if anchestor_only != -1: - # adding the first father and the type name, this should be unique to identify which is which. - verifier.add_declared_type(parent_name[:anchestor_only] + "." + element_type.name, uniquely=True) + ancestor_only = parent_name.find(".") + if ancestor_only != -1: + verifier.add_declared_type(parent_name[:ancestor_only] + "." + element_type.name) if isinstance(element_type, MessageElement): for one_of in element_type.one_ofs: diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index 8b73a7ffd..29b51825e 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -5,6 +5,8 @@ from __future__ import annotations from kafka.errors import UnknownTopicOrPartitionError +from karapace.client import Client +from karapace.kafka_rest_apis import KafkaRestAdminClient from pytest import raises from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES from tests.utils import ( @@ -186,6 +188,57 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie # assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}" +async def test_another_avro_publish( + rest_async_client: Client, + registry_async_client: Client, + admin_client: KafkaRestAdminClient, +): + topic = new_topic(admin_client) + other_tn = new_topic(admin_client) + + await 
wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + header = REST_HEADERS["avro"] + + tested_avro_schema = { + "type": "record", + "name": "example", + "namespace": "example", + "doc": "example", + "fields": [{"type": "int", "name": "test", "doc": "my test number", "namespace": "test", "default": "5"}], + } + + schema_str = json.dumps(tested_avro_schema) + + # check succeeds with 1 record and brand new schema + res = await registry_async_client.post( + f"subjects/{topic}-key/versions", json={"schema": schema_str, "schemaType": "AVRO"} + ) + assert res.ok + + key_schema_id = res.json()["id"] + + res = await registry_async_client.post( + f"subjects/{topic}-value/versions", json={"schema": schema_str, "schemaType": "AVRO"} + ) + assert res.ok + + value_schema_id = res.json()["id"] + + key_body = {"test": 5} + + value_body = {"test": 5} + + body = { + "key_schema_id": key_schema_id, + "value_schema_id": value_schema_id, + "records": [{"key": key_body, "value": value_body}], + } + + url = f"/topics/{topic}" + res = await rest_async_client.post(url, json=body, headers=header) + assert res.ok + + async def test_admin_client(admin_client, producer): topic_names = [new_topic(admin_client) for i in range(10, 13)] topic_info = admin_client.cluster_metadata() diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py index dfa7278b5..c6b1c96da 100644 --- a/tests/integration/test_rest_consumer_protobuf.py +++ b/tests/integration/test_rest_consumer_protobuf.py @@ -2,12 +2,12 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ - from karapace.client import Client from karapace.kafka_rest_apis import KafkaRestAdminClient from karapace.protobuf.kotlin_wrapper import trim_margin from tests.integration.test_rest import NEW_TOPIC_TIMEOUT from tests.utils import ( + create_subject_name_factory, new_consumer, new_random_name, new_topic, @@ -17,6 +17,7 @@ schema_data_second, 
wait_for_topics, ) +from typing import Generator import pytest @@ -139,7 +140,7 @@ async def test_publish_protobuf_with_references( res = await rest_async_client.post( f"/topics/{topic_name}", json=example_message, - headers=REST_HEADERS["avro"], + headers=REST_HEADERS["protobuf"], ) assert res.status_code == 200 @@ -230,7 +231,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references( res = await rest_async_client.post( f"/topics/{topic_name}", json=example_message, - headers=REST_HEADERS["avro"], + headers=REST_HEADERS["protobuf"], ) assert res.status_code == 200 @@ -241,7 +242,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references( consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000" - res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"]) + res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"]) assert res.ok resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"]) @@ -262,3 +263,173 @@ async def test_publish_and_consume_protobuf_with_recursive_references( assert msg["offset"] == 0 and msg["partition"] == 0, "first message of the only partition available" assert msg["topic"] == topic_name assert msg["value"] == produced_message + + +@pytest.mark.parametrize("google_library_included", [True, False]) +async def test_produce_and_retrieve_protobuf( + registry_async_client: Client, + rest_async_client: Client, + admin_client: KafkaRestAdminClient, + google_library_included: bool, +) -> None: + topic_name = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + subject = create_subject_name_factory("test_produce_and_retrieve_protobuf")() + subject_topic = f"{topic_name}-value" + + base_schema_subject = f"{subject}_base_schema_subject" + google_postal_address_schema_subject = 
f"{subject}_google_address_schema_subject" + + CUSTOMER_PLACE_PROTO = """ + syntax = "proto3"; + package a1; + message Place { + string city = 1; + int32 zone = 2; + } + """ + + body = {"schemaType": "PROTOBUF", "schema": CUSTOMER_PLACE_PROTO} + res = await registry_async_client.post(f"subjects/{base_schema_subject}/versions", json=body) + assert res.status_code == 200 + + if not google_library_included: + GOOGLE_POSTAL_ADDRESS_PROTO = """ + syntax = "proto3"; + + package google.type; + + option cc_enable_arenas = true; + option go_package = "google.golang.org/genproto/googleapis/type/postaladdress;postaladdress"; + option java_multiple_files = true; + option java_outer_classname = "PostalAddressProto"; + option java_package = "com.google.type"; + option objc_class_prefix = "GTP"; + message PostalAddress { + int32 revision = 1; + string region_code = 2; + string language_code = 3; + string postal_code = 4; + string sorting_code = 5; + string administrative_area = 6; + string locality = 7; + string sublocality = 8; + repeated string address_lines = 9; + repeated string recipients = 10; + string organization = 11; + } + """ + + body = {"schemaType": "PROTOBUF", "schema": GOOGLE_POSTAL_ADDRESS_PROTO} + res = await registry_async_client.post(f"subjects/{google_postal_address_schema_subject}/versions", json=body) + assert res.status_code == 200 + + postal_address_import = ( + 'import "google/type/postal_address.proto";' if google_library_included else 'import "postal_address.proto";' + ) + + CUSTOMER_PROTO = f""" + syntax = "proto3"; + package a1; + import "Place.proto"; + + {postal_address_import} + + // @producer: another comment + message Customer {{ + string name = 1; + int32 code = 2; + Place place = 3; + google.type.PostalAddress address = 4; + }} + """ + + def references() -> Generator[str, None, None]: + yield {"name": "Place.proto", "subject": base_schema_subject, "version": 1} + + if not google_library_included: + yield {"name": "postal_address.proto", 
"subject": google_postal_address_schema_subject, "version": 1} + + body = { + "schemaType": "PROTOBUF", + "schema": CUSTOMER_PROTO, + "references": list(references()), + } + res = await registry_async_client.post(f"subjects/{subject_topic}/versions", json=body) + + assert res.status_code == 200 + topic_schema_id = res.json()["id"] + + message_to_produce = [ + { + "name": "John Doe", + "code": 123456, + "place": {"city": "New York", "zone": 5}, + "address": { + "revision": 1, + "region_code": "US", + "postal_code": "10001", + "address_lines": ["123 Main St", "Apt 4"], + }, + }, + { + "name": "Sophie Smith", + "code": 987654, + "place": {"city": "London", "zone": 3}, + "address": { + "revision": 2, + "region_code": "UK", + "postal_code": "SW1A 1AA", + "address_lines": ["10 Downing Street"], + }, + }, + { + "name": "Pierre Dupont", + "code": 246813, + "place": {"city": "Paris", "zone": 1}, + "address": {"revision": 1, "region_code": "FR", "postal_code": "75001", "address_lines": ["1 Rue de Rivoli"]}, + }, + ] + + res = await rest_async_client.post( + f"/topics/{topic_name}", + json={"value_schema_id": topic_schema_id, "records": [{"value": m} for m in message_to_produce]}, + headers=REST_HEADERS["protobuf"], + ) + assert res.status_code == 200 + + group = new_random_name("protobuf_recursive_reference_message") + instance_id = await new_consumer(rest_async_client, group) + + subscribe_path = f"/consumers/{group}/instances/{instance_id}/subscription" + + consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000" + + res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"]) + assert res.ok + + resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"]) + data = resp.json() + + assert isinstance(data, list) + assert len(data) == 3 + + for i in range(0, 3): + msg = data[i] + expected_message = message_to_produce[i] + + assert "key" in msg + assert "offset" in msg + assert 
"topic" in msg + assert "value" in msg + assert "timestamp" in msg + + assert msg["key"] is None, "no key defined in production" + assert msg["topic"] == topic_name + + for key in expected_message.keys(): + if key == "address": + for address_key in expected_message["address"].keys(): + assert expected_message["address"][address_key] == msg["value"]["address"][address_key] + else: + assert msg["value"][key] == expected_message[key] diff --git a/tests/unit/protobuf/test_protobuf_schema.py b/tests/unit/protobuf/test_protobuf_schema.py index 1956411d4..6021b45c3 100644 --- a/tests/unit/protobuf/test_protobuf_schema.py +++ b/tests/unit/protobuf/test_protobuf_schema.py @@ -2,9 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from _pytest.python_api import raises from karapace.protobuf.compare_result import CompareResult -from karapace.protobuf.dependency import FieldNotUniquelyIdentifiableException from karapace.protobuf.kotlin_wrapper import trim_margin from karapace.protobuf.location import Location from karapace.protobuf.schema import ProtobufSchema @@ -378,35 +376,3 @@ def test_protobuf_self_referencing_schema(): """ assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto4).schema, ProtobufSchema) - - -def test_illegal_redefine_objects_in_same_scope(): - proto1 = """\ - syntax = "proto3"; - - package fancy.company.in.party.v1; - message AnotherMessage { - enum BamFancyEnum { - // Hei! This is a comment! - MY_AWESOME_FIELD = 0; - } - message WowANestedMessage { - message DeeplyNestedMsg { - enum BamFancyEnum { - // Hei! This is a comment! 
- MY_AWESOME_FIELD = 0; - } - message AnotherLevelOfNesting { - BamFancyEnum im_tricky_im_referring_to_the_previous_enum = 1; - } - } - } - } - """ - with raises(FieldNotUniquelyIdentifiableException) as e: - assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto1).schema, ProtobufSchema) - - assert ( - e.value.args[0] == "AnotherMessage.BamFancyEnum is not currently identifiable from the parser, " - "validating this message lead to break the schema evolution!" - )