diff --git a/README.rst b/README.rst index b230bc1dd..efc38e197 100644 --- a/README.rst +++ b/README.rst @@ -462,6 +462,9 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``name_strategy`` - ``topic_name`` - Name strategy to use when storing schemas from the kafka rest proxy service + * - ``name_strategy_validation`` + - ``true`` + - If enabled, validate that the given schema is registered under the name strategy in use when producing messages through the Kafka REST proxy * - ``master_election_strategy`` - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) diff --git a/karapace/config.py b/karapace/config.py index 41371bec9..ac837dd29 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -74,6 +74,7 @@ class Config(TypedDict): karapace_rest: bool karapace_registry: bool name_strategy: str + name_strategy_validation: bool master_election_strategy: str protobuf_runtime_directory: str @@ -144,6 +145,7 @@ class ConfigDefaults(Config, total=False): "karapace_rest": False, "karapace_registry": False, "name_strategy": "topic_name", + "name_strategy_validation": True, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", } diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 1b6bf4b6f..b6f5c43b5 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -779,7 +779,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool: need_new_call=subject_not_included, ) - if subject_not_included(parsed_schema, valid_subjects): + if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects): raise InvalidSchema() return schema_id diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0c943280f..1c16b3c2b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -287,6 +287,80 @@ async def
get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument await client.close() +@pytest.fixture(scope="function", name="rest_async_novalidation") +async def fixture_rest_async_novalidation( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + tmp_path: Path, + kafka_servers: KafkaServers, + registry_async_client: Client, +) -> AsyncIterator[Optional[KafkaRest]]: + # Do not start a REST api when the user provided an external service. Doing + # so would cause this node to join the existing group and participate in + # the election process. Without proper configuration for the listeners that + # won't work and will cause test failures. + rest_url = request.config.getoption("rest_url") + if rest_url: + yield None + return + + config_path = tmp_path / "karapace_config.json" + + config = set_config_defaults( + { + "admin_metadata_max_age": 2, + "bootstrap_uri": kafka_servers.bootstrap_servers, + # Use non-default max request size for REST producer. + "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, + "name_strategy_validation": False, # This should be the only difference from rest_async + } + ) + write_config(config_path, config) + rest = KafkaRest(config=config) + + assert rest.serializer.registry_client + rest.serializer.registry_client.client = registry_async_client + try: + yield rest + finally: + await rest.close() + + +@pytest.fixture(scope="function", name="rest_async_novalidation_client") +async def fixture_rest_async_novalidationclient( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + rest_async_novalidation: KafkaRest, + aiohttp_client: AiohttpClient, +) -> AsyncIterator[Client]: + rest_url = request.config.getoption("rest_url") + + # client and server_uri are incompatible settings.
+ if rest_url: + client = Client(server_uri=rest_url) + else: + + async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument + return await aiohttp_client(rest_async_novalidation.app) + + client = Client(client_factory=get_client) + + try: + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "brokers", + json_data=None, + headers=None, + error_msg="REST API is unreachable", + timeout=10, + sleep=0.3, + ) + yield client + finally: + await client.close() + + @pytest.fixture(scope="function", name="rest_async_registry_auth") async def fixture_rest_async_registry_auth( request: SubRequest, diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index 8b73a7ffd..c7a49813c 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -559,6 +559,70 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi assert res.status_code == 200 +async def test_publish_with_schema_id_of_another_subject_novalidation( + rest_async_novalidation_client, registry_async_client, admin_client +): + """ + Same as above but with name_strategy_validation disabled in the config + """ + topic_name = new_topic(admin_client) + subject_1 = f"{topic_name}-value" + subject_2 = "some-other-subject-value" + + await wait_for_topics(rest_async_novalidation_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + url = f"/topics/{topic_name}" + + schema_1 = { + "type": "record", + "name": "Schema1", + "fields": [ + { + "name": "name", + "type": "string", + }, + ], + } + schema_2 = { + "type": "record", + "name": "Schema2", + "fields": [ + { + "name": "temperature", + "type": "int", + }, + ], + } + + # Register schemas to get the ids + res = await registry_async_client.post( + f"subjects/{subject_1}/versions", + json={"schema": json.dumps(schema_1)}, + ) + assert res.status_code == 200 + schema_1_id = res.json()["id"] + + res = await
registry_async_client.post( + f"subjects/{subject_2}/versions", + json={"schema": json.dumps(schema_2)}, + ) + assert res.status_code == 200 + schema_2_id = res.json()["id"] + + res = await rest_async_novalidation_client.post( + url, + json={"value_schema_id": schema_2_id, "records": [{"value": {"temperature": 25}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 200 # This is fine if name_strategy_validation is disabled + + res = await rest_async_novalidation_client.post( + url, + json={"value_schema_id": schema_1_id, "records": [{"value": {"name": "Mr. Mustache"}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 200 + + async def test_brokers(rest_async_client): res = await rest_async_client.get("/brokers") assert res.ok