Skip to content

Commit

Permalink
Merge pull request #735 from Aiven-Open/subject-name-strategy
Browse files Browse the repository at this point in the history
Fix subject name strategy config and add validation flag
  • Loading branch information
giuseppelillo authored Oct 12, 2023
2 parents 9a4f91e + a46d3e1 commit 211a4b9
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 8 deletions.
5 changes: 4 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,11 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- ``runtime``
- Runtime directory for the ``protoc`` protobuf schema parser and code generator
* - ``name_strategy``
- ``subject_name``
- ``topic_name``
- Name strategy to use when storing schemas from the kafka rest proxy service
* - ``name_strategy_validation``
- ``true``
- If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
Expand Down
2 changes: 1 addition & 1 deletion container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ RUN groupadd --system karapace \
&& chown --recursive karapace:karapace /opt/karapace /var/log/karapace

# Install protobuf compiler.
ARG PROTOBUF_COMPILER_VERSION="3.12.4-1"
ARG PROTOBUF_COMPILER_VERSION="3.12.4-1+deb11u1"
RUN apt-get update \
&& apt-get install --assume-yes --no-install-recommends \
protobuf-compiler=$PROTOBUF_COMPILER_VERSION \
Expand Down
18 changes: 18 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Config(TypedDict):
session_timeout_ms: int
karapace_rest: bool
karapace_registry: bool
name_strategy: str
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str

Expand Down Expand Up @@ -142,6 +144,8 @@ class ConfigDefaults(Config, total=False):
"session_timeout_ms": 10000,
"karapace_rest": False,
"karapace_registry": False,
"name_strategy": "topic_name",
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
}
Expand All @@ -158,6 +162,13 @@ class ElectionStrategy(Enum):
lowest = "lowest"


@unique
class NameStrategy(Enum):
topic_name = "topic_name"
record_name = "record_name"
topic_record_name = "topic_record_name"


def parse_env_value(value: str) -> str | int | bool:
# we only have ints, strings and bools in the config
try:
Expand Down Expand Up @@ -256,6 +267,13 @@ def validate_config(config: Config) -> None:
f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}"
) from None

name_strategy = config["name_strategy"]
try:
NameStrategy(name_strategy)
except ValueError:
valid_strategies = [strategy.value for strategy in NameStrategy]
raise InvalidConfiguration(f"Invalid name strategy: {name_strategy}, valid values are {valid_strategies}") from None

if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None:
raise InvalidConfiguration(
"Using 'rest_authorization' requires configuration value for 'sasl_bootstrap_uri' to be set"
Expand Down
2 changes: 1 addition & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
need_new_call=subject_not_included,
)

if subject_not_included(parsed_schema, valid_subjects):
if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
raise InvalidSchema()

return schema_id
Expand Down
5 changes: 2 additions & 3 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ class SchemaRegistrySerializer:
def __init__(
self,
config: dict,
name_strategy: str = "topic_name",
**cfg, # pylint: disable=unused-argument
) -> None:
self.config = config
self.state_lock = asyncio.Lock()
Expand All @@ -245,7 +243,8 @@ def __init__(
else:
registry_url = f"http://{self.config['registry_host']}:{self.config['registry_port']}"
registry_client = SchemaRegistryClient(registry_url, session_auth=session_auth)
self.subject_name_strategy = NAME_STRATEGIES[name_strategy]
name_strategy = config.get("name_strategy", "topic_name")
self.subject_name_strategy = NAME_STRATEGIES.get(name_strategy, topic_name_strategy)
self.registry_client: Optional[SchemaRegistryClient] = registry_client
self.ids_to_schemas: Dict[int, TypedSchema] = {}
self.ids_to_subjects: MutableMapping[int, List[Subject]] = TTLCache(maxsize=10000, ttl=600)
Expand Down
74 changes: 74 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,80 @@ async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument
await client.close()


@pytest.fixture(scope="function", name="rest_async_novalidation")
async def fixture_rest_async_novalidation(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
# won't work and will cause test failures.
rest_url = request.config.getoption("rest_url")
if rest_url:
yield None
return

config_path = tmp_path / "karapace_config.json"

config = set_config_defaults(
{
"admin_metadata_max_age": 2,
"bootstrap_uri": kafka_servers.bootstrap_servers,
# Use non-default max request size for REST producer.
"producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES,
"name_strategy_validation": False, # This should be only difference from rest_async
}
)
write_config(config_path, config)
rest = KafkaRest(config=config)

assert rest.serializer.registry_client
rest.serializer.registry_client.client = registry_async_client
try:
yield rest
finally:
await rest.close()


@pytest.fixture(scope="function", name="rest_async_novalidation_client")
async def fixture_rest_async_novalidationclient(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
rest_async_novalidation: KafkaRest,
aiohttp_client: AiohttpClient,
) -> AsyncIterator[Client]:
rest_url = request.config.getoption("rest_url")

# client and server_uri are incompatible settings.
if rest_url:
client = Client(server_uri=rest_url)
else:

async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument
return await aiohttp_client(rest_async_novalidation.app)

client = Client(client_factory=get_client)

try:
# wait until the server is listening, otherwise the tests may fail
await repeat_until_successful_request(
client.get,
"brokers",
json_data=None,
headers=None,
error_msg="REST API is unreachable",
timeout=10,
sleep=0.3,
)
yield client
finally:
await client.close()


@pytest.fixture(scope="function", name="rest_async_registry_auth")
async def fixture_rest_async_registry_auth(
request: SubRequest,
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,70 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi
assert res.status_code == 200


async def test_publish_with_schema_id_of_another_subject_novalidation(
rest_async_novalidation_client, registry_async_client, admin_client
):
"""
Same as above but with name_strategy_validation disabled as config
"""
topic_name = new_topic(admin_client)
subject_1 = f"{topic_name}-value"
subject_2 = "some-other-subject-value"

await wait_for_topics(rest_async_novalidation_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
url = f"/topics/{topic_name}"

schema_1 = {
"type": "record",
"name": "Schema1",
"fields": [
{
"name": "name",
"type": "string",
},
],
}
schema_2 = {
"type": "record",
"name": "Schema2",
"fields": [
{
"name": "temperature",
"type": "int",
},
],
}

# Register schemas to get the ids
res = await registry_async_client.post(
f"subjects/{subject_1}/versions",
json={"schema": json.dumps(schema_1)},
)
assert res.status_code == 200
schema_1_id = res.json()["id"]

res = await registry_async_client.post(
f"subjects/{subject_2}/versions",
json={"schema": json.dumps(schema_2)},
)
assert res.status_code == 200
schema_2_id = res.json()["id"]

res = await rest_async_novalidation_client.post(
url,
json={"value_schema_id": schema_2_id, "records": [{"value": {"temperature": 25}}]},
headers=REST_HEADERS["avro"],
)
assert res.status_code == 200 # This is fine if name_strategy_validation is disabled

res = await rest_async_novalidation_client.post(
url,
json={"value_schema_id": schema_1_id, "records": [{"value": {"name": "Mr. Mustache"}}]},
headers=REST_HEADERS["avro"],
)
assert res.status_code == 200


async def test_brokers(rest_async_client):
res = await rest_async_client.get("/brokers")
assert res.ok
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_protobuf_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer:
with open(config_path, encoding="utf8") as handler:
config = read_config(handler)
serializer = SchemaRegistrySerializer(config_path=config_path, config=config)
serializer = SchemaRegistrySerializer(config=config)
await serializer.registry_client.close()
serializer.registry_client = mock_client
return serializer
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer:
with open(config_path, encoding="utf8") as handler:
config = read_config(handler)
serializer = SchemaRegistrySerializer(config_path=config_path, config=config)
serializer = SchemaRegistrySerializer(config=config)
await serializer.registry_client.close()
serializer.registry_client = mock_client
return serializer
Expand Down

0 comments on commit 211a4b9

Please sign in to comment.