Skip to content

Commit

Permalink
test: adding a couple of basic tests in our test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Sep 25, 2023
1 parent fb59e41 commit 8d4ea3e
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 4 deletions.
52 changes: 52 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from __future__ import annotations

from kafka.errors import UnknownTopicOrPartitionError
from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRestAdminClient
from pytest import raises
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
Expand Down Expand Up @@ -186,6 +188,56 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie
# assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"


async def test_another_avro_publish(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
):
topic = new_topic(admin_client)
other_tn = new_topic(admin_client)

await wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
header = REST_HEADERS["avro"]

tested_avro_schema = {
"type": "record",
"name": "example",
"namespace": "example",
"doc": "example",
"fields": [{"type": "int", "name": "test", "doc": "my test number", "namespace": "test", "default": "5"}],
}

schema_str = json.dumps(tested_avro_schema)

# check succeeds with 1 record and brand new schema]
res = await registry_async_client.post(
f"subjects/{topic}-key/versions", json={"schema": schema_str, "schemaType": "AVRO"}
)
assert res.ok

key_schema_id = res.json()["id"]

res = await registry_async_client.post(
f"subjects/{topic}-value/versions", json={"schema": schema_str, "schemaType": "AVRO"}
)
assert res.ok

value_schema_id = res.json()["id"]

key_body = {"test": 5}

value_body = {"test": 5}

body = {
"key_schema_id": key_schema_id,
"value_schema_id": value_schema_id,
"records": [{"key": key_body, "value": value_body}],
}

url = f"/topics/{topic}"
res = await rest_async_client.post(url, json=body, headers=header)
assert res.ok

async def test_admin_client(admin_client, producer):
topic_names = [new_topic(admin_client) for i in range(10, 13)]
topic_info = admin_client.cluster_metadata()
Expand Down
179 changes: 175 additions & 4 deletions tests/integration/test_rest_consumer_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRestAdminClient
from karapace.protobuf.kotlin_wrapper import trim_margin
from tests.integration.test_rest import NEW_TOPIC_TIMEOUT
from tests.utils import (
create_subject_name_factory,
new_consumer,
new_random_name,
new_topic,
Expand All @@ -17,6 +17,7 @@
schema_data_second,
wait_for_topics,
)
from typing import Generator

import pytest

Expand Down Expand Up @@ -139,7 +140,7 @@ async def test_publish_protobuf_with_references(
res = await rest_async_client.post(
f"/topics/{topic_name}",
json=example_message,
headers=REST_HEADERS["avro"],
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

Expand Down Expand Up @@ -230,7 +231,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
res = await rest_async_client.post(
f"/topics/{topic_name}",
json=example_message,
headers=REST_HEADERS["avro"],
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

Expand All @@ -241,7 +242,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(

consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"

res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"])
res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
assert res.ok

resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
Expand All @@ -262,3 +263,173 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
assert msg["offset"] == 0 and msg["partition"] == 0, "first message of the only partition available"
assert msg["topic"] == topic_name
assert msg["value"] == produced_message


@pytest.mark.parametrize("google_library_included", [True, False])
async def test_produce_and_retrieve_protobuf(
registry_async_client: Client,
rest_async_client: Client,
admin_client: KafkaRestAdminClient,
google_library_included: bool,
) -> None:
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
subject = create_subject_name_factory("test_produce_and_retrieve_protobuf")()
subject_topic = f"{topic_name}-value"

base_schema_subject = f"{subject}_base_schema_subject"
google_postal_address_schema_subject = f"{subject}_google_address_schema_subject"

CUSTOMER_PLACE_PROTO = """
syntax = "proto3";
package a1;
message Place {
string city = 1;
int32 zone = 2;
}
"""

body = {"schemaType": "PROTOBUF", "schema": CUSTOMER_PLACE_PROTO}
res = await registry_async_client.post(f"subjects/{base_schema_subject}/versions", json=body)
assert res.status_code == 200

if not google_library_included:
GOOGLE_POSTAL_ADDRESS_PROTO = """
syntax = "proto3";
package google.type;
option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/type/postaladdress;postaladdress";
option java_multiple_files = true;
option java_outer_classname = "PostalAddressProto";
option java_package = "com.google.type";
option objc_class_prefix = "GTP";
message PostalAddress {
int32 revision = 1;
string region_code = 2;
string language_code = 3;
string postal_code = 4;
string sorting_code = 5;
string administrative_area = 6;
string locality = 7;
string sublocality = 8;
repeated string address_lines = 9;
repeated string recipients = 10;
string organization = 11;
}
"""

body = {"schemaType": "PROTOBUF", "schema": GOOGLE_POSTAL_ADDRESS_PROTO}
res = await registry_async_client.post(f"subjects/{google_postal_address_schema_subject}/versions", json=body)
assert res.status_code == 200

postal_address_import = (
'import "google/type/postal_address.proto";' if google_library_included else 'import "postal_address.proto";'
)

CUSTOMER_PROTO = f"""
syntax = "proto3";
package a1;
import "Place.proto";
{postal_address_import}
// @producer: another comment
message Customer {{
string name = 1;
int32 code = 2;
Place place = 3;
google.type.PostalAddress address = 4;
}}
"""

def references() -> Generator[str, None, None]:
yield {"name": "Place.proto", "subject": base_schema_subject, "version": 1}

if not google_library_included:
yield {"name": "postal_address.proto", "subject": google_postal_address_schema_subject, "version": 1}

body = {
"schemaType": "PROTOBUF",
"schema": CUSTOMER_PROTO,
"references": list(references()),
}
res = await registry_async_client.post(f"subjects/{subject_topic}/versions", json=body)

assert res.status_code == 200
topic_schema_id = res.json()["id"]

message_to_produce = [
{
"name": "John Doe",
"code": 123456,
"place": {"city": "New York", "zone": 5},
"address": {
"revision": 1,
"region_code": "US",
"postal_code": "10001",
"address_lines": ["123 Main St", "Apt 4"],
},
},
{
"name": "Sophie Smith",
"code": 987654,
"place": {"city": "London", "zone": 3},
"address": {
"revision": 2,
"region_code": "UK",
"postal_code": "SW1A 1AA",
"address_lines": ["10 Downing Street"],
},
},
{
"name": "Pierre Dupont",
"code": 246813,
"place": {"city": "Paris", "zone": 1},
"address": {"revision": 1, "region_code": "FR", "postal_code": "75001", "address_lines": ["1 Rue de Rivoli"]},
},
]

res = await rest_async_client.post(
f"/topics/{topic_name}",
json={"value_schema_id": topic_schema_id, "records": [{"value": m} for m in message_to_produce]},
headers=REST_HEADERS["protobuf"],
)
assert res.status_code == 200

group = new_random_name("protobuf_recursive_reference_message")
instance_id = await new_consumer(rest_async_client, group)

subscribe_path = f"/consumers/{group}/instances/{instance_id}/subscription"

consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"

res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
assert res.ok

resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
data = resp.json()

assert isinstance(data, list)
assert len(data) == 3

for i in range(0, 3):
msg = data[i]
expected_message = message_to_produce[i]

assert "key" in msg
assert "offset" in msg
assert "topic" in msg
assert "value" in msg
assert "timestamp" in msg

assert msg["key"] is None, "no key defined in production"
assert msg["topic"] == topic_name

for key in expected_message.keys():
if key == "address":
for address_key in expected_message["address"].keys():
assert expected_message["address"][address_key] == msg["value"]["address"][address_key]
else:
assert msg["value"][key] == expected_message[key]

0 comments on commit 8d4ea3e

Please sign in to comment.