diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py
index 8b73a7ffd..29b51825e 100644
--- a/tests/integration/test_rest.py
+++ b/tests/integration/test_rest.py
@@ -5,6 +5,8 @@
 from __future__ import annotations
 
 from kafka.errors import UnknownTopicOrPartitionError
+from karapace.client import Client
+from karapace.kafka_rest_apis import KafkaRestAdminClient
 from pytest import raises
 from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
 from tests.utils import (
@@ -186,6 +188,57 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie
     # assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"
 
 
+async def test_another_avro_publish(
+    rest_async_client: Client,
+    registry_async_client: Client,
+    admin_client: KafkaRestAdminClient,
+):
+    topic = new_topic(admin_client)
+    other_tn = new_topic(admin_client)
+
+    await wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
+    header = REST_HEADERS["avro"]
+
+    tested_avro_schema = {
+        "type": "record",
+        "name": "example",
+        "namespace": "example",
+        "doc": "example",
+        "fields": [{"type": "int", "name": "test", "doc": "my test number", "namespace": "test", "default": "5"}],
+    }
+
+    schema_str = json.dumps(tested_avro_schema)
+
+    # check succeeds with 1 record and brand new schema
+    res = await registry_async_client.post(
+        f"subjects/{topic}-key/versions", json={"schema": schema_str, "schemaType": "AVRO"}
+    )
+    assert res.ok
+
+    key_schema_id = res.json()["id"]
+
+    res = await registry_async_client.post(
+        f"subjects/{topic}-value/versions", json={"schema": schema_str, "schemaType": "AVRO"}
+    )
+    assert res.ok
+
+    value_schema_id = res.json()["id"]
+
+    key_body = {"test": 5}
+
+    value_body = {"test": 5}
+
+    body = {
+        "key_schema_id": key_schema_id,
+        "value_schema_id": value_schema_id,
+        "records": [{"key": key_body, "value": value_body}],
+    }
+
+    url = f"/topics/{topic}"
+    res = await rest_async_client.post(url, json=body, headers=header)
+    assert res.ok
+
+
 async def test_admin_client(admin_client, producer):
     topic_names = [new_topic(admin_client) for i in range(10, 13)]
     topic_info = admin_client.cluster_metadata()
diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py
index dfa7278b5..c6b1c96da 100644
--- a/tests/integration/test_rest_consumer_protobuf.py
+++ b/tests/integration/test_rest_consumer_protobuf.py
@@ -2,12 +2,12 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-
 from karapace.client import Client
 from karapace.kafka_rest_apis import KafkaRestAdminClient
 from karapace.protobuf.kotlin_wrapper import trim_margin
 from tests.integration.test_rest import NEW_TOPIC_TIMEOUT
 from tests.utils import (
+    create_subject_name_factory,
     new_consumer,
     new_random_name,
     new_topic,
@@ -17,6 +17,7 @@
     schema_data_second,
     wait_for_topics,
 )
+from typing import Generator
 
 import pytest
 
@@ -139,7 +140,7 @@ async def test_publish_protobuf_with_references(
     res = await rest_async_client.post(
         f"/topics/{topic_name}",
         json=example_message,
-        headers=REST_HEADERS["avro"],
+        headers=REST_HEADERS["protobuf"],
     )
     assert res.status_code == 200
 
@@ -230,7 +231,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
     res = await rest_async_client.post(
         f"/topics/{topic_name}",
         json=example_message,
-        headers=REST_HEADERS["avro"],
+        headers=REST_HEADERS["protobuf"],
    )
     assert res.status_code == 200
 
@@ -241,7 +242,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
 
     consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"
 
-    res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"])
+    res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
     assert res.ok
 
     resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
@@ -262,3 +263,173 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
     assert msg["offset"] == 0 and msg["partition"] == 0, "first message of the only partition available"
     assert msg["topic"] == topic_name
     assert msg["value"] == produced_message
+
+
+@pytest.mark.parametrize("google_library_included", [True, False])
+async def test_produce_and_retrieve_protobuf(
+    registry_async_client: Client,
+    rest_async_client: Client,
+    admin_client: KafkaRestAdminClient,
+    google_library_included: bool,
+) -> None:
+    topic_name = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
+    subject = create_subject_name_factory("test_produce_and_retrieve_protobuf")()
+    subject_topic = f"{topic_name}-value"
+
+    base_schema_subject = f"{subject}_base_schema_subject"
+    google_postal_address_schema_subject = f"{subject}_google_address_schema_subject"
+
+    CUSTOMER_PLACE_PROTO = """
+    syntax = "proto3";
+    package a1;
+    message Place {
+        string city = 1;
+        int32 zone = 2;
+    }
+    """
+
+    body = {"schemaType": "PROTOBUF", "schema": CUSTOMER_PLACE_PROTO}
+    res = await registry_async_client.post(f"subjects/{base_schema_subject}/versions", json=body)
+    assert res.status_code == 200
+
+    if not google_library_included:
+        GOOGLE_POSTAL_ADDRESS_PROTO = """
+        syntax = "proto3";
+
+        package google.type;
+
+        option cc_enable_arenas = true;
+        option go_package = "google.golang.org/genproto/googleapis/type/postaladdress;postaladdress";
+        option java_multiple_files = true;
+        option java_outer_classname = "PostalAddressProto";
+        option java_package = "com.google.type";
+        option objc_class_prefix = "GTP";
+        message PostalAddress {
+            int32 revision = 1;
+            string region_code = 2;
+            string language_code = 3;
+            string postal_code = 4;
+            string sorting_code = 5;
+            string administrative_area = 6;
+            string locality = 7;
+            string sublocality = 8;
+            repeated string address_lines = 9;
+            repeated string recipients = 10;
+            string organization = 11;
+        }
+        """
+
+        body = {"schemaType": "PROTOBUF", "schema": GOOGLE_POSTAL_ADDRESS_PROTO}
+        res = await registry_async_client.post(f"subjects/{google_postal_address_schema_subject}/versions", json=body)
+        assert res.status_code == 200
+
+    postal_address_import = (
+        'import "google/type/postal_address.proto";' if google_library_included else 'import "postal_address.proto";'
+    )
+
+    CUSTOMER_PROTO = f"""
+    syntax = "proto3";
+    package a1;
+    import "Place.proto";
+
+    {postal_address_import}
+
+    // @producer: another comment
+    message Customer {{
+        string name = 1;
+        int32 code = 2;
+        Place place = 3;
+        google.type.PostalAddress address = 4;
+    }}
+    """
+
+    def references() -> Generator[dict, None, None]:
+        yield {"name": "Place.proto", "subject": base_schema_subject, "version": 1}
+
+        if not google_library_included:
+            yield {"name": "postal_address.proto", "subject": google_postal_address_schema_subject, "version": 1}
+
+    body = {
+        "schemaType": "PROTOBUF",
+        "schema": CUSTOMER_PROTO,
+        "references": list(references()),
+    }
+    res = await registry_async_client.post(f"subjects/{subject_topic}/versions", json=body)
+
+    assert res.status_code == 200
+    topic_schema_id = res.json()["id"]
+
+    message_to_produce = [
+        {
+            "name": "John Doe",
+            "code": 123456,
+            "place": {"city": "New York", "zone": 5},
+            "address": {
+                "revision": 1,
+                "region_code": "US",
+                "postal_code": "10001",
+                "address_lines": ["123 Main St", "Apt 4"],
+            },
+        },
+        {
+            "name": "Sophie Smith",
+            "code": 987654,
+            "place": {"city": "London", "zone": 3},
+            "address": {
+                "revision": 2,
+                "region_code": "UK",
+                "postal_code": "SW1A 1AA",
+                "address_lines": ["10 Downing Street"],
+            },
+        },
+        {
+            "name": "Pierre Dupont",
+            "code": 246813,
+            "place": {"city": "Paris", "zone": 1},
+            "address": {"revision": 1, "region_code": "FR", "postal_code": "75001", "address_lines": ["1 Rue de Rivoli"]},
+        },
+    ]
+
+    res = await rest_async_client.post(
+        f"/topics/{topic_name}",
+        json={"value_schema_id": topic_schema_id, "records": [{"value": m} for m in message_to_produce]},
+        headers=REST_HEADERS["protobuf"],
+    )
+    assert res.status_code == 200
+
+    group = new_random_name("protobuf_recursive_reference_message")
+    instance_id = await new_consumer(rest_async_client, group)
+
+    subscribe_path = f"/consumers/{group}/instances/{instance_id}/subscription"
+
+    consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"
+
+    res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
+    assert res.ok
+
+    resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
+    data = resp.json()
+
+    assert isinstance(data, list)
+    assert len(data) == 3
+
+    for i in range(0, 3):
+        msg = data[i]
+        expected_message = message_to_produce[i]
+
+        assert "key" in msg
+        assert "offset" in msg
+        assert "topic" in msg
+        assert "value" in msg
+        assert "timestamp" in msg
+
+        assert msg["key"] is None, "no key defined in production"
+        assert msg["topic"] == topic_name
+
+        for key in expected_message.keys():
+            if key == "address":
+                for address_key in expected_message["address"].keys():
+                    assert expected_message["address"][address_key] == msg["value"]["address"][address_key]
+            else:
+                assert msg["value"][key] == expected_message[key]
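
Not part of the patch above: test_produce_and_retrieve_protobuf depends on the registry storing Place.proto (and, without the bundled google library, postal_address.proto) as references of the Customer schema. Below is a minimal sketch of how that stored reference list could be double-checked through the Confluent-compatible subjects API, reusing the same async registry client fixture; the helper name, the "latest" version selector, and the exact response shape are illustrative assumptions, not something this change adds.

    # Sketch only: fetch the latest registered version of the value subject and
    # confirm its reference list contains the dependency registered earlier.
    async def assert_customer_references(registry_async_client: Client, subject_topic: str) -> None:
        res = await registry_async_client.get(f"subjects/{subject_topic}/versions/latest")
        assert res.ok
        references = res.json().get("references", [])
        assert {ref["name"] for ref in references} >= {"Place.proto"}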