Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add another couple of basic tests #722

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from __future__ import annotations

from kafka.errors import UnknownTopicOrPartitionError
from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRestAdminClient
from pytest import raises
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
Expand Down Expand Up @@ -186,6 +188,57 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie
# assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"


async def test_another_avro_publish(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
):
topic = new_topic(admin_client)
other_tn = new_topic(admin_client)

await wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
header = REST_HEADERS["avro"]

tested_avro_schema = {
"type": "record",
"name": "example",
"namespace": "example",
"doc": "example",
"fields": [{"type": "int", "name": "test", "doc": "my test number", "namespace": "test", "default": "5"}],
}

schema_str = json.dumps(tested_avro_schema)

# check succeeds with 1 record and brand new schema
res = await registry_async_client.post(
f"subjects/{topic}-key/versions", json={"schema": schema_str, "schemaType": "AVRO"}
)
assert res.ok

key_schema_id = res.json()["id"]

res = await registry_async_client.post(
f"subjects/{topic}-value/versions", json={"schema": schema_str, "schemaType": "AVRO"}
)
assert res.ok

value_schema_id = res.json()["id"]

key_body = {"test": 5}

value_body = {"test": 5}

body = {
"key_schema_id": key_schema_id,
"value_schema_id": value_schema_id,
"records": [{"key": key_body, "value": value_body}],
}

url = f"/topics/{topic}"
res = await rest_async_client.post(url, json=body, headers=header)
assert res.ok


async def test_admin_client(admin_client, producer):
topic_names = [new_topic(admin_client) for i in range(10, 13)]
topic_info = admin_client.cluster_metadata()
Expand Down
179 changes: 175 additions & 4 deletions tests/integration/test_rest_consumer_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRestAdminClient
from karapace.protobuf.kotlin_wrapper import trim_margin
from tests.integration.test_rest import NEW_TOPIC_TIMEOUT
from tests.utils import (
create_subject_name_factory,
new_consumer,
new_random_name,
new_topic,
Expand All @@ -17,6 +17,7 @@
schema_data_second,
wait_for_topics,
)
from typing import Generator

import pytest

Expand Down Expand Up @@ -139,7 +140,7 @@ async def test_publish_protobuf_with_references(
res = await rest_async_client.post(
f"/topics/{topic_name}",
json=example_message,
headers=REST_HEADERS["avro"],
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

Expand Down Expand Up @@ -230,7 +231,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
res = await rest_async_client.post(
f"/topics/{topic_name}",
json=example_message,
headers=REST_HEADERS["avro"],
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

Expand All @@ -241,7 +242,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(

consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"

res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"])
res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
assert res.ok

resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
Expand All @@ -262,3 +263,173 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
assert msg["offset"] == 0 and msg["partition"] == 0, "first message of the only partition available"
assert msg["topic"] == topic_name
assert msg["value"] == produced_message


@pytest.mark.parametrize("google_library_included", [True, False])
async def test_produce_and_retrieve_protobuf(
registry_async_client: Client,
rest_async_client: Client,
admin_client: KafkaRestAdminClient,
google_library_included: bool,
) -> None:
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
subject = create_subject_name_factory("test_produce_and_retrieve_protobuf")()
subject_topic = f"{topic_name}-value"

base_schema_subject = f"{subject}_base_schema_subject"
google_postal_address_schema_subject = f"{subject}_google_address_schema_subject"

CUSTOMER_PLACE_PROTO = """
syntax = "proto3";
package a1;
message Place {
string city = 1;
int32 zone = 2;
}
"""

body = {"schemaType": "PROTOBUF", "schema": CUSTOMER_PLACE_PROTO}
res = await registry_async_client.post(f"subjects/{base_schema_subject}/versions", json=body)
assert res.status_code == 200

if not google_library_included:
GOOGLE_POSTAL_ADDRESS_PROTO = """
syntax = "proto3";

package google.type;

option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/type/postaladdress;postaladdress";
option java_multiple_files = true;
option java_outer_classname = "PostalAddressProto";
option java_package = "com.google.type";
option objc_class_prefix = "GTP";
message PostalAddress {
int32 revision = 1;
string region_code = 2;
string language_code = 3;
string postal_code = 4;
string sorting_code = 5;
string administrative_area = 6;
string locality = 7;
string sublocality = 8;
repeated string address_lines = 9;
repeated string recipients = 10;
string organization = 11;
}
"""

body = {"schemaType": "PROTOBUF", "schema": GOOGLE_POSTAL_ADDRESS_PROTO}
res = await registry_async_client.post(f"subjects/{google_postal_address_schema_subject}/versions", json=body)
assert res.status_code == 200

postal_address_import = (
'import "google/type/postal_address.proto";' if google_library_included else 'import "postal_address.proto";'
)

CUSTOMER_PROTO = f"""
syntax = "proto3";
package a1;
import "Place.proto";

{postal_address_import}

// @producer: another comment
message Customer {{
string name = 1;
int32 code = 2;
Place place = 3;
google.type.PostalAddress address = 4;
}}
"""

def references() -> Generator[str, None, None]:
yield {"name": "Place.proto", "subject": base_schema_subject, "version": 1}

if not google_library_included:
yield {"name": "postal_address.proto", "subject": google_postal_address_schema_subject, "version": 1}

body = {
"schemaType": "PROTOBUF",
"schema": CUSTOMER_PROTO,
"references": list(references()),
}
res = await registry_async_client.post(f"subjects/{subject_topic}/versions", json=body)

assert res.status_code == 200
topic_schema_id = res.json()["id"]

message_to_produce = [
{
"name": "John Doe",
"code": 123456,
"place": {"city": "New York", "zone": 5},
"address": {
"revision": 1,
"region_code": "US",
"postal_code": "10001",
"address_lines": ["123 Main St", "Apt 4"],
},
},
{
"name": "Sophie Smith",
"code": 987654,
"place": {"city": "London", "zone": 3},
"address": {
"revision": 2,
"region_code": "UK",
"postal_code": "SW1A 1AA",
"address_lines": ["10 Downing Street"],
},
},
{
"name": "Pierre Dupont",
"code": 246813,
"place": {"city": "Paris", "zone": 1},
"address": {"revision": 1, "region_code": "FR", "postal_code": "75001", "address_lines": ["1 Rue de Rivoli"]},
},
]

res = await rest_async_client.post(
f"/topics/{topic_name}",
json={"value_schema_id": topic_schema_id, "records": [{"value": m} for m in message_to_produce]},
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

group = new_random_name("protobuf_recursive_reference_message")
instance_id = await new_consumer(rest_async_client, group)

subscribe_path = f"/consumers/{group}/instances/{instance_id}/subscription"

consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"

res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
assert res.ok

resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
data = resp.json()

assert isinstance(data, list)
assert len(data) == 3

for i in range(0, 3):
msg = data[i]
expected_message = message_to_produce[i]

assert "key" in msg
assert "offset" in msg
assert "topic" in msg
assert "value" in msg
assert "timestamp" in msg

assert msg["key"] is None, "no key defined in production"
assert msg["topic"] == topic_name

for key in expected_message.keys():
if key == "address":
for address_key in expected_message["address"].keys():
assert expected_message["address"][address_key] == msg["value"]["address"][address_key]
else:
assert msg["value"][key] == expected_message[key]
Loading