diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index ed93f067e..329b5361d 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -712,6 +712,13 @@ async def publish(self, topic: str, partition_id: Optional[str], content_type: s content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, ) + except InvalidPayload as e: + cause = str(e.__cause__) + KafkaRest.r( + body={"error_code": RESTErrorCodes.INVALID_DATA.value, "message": cause}, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) except SchemaRetrievalError as e: KafkaRest.r( body={"error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value, "message": str(e)}, diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index 97466836d..e4949b43d 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -4,10 +4,12 @@ """ from __future__ import annotations +from dataclasses import dataclass from karapace.client import Client from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX +from karapace.schema_type import SchemaType from karapace.version import __version__ from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES from tests.utils import ( @@ -21,10 +23,12 @@ test_objects_avro_evolution, wait_for_topics, ) +from typing import Any, Mapping import asyncio import base64 import json +import pytest import time NEW_TOPIC_TIMEOUT = 10 @@ -675,3 +679,110 @@ async def test_partitions( res = await rest_async_client.get(f"/topics/{topic_name}/partitions/foo/offsets", headers=header) assert res.status_code == 404 assert res.json()["error_code"] == 404 + + +@dataclass +class IncompatibleDataForSchemaTestCase: + name: str + schema_type: SchemaType + schema: Mapping[str, Any] + expected_error_message: str + + def __str__(self) -> str: + return self.name + + +@pytest.mark.parametrize( + "testcase", + [ + IncompatibleDataForSchemaTestCase( + name="Avro schema, incompatible data", + schema_type=SchemaType.AVRO, + schema={ + "type": "record", + "namespace": "karapace.test", + "name": "IncompatibleDataTest", + "fields": [{"name": "validField", "type": "string"}, {"name": "invalidField", "type": "int"}], + }, + expected_error_message="Object does not fit to stored schema", + ), + IncompatibleDataForSchemaTestCase( + name="JSONSchema, incompatible data", + schema_type=SchemaType.JSONSCHEMA, + schema={ + "$schema": "https://json.schema.org/draft/2020-12/schema", + "$id": "https://example.com/json.schema.test", + "title": "JSON Schema Test", + "description": "a description", + "type": "object", + "properties": { + "validField": { + "type": "string", + }, + "invalidField": { + "type": "integer", + }, + }, + }, + expected_error_message=( + "'not an integer' is not of type 'integer'\n\n" + "Failed validating 'type' in schema['properties']['invalidField']:\n" + " {'type': 'integer'}\n\nOn instance['invalidField']:\n 'not an integer'" + ), + ), + ], + ids=str, +) +async def test_publish_invalid_data_for_schema( + testcase: IncompatibleDataForSchemaTestCase, + rest_async_client: Client, + registry_async_client: Client, + admin_client: KafkaAdminClient, +) -> None: + incompatible_data = { + "records": [ + { + "value": { + "validField": "valid value", + "invalidField": "not an integer", + }, + }, + ], + } + + topic_name = new_topic(admin_client) + subject = f"{topic_name}-value" + + await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + url = f"/topics/{topic_name}" + + # Register schemas to get the ids + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schema": json.dumps(testcase.schema), "schemaType": testcase.schema_type.value}, + ) + assert res.status_code == 200 + schema_id = res.json()["id"] + + payload = { + "value_schema_id": schema_id, + } + payload.update(incompatible_data) + + # Kludge: the schema type defines the value to "JSON", correct is "jsonschema" + # JSON serialization does not use schema to validate the data. + if SchemaType.JSONSCHEMA == testcase.schema_type: + serialization_format_name = "jsonschema" + else: + serialization_format_name = testcase.schema_type.value.lower() + + res = await rest_async_client.post( + url, + json=payload, + headers=REST_HEADERS[serialization_format_name], + ) + assert res.status_code == 422 + res_json = res.json() + assert res_json["error_code"] == 42205 + assert "message" in res_json + assert testcase.expected_error_message == res_json["message"]