Skip to content

Commit

Permalink
Merge pull request #853 from Aiven-Open/jjaakola-aiven-rest-proxy-return-422-for-invalid-payload
Browse files Browse the repository at this point in the history

fix: return 422 for Avro/JSONSchema when payload does not match schema
  • Loading branch information
tvainika authored Apr 30, 2024
2 parents f0264eb + 76cab2d commit ffad746
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
7 changes: 7 additions & 0 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,13 @@ async def publish(self, topic: str, partition_id: Optional[str], content_type: s
content_type=content_type,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)
except InvalidPayload as e:
cause = str(e.__cause__)
KafkaRest.r(
body={"error_code": RESTErrorCodes.INVALID_DATA.value, "message": cause},
content_type=content_type,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)
except SchemaRetrievalError as e:
KafkaRest.r(
body={"error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value, "message": str(e)},
Expand Down
111 changes: 111 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
"""
from __future__ import annotations

from dataclasses import dataclass
from karapace.client import Client
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX
from karapace.schema_type import SchemaType
from karapace.version import __version__
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
Expand All @@ -21,10 +23,12 @@
test_objects_avro_evolution,
wait_for_topics,
)
from typing import Any, Mapping

import asyncio
import base64
import json
import pytest
import time

# Timeout handed to wait_for_topics() while waiting for a freshly created topic
# to become visible (presumably seconds — confirm against tests.utils.wait_for_topics).
NEW_TOPIC_TIMEOUT = 10
Expand Down Expand Up @@ -675,3 +679,110 @@ async def test_partitions(
res = await rest_async_client.get(f"/topics/{topic_name}/partitions/foo/offsets", headers=header)
assert res.status_code == 404
assert res.json()["error_code"] == 404


@dataclass
class IncompatibleDataForSchemaTestCase:
    """One parametrized scenario: a schema plus the error expected for mismatching data.

    ``str()`` of an instance is its ``name``, so the class can be used directly
    with ``pytest.mark.parametrize(..., ids=str)`` to get readable test ids.
    """

    # Human-readable label; also the value returned by __str__.
    name: str
    # Schema flavour the schema is registered as.
    schema_type: SchemaType
    # Schema definition as a mapping; JSON-serialized before registration.
    schema: Mapping[str, Any]
    # Exact "message" text expected in the REST error response.
    expected_error_message: str

    def __str__(self) -> str:
        return self.name


@pytest.mark.parametrize(
    "testcase",
    [
        IncompatibleDataForSchemaTestCase(
            name="Avro schema, incompatible data",
            schema_type=SchemaType.AVRO,
            schema={
                "type": "record",
                "namespace": "karapace.test",
                "name": "IncompatibleDataTest",
                "fields": [
                    {"name": "validField", "type": "string"},
                    {"name": "invalidField", "type": "int"},
                ],
            },
            expected_error_message="Object does not fit to stored schema",
        ),
        IncompatibleDataForSchemaTestCase(
            name="JSONSchema, incompatible data",
            schema_type=SchemaType.JSONSCHEMA,
            schema={
                # NOTE(review): the published 2020-12 meta-schema URI uses the host
                # "json-schema.org" (hyphen, not dot) — confirm whether this is intentional.
                "$schema": "https://json.schema.org/draft/2020-12/schema",
                "$id": "https://example.com/json.schema.test",
                "title": "JSON Schema Test",
                "description": "a description",
                "type": "object",
                "properties": {
                    "validField": {"type": "string"},
                    "invalidField": {"type": "integer"},
                },
            },
            # NOTE(review): compared verbatim against the response message; verify the
            # leading whitespace of the indented lines matches what the validator emits.
            expected_error_message=(
                "'not an integer' is not of type 'integer'\n\n"
                "Failed validating 'type' in schema['properties']['invalidField']:\n"
                " {'type': 'integer'}\n\nOn instance['invalidField']:\n 'not an integer'"
            ),
        ),
    ],
    ids=str,
)
async def test_publish_invalid_data_for_schema(
    testcase: IncompatibleDataForSchemaTestCase,
    rest_async_client: Client,
    registry_async_client: Client,
    admin_client: KafkaAdminClient,
) -> None:
    """Publishing a record that does not fit its registered schema is rejected.

    The REST proxy is expected to answer with HTTP 422, error code 42205 and
    the exact message stored in ``testcase.expected_error_message``.
    """
    topic = new_topic(admin_client)
    await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)

    # Register the schema under the topic's value subject to obtain its id.
    registration = await registry_async_client.post(
        f"subjects/{topic}-value/versions",
        json={"schema": json.dumps(testcase.schema), "schemaType": testcase.schema_type.value},
    )
    assert registration.status_code == 200

    # "invalidField" carries a string although both schemas declare an integer type.
    request_body = {
        "value_schema_id": registration.json()["id"],
        "records": [
            {
                "value": {
                    "validField": "valid value",
                    "invalidField": "not an integer",
                },
            },
        ],
    }

    # Kludge: the JSONSCHEMA schema type's value is "JSON", but the headers key
    # that selects schema-validated serialization is "jsonschema"; plain JSON
    # serialization does not use a schema to validate the data.
    if testcase.schema_type == SchemaType.JSONSCHEMA:
        headers_key = "jsonschema"
    else:
        headers_key = testcase.schema_type.value.lower()

    response = await rest_async_client.post(
        f"/topics/{topic}",
        json=request_body,
        headers=REST_HEADERS[headers_key],
    )
    assert response.status_code == 422
    error = response.json()
    assert error["error_code"] == 42205
    assert "message" in error
    assert error["message"] == testcase.expected_error_message

0 comments on commit ffad746

Please sign in to comment.