From a46d3e13286e5d5b680b7e264f8703744623cf04 Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Wed, 11 Oct 2023 14:36:07 +0300 Subject: [PATCH] Implement setting for disabling name strategy validation Before Karapace 3.7.0 when producing messages with REST API Karapace did not validate that given schema belongs to valid subject as per name strategy. Add a setting to disable this validation to allow easily skip name strategy validation optionally. --- README.rst | 3 ++ karapace/config.py | 2 + karapace/kafka_rest_apis/__init__.py | 2 +- tests/integration/conftest.py | 74 ++++++++++++++++++++++++++++ tests/integration/test_rest.py | 64 ++++++++++++++++++++++++ 5 files changed, 144 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index b230bc1dd..efc38e197 100644 --- a/README.rst +++ b/README.rst @@ -462,6 +462,9 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``name_strategy`` - ``topic_name`` - Name strategy to use when storing schemas from the kafka rest proxy service + * - ``name_strategy_validation`` + - ``true`` + - If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest * - ``master_election_strategy`` - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) diff --git a/karapace/config.py b/karapace/config.py index 41371bec9..ac837dd29 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -74,6 +74,7 @@ class Config(TypedDict): karapace_rest: bool karapace_registry: bool name_strategy: str + name_strategy_validation: bool master_election_strategy: str protobuf_runtime_directory: str @@ -144,6 +145,7 @@ class ConfigDefaults(Config, total=False): "karapace_rest": False, "karapace_registry": False, "name_strategy": "topic_name", + "name_strategy_validation": True, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", } diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 1b6bf4b6f..b6f5c43b5 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -779,7 +779,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool: need_new_call=subject_not_included, ) - if subject_not_included(parsed_schema, valid_subjects): + if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects): raise InvalidSchema() return schema_id diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0c943280f..1c16b3c2b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -287,6 +287,80 @@ async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument await client.close() +@pytest.fixture(scope="function", name="rest_async_novalidation") +async def fixture_rest_async_novalidation( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + tmp_path: Path, + kafka_servers: KafkaServers, + registry_async_client: Client, +) -> AsyncIterator[Optional[KafkaRest]]: + # Do not start a REST api when the user provided an external service. Doing + # so would cause this node to join the existing group and participate in + # the election process. Without proper configuration for the listeners that + # won't work and will cause test failures. + rest_url = request.config.getoption("rest_url") + if rest_url: + yield None + return + + config_path = tmp_path / "karapace_config.json" + + config = set_config_defaults( + { + "admin_metadata_max_age": 2, + "bootstrap_uri": kafka_servers.bootstrap_servers, + # Use non-default max request size for REST producer. + "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, + "name_strategy_validation": False, # This should be only difference from rest_async + } + ) + write_config(config_path, config) + rest = KafkaRest(config=config) + + assert rest.serializer.registry_client + rest.serializer.registry_client.client = registry_async_client + try: + yield rest + finally: + await rest.close() + + +@pytest.fixture(scope="function", name="rest_async_novalidation_client") +async def fixture_rest_async_novalidationclient( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + rest_async_novalidation: KafkaRest, + aiohttp_client: AiohttpClient, +) -> AsyncIterator[Client]: + rest_url = request.config.getoption("rest_url") + + # client and server_uri are incompatible settings. + if rest_url: + client = Client(server_uri=rest_url) + else: + + async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument + return await aiohttp_client(rest_async_novalidation.app) + + client = Client(client_factory=get_client) + + try: + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "brokers", + json_data=None, + headers=None, + error_msg="REST API is unreachable", + timeout=10, + sleep=0.3, + ) + yield client + finally: + await client.close() + + @pytest.fixture(scope="function", name="rest_async_registry_auth") async def fixture_rest_async_registry_auth( request: SubRequest, diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index 8b73a7ffd..c7a49813c 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -559,6 +559,70 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi assert res.status_code == 200 +async def test_publish_with_schema_id_of_another_subject_novalidation( + rest_async_novalidation_client, registry_async_client, admin_client +): + """ + Same as above but with name_strategy_validation disabled as config + """ + topic_name = new_topic(admin_client) + subject_1 = f"{topic_name}-value" + subject_2 = "some-other-subject-value" + + await wait_for_topics(rest_async_novalidation_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + url = f"/topics/{topic_name}" + + schema_1 = { + "type": "record", + "name": "Schema1", + "fields": [ + { + "name": "name", + "type": "string", + }, + ], + } + schema_2 = { + "type": "record", + "name": "Schema2", + "fields": [ + { + "name": "temperature", + "type": "int", + }, + ], + } + + # Register schemas to get the ids + res = await registry_async_client.post( + f"subjects/{subject_1}/versions", + json={"schema": json.dumps(schema_1)}, + ) + assert res.status_code == 200 + schema_1_id = res.json()["id"] + + res = await registry_async_client.post( + f"subjects/{subject_2}/versions", + json={"schema": json.dumps(schema_2)}, + ) + assert res.status_code == 200 + schema_2_id = res.json()["id"] + + res = await rest_async_novalidation_client.post( + url, + json={"value_schema_id": schema_2_id, "records": [{"value": {"temperature": 25}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 200 # This is fine if name_strategy_validation is disabled + + res = await rest_async_novalidation_client.post( + url, + json={"value_schema_id": schema_1_id, "records": [{"value": {"name": "Mr. Mustache"}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 200 + + async def test_brokers(rest_async_client): res = await rest_async_client.get("/brokers") assert res.ok