Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add JSON schema test to schema in multiple subjects test #878

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 123 additions & 46 deletions tests/integration/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from attr import dataclass
from http import HTTPStatus
from karapace.client import Client
from karapace.kafka.producer import KafkaProducer
from karapace.rapu import is_success
from karapace.schema_registry_apis import SchemaErrorMessages
from karapace.schema_type import SchemaType
from karapace.utils import json_encode
from tests.base_testcase import BaseTestCase
from tests.integration.utils.cluster import RegistryDescription
from tests.integration.utils.kafka_server import KafkaServers
from tests.utils import (
Expand Down Expand Up @@ -1079,85 +1082,159 @@ async def assert_schema_versions_failed(client: Client, trail: str, schema_id: i
assert res.status_code == response_code


async def register_schema(registry_async_client: Client, trail, subject: str, schema_str: str) -> Tuple[int, int]:
async def register_schema(
registry_async_client: Client, trail: str, subject: str, schema_str: str, schema_type: SchemaType = SchemaType.AVRO
) -> Tuple[int, int]:
# Register to get the id
payload = {"schema": schema_str}
if schema_type == SchemaType.JSONSCHEMA:
payload["schemaType"] = "JSON"
nosahama marked this conversation as resolved.
Show resolved Hide resolved
elif schema_type == SchemaType.PROTOBUF:
payload["schemaType"] = "PROTOBUF"
else:
pass
res = await registry_async_client.post(
f"subjects/{subject}/versions{trail}",
json={"schema": schema_str},
json=payload,
)
assert res.status_code == 200
schema_id = res.json()["id"]

# Get version
res = await registry_async_client.post(
f"subjects/{subject}{trail}",
json={"schema": schema_str},
)
res = await registry_async_client.post(f"subjects/{subject}{trail}", json=payload)
assert res.status_code == 200
assert res.json()["id"] == schema_id
return schema_id, res.json()["version"]


@pytest.mark.parametrize("trail", ["", "/"])
async def test_schema_versions_multiple_subjects_same_schema(registry_async_client: Client, trail: str) -> None:
@dataclass
class MultipleSubjectsSameSchemaTestCase(BaseTestCase):
test_name: str
schema: str
other_schema: str
schema_type: SchemaType


@pytest.mark.parametrize(
"testcase",
[
MultipleSubjectsSameSchemaTestCase(
test_name="Test same AVRO schema on multiple subjects",
schema=json.dumps(
{
"type": "record",
"name": "SimpleTestSchema",
"fields": [
{
"name": "f1",
"type": "string",
},
{
"name": "f2",
"type": "string",
},
],
},
),
other_schema=json.dumps(
{
"type": "record",
"name": "SimpleOtherTestSchema",
"fields": [
{
"name": "f1",
"type": "string",
},
],
},
),
schema_type=SchemaType.AVRO,
),
MultipleSubjectsSameSchemaTestCase(
test_name="Test same JSON schema on multiple subjects",
schema=json.dumps(
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://example.com/product.schema.json",
"title": "SimpleTest",
"description": "Test JSON schema",
"type": "object",
"properties": {
"f1": {
"type": "string",
},
"f2": {
"type": "string",
},
},
},
),
other_schema=json.dumps(
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://example.com/product.schema.json",
"title": "SimpleTestOtherSchema",
"description": "Test JSON schema",
"type": "object",
"properties": {
"other_schema_field": {
"type": "integer",
},
},
}
),
schema_type=SchemaType.JSONSCHEMA,
),
],
)
async def test_schema_versions_multiple_subjects_same_schema(
registry_async_client: Client,
testcase: MultipleSubjectsSameSchemaTestCase,
) -> None:
"""
Tests case where there are multiple subjects with the same schema.
The schema/versions endpoint returns all these subjects.
"""
subject_name_factory = create_subject_name_factory(f"test_schema_versions_multiple_subjects_same_schema-{trail}")
schema_name_factory = create_schema_name_factory(f"test_schema_versions_multiple_subjects_same_schema_{trail}")

schema_1 = {
"type": "record",
"name": schema_name_factory(),
"fields": [
{
"name": "f1",
"type": "string",
},
{
"name": "f2",
"type": "string",
},
],
}
schema_str_1 = json.dumps(schema_1)
schema_2 = {
"type": "record",
"name": schema_name_factory(),
"fields": [
{
"name": "f1",
"type": "string",
}
],
}
schema_str_2 = json.dumps(schema_2)
subject_name_factory = create_subject_name_factory(
f"test_schema_versions_multiple_subjects_same_schema-{testcase.schema_type}"
)

subject_1 = subject_name_factory()
schema_id_1, version_1 = await register_schema(registry_async_client, trail, subject_1, schema_str_1)
schema_id_1, version_1 = await register_schema(
registry_async_client, "", subject_1, testcase.schema, schema_type=testcase.schema_type
)
schema_1_versions = [(subject_1, version_1)]
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)

subject_2 = subject_name_factory()
schema_id_2, version_2 = await register_schema(registry_async_client, trail, subject_2, schema_str_1)
schema_id_2, version_2 = await register_schema(
registry_async_client, "", subject_2, testcase.schema, schema_type=testcase.schema_type
)
schema_1_versions = [(subject_1, version_1), (subject_2, version_2)]
assert schema_id_1 == schema_id_2
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)

subject_3 = subject_name_factory()
schema_id_3, version_3 = await register_schema(registry_async_client, trail, subject_3, schema_str_1)
schema_id_3, version_3 = await register_schema(
registry_async_client, "", subject_3, testcase.schema, schema_type=testcase.schema_type
)
schema_1_versions = [(subject_1, version_1), (subject_2, version_2), (subject_3, version_3)]
assert schema_id_1 == schema_id_3
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)

# subject_4 with different schema to check there are no side effects
subject_4 = subject_name_factory()
schema_id_4, version_4 = await register_schema(registry_async_client, trail, subject_4, schema_str_2)
schema_id_4, version_4 = await register_schema(
registry_async_client, "", subject_4, testcase.other_schema, schema_type=testcase.schema_type
)
schema_2_versions = [(subject_4, version_4)]
assert schema_id_1 != schema_id_4
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, trail, schema_id_4, schema_2_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_4, schema_2_versions)

res = await registry_async_client.get("subjects")
assert res.status_code == 200
assert res.json() == [subject_1, subject_2, subject_3, subject_4]


@pytest.mark.parametrize("trail", ["", "/"])
Expand Down
Loading