Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add version of karapace to the healthcheck response #744

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion karapace/karapace.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from karapace.rapu import HTTPRequest, HTTPResponse, RestApp
from karapace.typing import JsonObject
from karapace.utils import json_encode
from karapace.version import __version__
from typing import Awaitable, Callable, NoReturn
from typing_extensions import TypeAlias

Expand Down Expand Up @@ -79,7 +80,10 @@ async def root_get(self) -> NoReturn:
self.r({}, "application/json")

async def health(self, _request: Request) -> aiohttp.web.Response:
resp: JsonObject = {"process_uptime_sec": int(time.monotonic() - self._process_start_time)}
resp: JsonObject = {
"process_uptime_sec": int(time.monotonic() - self._process_start_time),
"karapace_version": __version__,
}
for hook in self.health_hooks:
resp.update(await hook())
return aiohttp.web.Response(
Expand Down
74 changes: 53 additions & 21 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
"""
from __future__ import annotations

from kafka import KafkaProducer
from kafka.errors import UnknownTopicOrPartitionError
from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient
from karapace.version import __version__
from pytest import raises
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
Expand All @@ -27,7 +31,7 @@
NEW_TOPIC_TIMEOUT = 10


def check_successful_publish_response(success_response, objects, partition_id=None):
def check_successful_publish_response(success_response, objects, partition_id=None) -> None:
assert success_response.ok
success_response = success_response.json()
for k in ["value_schema_id", "offsets"]:
Expand All @@ -40,15 +44,25 @@ def check_successful_publish_response(success_response, objects, partition_id=No
assert partition_id == o["partition"]


async def test_request_body_too_large(rest_async_client, admin_client):
async def test_health_endpoint(rest_async_client: Client) -> None:
res = await rest_async_client.get("/_health")
assert res.status_code == 200
response = res.json()
assert "process_uptime_sec" in response
assert "karapace_version" in response
assert response["process_uptime_sec"] >= 0
assert response["karapace_version"] == __version__


async def test_request_body_too_large(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None:
tn = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
pl = {"records": [{"value": 1_048_576 * "a"}]}
res = await rest_async_client.post(f"/topics/{tn}", pl, headers={"Content-Type": "application/json"})
assert res.status_code == 413


async def test_content_types(rest_async_client, admin_client):
async def test_content_types(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None:
tn = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
valid_headers = [
Expand Down Expand Up @@ -120,7 +134,7 @@ async def test_content_types(rest_async_client, admin_client):
assert not res.ok


async def test_avro_publish_primitive_schema(rest_async_client, admin_client):
async def test_avro_publish_primitive_schema(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None:
topic_str = new_topic(admin_client)
topic_int = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_str, topic_int], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand All @@ -142,7 +156,11 @@ async def test_avro_publish_primitive_schema(rest_async_client, admin_client):
assert "partition" in o


async def test_avro_publish(rest_async_client, registry_async_client, admin_client):
async def test_avro_publish(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
tn = new_topic(admin_client)
other_tn = new_topic(admin_client)

Expand Down Expand Up @@ -186,7 +204,7 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie
# assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"


async def test_admin_client(admin_client, producer):
async def test_admin_client(admin_client: KafkaRestAdminClient, producer: KafkaProducer) -> None:
topic_names = [new_topic(admin_client) for i in range(10, 13)]
topic_info = admin_client.cluster_metadata()
retrieved_names = list(topic_info["topics"].keys())
Expand Down Expand Up @@ -226,7 +244,7 @@ async def test_admin_client(admin_client, producer):
admin_client.cluster_metadata(topics=["another_invalid_name"])


async def test_internal(rest_async, admin_client):
async def test_internal(rest_async: KafkaRest | None, admin_client: KafkaRestAdminClient) -> None:
topic_name = new_topic(admin_client)
prepared_records = [
[b"key", b"value", 0],
Expand Down Expand Up @@ -259,7 +277,7 @@ async def test_internal(rest_async, admin_client):
assert rest_async_proxy.all_empty({"records": [{"value": {"foo": "bar"}}]}, "key") is True


async def test_topics(rest_async_client, admin_client):
async def test_topics(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic_foo = "foo"
tn = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand All @@ -280,7 +298,7 @@ async def test_topics(rest_async_client, admin_client):
assert res.json()["error_code"] == 40403, "Error code does not match"


async def test_list_topics(rest_async_client, admin_client):
async def test_list_topics(rest_async_client, admin_client) -> None:
tn1 = new_topic(admin_client)
tn2 = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn1, tn2], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand All @@ -299,7 +317,7 @@ async def test_list_topics(rest_async_client, admin_client):
assert tn1 in topic_list and tn2 in topic_list, f"Topic list contains all topics tn1={tn1} and tn2={tn2}"


async def test_publish(rest_async_client, admin_client):
async def test_publish(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
topic_url = f"/topics/{topic}"
Expand All @@ -319,7 +337,7 @@ async def test_publish(rest_async_client, admin_client):

# Produce messages to a topic without key and without explicit partition to verify that
# partitioner assigns partition randomly
async def test_publish_random_partitioning(rest_async_client, admin_client):
async def test_publish_random_partitioning(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic = new_topic(admin_client, num_partitions=100)
await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
topic_url = f"/topics/{topic}"
Expand All @@ -338,7 +356,7 @@ async def test_publish_random_partitioning(rest_async_client, admin_client):
assert len(partitions_seen) >= 2, "Partitioner should randomly assign to different partitions if no key given"


async def test_publish_malformed_requests(rest_async_client, admin_client):
async def test_publish_malformed_requests(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
for url in [f"/topics/{topic_name}", f"/topics/{topic_name}/partitions/0"]:
Expand Down Expand Up @@ -376,7 +394,7 @@ async def test_publish_malformed_requests(rest_async_client, admin_client):
assert res.status_code == 422


async def test_too_large_record(rest_async_client, admin_client):
async def test_too_large_record(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
tn = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
# Record batch overhead is 22 bytes, reduce just above
Expand All @@ -392,7 +410,7 @@ async def test_too_large_record(rest_async_client, admin_client):
)


async def test_publish_to_nonexisting_topic(rest_async_client):
async def test_publish_to_nonexisting_topic(rest_async_client: Client) -> None:
tn = new_random_name("topic-that-should-not-exist")
header = REST_HEADERS["avro"]
# check succeeds with 1 record and brand new schema
Expand All @@ -405,7 +423,11 @@ async def test_publish_to_nonexisting_topic(rest_async_client):
assert res.json()["error_code"] == 40401, "Error code should be for topic not found"


async def test_publish_with_incompatible_data(rest_async_client, registry_async_client, admin_client):
async def test_publish_with_incompatible_data(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
topic_name = new_topic(admin_client)
subject_1 = f"{topic_name}-value"

Expand Down Expand Up @@ -448,7 +470,7 @@ async def test_publish_with_incompatible_data(rest_async_client, registry_async_
assert "Object does not fit to stored schema" in res_json["message"]


async def test_publish_with_incompatible_schema(rest_async_client, admin_client):
async def test_publish_with_incompatible_schema(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None:
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
url = f"/topics/{topic_name}"
Expand Down Expand Up @@ -493,7 +515,11 @@ async def test_publish_with_incompatible_schema(rest_async_client, admin_client)
assert "Error when registering schema" in res_json["message"]


async def test_publish_with_schema_id_of_another_subject(rest_async_client, registry_async_client, admin_client):
async def test_publish_with_schema_id_of_another_subject(
rest_async_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
"""
Karapace issue 658: https://github.com/aiven/karapace/issues/658
"""
Expand Down Expand Up @@ -560,8 +586,10 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi


async def test_publish_with_schema_id_of_another_subject_novalidation(
rest_async_novalidation_client, registry_async_client, admin_client
):
rest_async_novalidation_client: Client,
registry_async_client: Client,
admin_client: KafkaRestAdminClient,
) -> None:
"""
Same as above but with name_strategy_validation disabled as config
"""
Expand Down Expand Up @@ -623,13 +651,17 @@ async def test_publish_with_schema_id_of_another_subject_novalidation(
assert res.status_code == 200


async def test_brokers(rest_async_client):
async def test_brokers(rest_async_client: Client) -> None:
res = await rest_async_client.get("/brokers")
assert res.ok
assert len(res.json()) == 1, "Only one broker should be running"


async def test_partitions(rest_async_client, admin_client, producer):
async def test_partitions(
rest_async_client: Client,
admin_client: KafkaRestAdminClient,
producer: KafkaProducer,
) -> None:
# TODO -> This seems to be the only combination accepted by the offsets endpoint
topic_name = new_topic(admin_client)
await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
Expand Down