Skip to content

Commit

Permalink
Implement setting for disabling name strategy validation
Browse files Browse the repository at this point in the history
Before Karapace 3.7.0, when producing messages with the REST API, Karapace
did not validate that the given schema belongs to a valid subject as per
the name strategy. Add a setting to disable this validation so that name
strategy validation can easily be skipped when desired.
  • Loading branch information
tvainika committed Oct 11, 2023
1 parent 4e745f8 commit a46d3e1
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 1 deletion.
3 changes: 3 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``name_strategy``
- ``topic_name``
- Name strategy to use when storing schemas from the kafka rest proxy service
* - ``name_strategy_validation``
- ``true``
- If enabled, validate that the given schema is registered under a subject that is valid for the name strategy in use when producing messages through Kafka REST
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
Expand Down
2 changes: 2 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Config(TypedDict):
karapace_rest: bool
karapace_registry: bool
name_strategy: str
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str

Expand Down Expand Up @@ -144,6 +145,7 @@ class ConfigDefaults(Config, total=False):
"karapace_rest": False,
"karapace_registry": False,
"name_strategy": "topic_name",
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
}
Expand Down
2 changes: 1 addition & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
need_new_call=subject_not_included,
)

if subject_not_included(parsed_schema, valid_subjects):
if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
raise InvalidSchema()

return schema_id
Expand Down
74 changes: 74 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,80 @@ async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument
await client.close()


@pytest.fixture(scope="function", name="rest_async_novalidation")
async def fixture_rest_async_novalidation(
    request: SubRequest,
    loop: asyncio.AbstractEventLoop,  # pylint: disable=unused-argument
    tmp_path: Path,
    kafka_servers: KafkaServers,
    registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
    """Provide a ``KafkaRest`` instance configured with ``name_strategy_validation`` set to ``False``.

    Yields ``None`` when the ``rest_url`` option is set. Otherwise the config is
    written to ``karapace_config.json`` under ``tmp_path``, the serializer's
    registry client is wired to ``registry_async_client``, and the instance is
    closed on teardown.
    """
    # Do not start a REST API when the user provided an external service. Doing
    # so would cause this node to join the existing group and participate in
    # the election process. Without proper configuration for the listeners that
    # won't work and will cause test failures.
    rest_url = request.config.getoption("rest_url")
    if rest_url:
        yield None
        return

    config_path = tmp_path / "karapace_config.json"

    config = set_config_defaults(
        {
            "admin_metadata_max_age": 2,
            "bootstrap_uri": kafka_servers.bootstrap_servers,
            # Use non-default max request size for REST producer.
            "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES,
            "name_strategy_validation": False,  # This should be only difference from rest_async
        }
    )
    write_config(config_path, config)
    rest = KafkaRest(config=config)

    try:
        # Keep the sanity check and the wiring inside the ``try`` so that the
        # instance is still closed if either of them fails.
        assert rest.serializer.registry_client
        rest.serializer.registry_client.client = registry_async_client
        yield rest
    finally:
        await rest.close()


@pytest.fixture(scope="function", name="rest_async_novalidation_client")
async def fixture_rest_async_novalidationclient(
    request: SubRequest,
    loop: asyncio.AbstractEventLoop,  # pylint: disable=unused-argument
    rest_async_novalidation: KafkaRest,
    aiohttp_client: AiohttpClient,
) -> AsyncIterator[Client]:
    """Provide a ``Client`` for the REST API served by ``rest_async_novalidation``.

    When the ``rest_url`` option is set, the client targets that URL instead.
    The client is yielded only after a ``brokers`` request has succeeded, and
    it is closed on teardown.
    """
    external_url = request.config.getoption("rest_url")

    # client and server_uri are incompatible settings.
    if not external_url:

        async def make_test_client(**kwargs) -> TestClient:  # pylint: disable=unused-argument
            return await aiohttp_client(rest_async_novalidation.app)

        client = Client(client_factory=make_test_client)
    else:
        client = Client(server_uri=external_url)

    try:
        # wait until the server is listening, otherwise the tests may fail
        await repeat_until_successful_request(
            client.get,
            "brokers",
            json_data=None,
            headers=None,
            error_msg="REST API is unreachable",
            timeout=10,
            sleep=0.3,
        )
        yield client
    finally:
        await client.close()


@pytest.fixture(scope="function", name="rest_async_registry_auth")
async def fixture_rest_async_registry_auth(
request: SubRequest,
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,70 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi
assert res.status_code == 200


async def test_publish_with_schema_id_of_another_subject_novalidation(
    rest_async_novalidation_client, registry_async_client, admin_client
):
    """
    Counterpart of test_publish_with_schema_id_of_another_subject with
    name_strategy_validation disabled in the config: producing with a schema id
    registered under a different subject is expected to succeed.
    """
    topic_name = new_topic(admin_client)
    await wait_for_topics(rest_async_novalidation_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)

    topic_subject = f"{topic_name}-value"
    other_subject = "some-other-subject-value"
    produce_url = f"/topics/{topic_name}"

    name_schema = {
        "type": "record",
        "name": "Schema1",
        "fields": [
            {
                "name": "name",
                "type": "string",
            },
        ],
    }
    temperature_schema = {
        "type": "record",
        "name": "Schema2",
        "fields": [
            {
                "name": "temperature",
                "type": "int",
            },
        ],
    }

    async def register(subject, schema):
        # Register the schema under the subject and hand back the id from the response.
        response = await registry_async_client.post(
            f"subjects/{subject}/versions",
            json={"schema": json.dumps(schema)},
        )
        assert response.status_code == 200
        return response.json()["id"]

    async def produce(schema_id, value):
        # Produce a single Avro record to the topic using the given value schema id.
        return await rest_async_novalidation_client.post(
            produce_url,
            json={"value_schema_id": schema_id, "records": [{"value": value}]},
            headers=REST_HEADERS["avro"],
        )

    # Register schemas to get the ids
    topic_schema_id = await register(topic_subject, name_schema)
    other_schema_id = await register(other_subject, temperature_schema)

    res = await produce(other_schema_id, {"temperature": 25})
    assert res.status_code == 200  # This is fine if name_strategy_validation is disabled

    res = await produce(topic_schema_id, {"name": "Mr. Mustache"})
    assert res.status_code == 200


async def test_brokers(rest_async_client):
res = await rest_async_client.get("/brokers")
assert res.ok
Expand Down

0 comments on commit a46d3e1

Please sign in to comment.