diff --git a/container/compose.yml b/container/compose.yml index c42d4b428..5272aa8d5 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -78,7 +78,7 @@ services: KARAPACE_GROUP_ID: karapace-schema-registry KARAPACE_MASTER_ELIGIBILITY: true KARAPACE_TOPIC_NAME: _schemas - KARAPACE_LOG_LEVEL: DEBUG + KARAPACE_LOG_LEVEL: INFO KARAPACE_COMPATIBILITY: FULL KARAPACE_STATSD_HOST: statsd-exporter KARAPACE_STATSD_PORT: 8125 @@ -117,7 +117,7 @@ services: KARAPACE_REGISTRY_HOST: karapace-schema-registry KARAPACE_REGISTRY_PORT: 8081 KARAPACE_ADMIN_METADATA_MAX_AGE: 0 - KARAPACE_LOG_LEVEL: DEBUG + KARAPACE_LOG_LEVEL: INFO KARAPACE_STATSD_HOST: statsd-exporter KARAPACE_STATSD_PORT: 8125 KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false @@ -156,6 +156,11 @@ services: prometheus: image: prom/prometheus + command: + - --storage.tsdb.path=/prometheus + - --storage.tsdb.retention.time=1d + - --enable-feature=otlp-write-receiver + - --config.file=/etc/prometheus/prometheus.yml volumes: - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml - ./prometheus/rules.yml:/etc/prometheus/rules.yml diff --git a/container/opentelemetry/collector-config.yaml b/container/opentelemetry/collector-config.yaml index 3fcd0df0c..a7b67746d 100644 --- a/container/opentelemetry/collector-config.yaml +++ b/container/opentelemetry/collector-config.yaml @@ -15,7 +15,7 @@ exporters: tls: insecure: true otlphttp/prometheus: - endpoint: prometheus:9090/api/v1/otlp + endpoint: http://prometheus:9090/api/v1/otlp tls: insecure: true diff --git a/container/prometheus/prometheus.yml b/container/prometheus/prometheus.yml index 68b64109f..745041b82 100644 --- a/container/prometheus/prometheus.yml +++ b/container/prometheus/prometheus.yml @@ -3,6 +3,10 @@ global: scrape_timeout: 5s # How long until a scrape request times out. evaluation_interval: 10s # How frequently to evaluate rules. +storage: + tsdb: + out_of_order_time_window: 30m + rule_files: - /etc/prometheus/rules.yml diff --git a/src/karapace/config.py b/src/karapace/config.py index 0dd811d92..718cbca09 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -29,6 +29,7 @@ class KarapaceTags(BaseModel): class KarapaceTelemetry(BaseModel): otel_endpoint_url: str | None = None + metrics_export_interval_milliseconds: int = 10000 resource_service_name: str = "karapace" resource_service_instance_id: str = "karapace" resource_telemetry_sdk_name: str = "opentelemetry" diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index f8b5684c6..3d981c1aa 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -17,6 +17,7 @@ import schema_registry.routers.mode import schema_registry.routers.schemas import schema_registry.routers.subjects +import schema_registry.telemetry.meter import schema_registry.telemetry.middleware import schema_registry.telemetry.setup import schema_registry.telemetry.tracer @@ -30,6 +31,7 @@ __name__, schema_registry.controller, schema_registry.telemetry.tracer, + schema_registry.telemetry.meter, ] ) diff --git a/src/schema_registry/factory.py b/src/schema_registry/factory.py index b02c131c6..472064f94 100644 --- a/src/schema_registry/factory.py +++ b/src/schema_registry/factory.py @@ -17,7 +17,7 @@ from schema_registry.middlewares import setup_middlewares from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.setup import setup_routers -from schema_registry.telemetry.setup import setup_tracing +from schema_registry.telemetry.setup import setup_metering, setup_tracing from typing import AsyncContextManager import logging @@ -59,6 +59,7 @@ def create_karapace_application( app = FastAPI(lifespan=lifespan) # type: ignore[arg-type] setup_tracing() + setup_metering() setup_routers(app=app) setup_exception_handlers(app=app) setup_middlewares(app=app) diff --git a/src/schema_registry/telemetry/container.py b/src/schema_registry/telemetry/container.py index d9d53ea2f..d60bdc102 100644 --- a/src/schema_registry/telemetry/container.py +++ b/src/schema_registry/telemetry/container.py @@ -9,10 +9,11 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.semconv.attributes import telemetry_attributes as T +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer -def create_tracing_resource(config: Config) -> Resource: +def create_telemetry_resource(config: Config) -> Resource: return Resource.create( { "service.name": config.telemetry.resource_service_name, @@ -26,6 +27,9 @@ def create_tracing_resource(config: Config) -> Resource: class TelemetryContainer(containers.DeclarativeContainer): karapace_container = providers.Container(KarapaceContainer) - tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config) - tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource) + + telemetry_resource = providers.Factory(create_telemetry_resource, config=karapace_container.config) + + meter = providers.Singleton(Meter) tracer = providers.Singleton(Tracer) + tracer_provider = providers.Singleton(TracerProvider, resource=telemetry_resource) diff --git a/src/schema_registry/telemetry/meter.py b/src/schema_registry/telemetry/meter.py new file mode 100644 index 000000000..d3912c4cd --- /dev/null +++ b/src/schema_registry/telemetry/meter.py @@ -0,0 +1,37 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from karapace.config import Config +from karapace.container import KarapaceContainer +from opentelemetry import metrics +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + MetricExporter, + MetricReader, + PeriodicExportingMetricReader, +) +from typing import Final + + +class Meter: + START_TIME_KEY: Final = "start_time" + + @staticmethod + @inject + def get_meter(config: Config = Provide[KarapaceContainer.config]) -> metrics.Meter: + return metrics.get_meter_provider().get_meter(f"{config.tags.app}.meter") + + @staticmethod + @inject + def get_metric_reader(config: Config = Provide[KarapaceContainer.config]) -> MetricReader: + exporter: MetricExporter = ConsoleMetricExporter() + if config.telemetry.otel_endpoint_url: + exporter = OTLPMetricExporter(endpoint=config.telemetry.otel_endpoint_url) + return PeriodicExportingMetricReader( + exporter=exporter, + export_interval_millis=config.telemetry.metrics_export_interval_milliseconds, + ) diff --git a/src/schema_registry/telemetry/middleware.py b/src/schema_registry/telemetry/middleware.py index c6d14bbe9..56f0809ba 100644 --- a/src/schema_registry/telemetry/middleware.py +++ b/src/schema_registry/telemetry/middleware.py @@ -6,11 +6,14 @@ from collections.abc import Awaitable, Callable from dependency_injector.wiring import inject, Provide from fastapi import FastAPI, Request, Response +from opentelemetry.metrics import Counter, Histogram, UpDownCounter from opentelemetry.trace import SpanKind from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer import logging +import time LOG = logging.getLogger(__name__) @@ -20,12 +23,59 @@ async def telemetry_middleware( request: Request, call_next: Callable[[Request], Awaitable[Response]], tracer: Tracer = Provide[TelemetryContainer.tracer], + meter: Meter = Provide[TelemetryContainer.meter], ) -> Response: resource = request.url.path.split("/")[1] with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span: + span.add_event("Creating metering resources") + karapace_http_requests_in_progress: UpDownCounter = meter.get_meter().create_up_down_counter( + name="karapace_http_requests_in_progress", + description="In-progress requests for HTTP/TCP Protocol", + ) + karapace_http_requests_duration_seconds: Histogram = meter.get_meter().create_histogram( + unit="seconds", + name="karapace_http_requests_duration_seconds", + description="Request Duration for HTTP/TCP Protocol", + ) + karapace_http_requests_total: Counter = meter.get_meter().create_counter( + name="karapace_http_requests_total", + description="Total Request Count for HTTP/TCP Protocol", + ) + + # Set start time for request + setattr(request.state, meter.START_TIME_KEY, time.monotonic()) + + # Extract request labels + path = request.url.path + method = request.method + + # Increment requests in progress before response handler + span.add_event("Metering requests in progress (increase)") + karapace_http_requests_in_progress.add(amount=1, attributes={"method": method, "path": path}) + + # Call request handler tracer.update_span_with_request(request=request, span=span) + span.add_event("Calling request handler") response: Response = await call_next(request) tracer.update_span_with_response(response=response, span=span) + + # Instrument request duration + span.add_event("Metering request duration") + karapace_http_requests_duration_seconds.record( + amount=(time.monotonic() - getattr(request.state, meter.START_TIME_KEY)), + attributes={"method": method, "path": path}, + ) + + # Instrument total requests + span.add_event("Metering total requests") + karapace_http_requests_total.add( + amount=1, attributes={"method": method, "path": path, "status": response.status_code} + ) + + # Decrement requests in progress after response handler + span.add_event("Metering requests in progress (decrease)") + karapace_http_requests_in_progress.add(amount=-1, attributes={"method": method, "path": path}) + return response diff --git a/src/schema_registry/telemetry/setup.py b/src/schema_registry/telemetry/setup.py index 30b423902..294f85c50 100644 --- a/src/schema_registry/telemetry/setup.py +++ b/src/schema_registry/telemetry/setup.py @@ -4,9 +4,12 @@ """ from dependency_injector.wiring import inject, Provide -from opentelemetry import trace +from opentelemetry import metrics, trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer import logging @@ -22,3 +25,12 @@ def setup_tracing( LOG.info("Setting OTel tracing provider") tracer_provider.add_span_processor(tracer.get_span_processor()) trace.set_tracer_provider(tracer_provider) + + +@inject +def setup_metering( + meter: Meter = Provide[TelemetryContainer.meter], + telemetry_resource: Resource = Provide[TelemetryContainer.telemetry_resource], +) -> None: + LOG.info("Setting OTel meter provider") + metrics.set_meter_provider(MeterProvider(resource=telemetry_resource, metric_readers=[meter.get_metric_reader()])) diff --git a/tests/conftest.py b/tests/conftest.py index 5b65c9405..b504dcbcf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ import pytest import re import schema_registry.controller +import schema_registry.telemetry.meter import schema_registry.telemetry.middleware import schema_registry.telemetry.setup import schema_registry.telemetry.tracer @@ -194,6 +195,7 @@ def fixture_karapace_container() -> KarapaceContainer: modules=[ schema_registry.controller, schema_registry.telemetry.tracer, + schema_registry.telemetry.meter, ] ) return karapace_container diff --git a/tests/unit/schema_registry/telemetry/test_meter.py b/tests/unit/schema_registry/telemetry/test_meter.py new file mode 100644 index 000000000..24ed093d2 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/test_meter.py @@ -0,0 +1,51 @@ +""" +schema_registry - telemetry meter tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from karapace.config import KarapaceTelemetry +from karapace.container import KarapaceContainer +from schema_registry.telemetry.meter import Meter +from unittest.mock import patch + + +def test_meter(karapace_container: KarapaceContainer): + with patch("schema_registry.telemetry.meter.metrics") as mock_metrics: + Meter.get_meter(config=karapace_container.config()) + mock_metrics.get_meter_provider.return_value.get_meter.assert_called_once_with("Karapace.meter") + + +def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} + ) + with ( + patch("schema_registry.telemetry.meter.ConsoleMetricExporter") as mock_console_exporter, + patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, + ): + reader = Meter.get_metric_reader(config=config) + mock_console_exporter.assert_called_once() + mock_periodic_exporting_metric_reader.assert_called_once_with( + exporter=mock_console_exporter.return_value, + export_interval_millis=10000, + ) + assert reader is mock_periodic_exporting_metric_reader.return_value + + +def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url="http://otel:4317")} + ) + with ( + patch("schema_registry.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter, + patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, + ): + reader = Meter.get_metric_reader(config=config) + mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") + mock_periodic_exporting_metric_reader.assert_called_once_with( + exporter=mock_otlp_exporter.return_value, + export_interval_millis=10000, + ) + assert reader is mock_periodic_exporting_metric_reader.return_value diff --git a/tests/unit/schema_registry/telemetry/test_middleware.py b/tests/unit/schema_registry/telemetry/test_middleware.py index ecbe79307..bbabe449d 100644 --- a/tests/unit/schema_registry/telemetry/test_middleware.py +++ b/tests/unit/schema_registry/telemetry/test_middleware.py @@ -8,9 +8,10 @@ from _pytest.logging import LogCaptureFixture from fastapi import FastAPI, Request, Response from opentelemetry.trace import SpanKind +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.middleware import setup_telemetry_middleware, telemetry_middleware from schema_registry.telemetry.tracer import Tracer -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, call, MagicMock, patch import logging @@ -31,10 +32,11 @@ def test_setup_telemetry_middleware(caplog: LogCaptureFixture) -> None: async def test_telemetry_middleware() -> None: tracer = MagicMock(spec=Tracer) + meter = MagicMock(spec=Meter, START_TIME_KEY="start_time") request_mock = AsyncMock(spec=Request) request_mock.method = "GET" - request_mock.url.path = "/test" + request_mock.url.path = "/test/inner-path" response_mock = AsyncMock(spec=Response) response_mock.status_code = 200 @@ -42,15 +44,52 @@ async def test_telemetry_middleware() -> None: call_next = AsyncMock() call_next.return_value = response_mock - response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer) - span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value - - tracer.get_tracer.assert_called_once() - tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) - tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) - tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) - - # Check that the request handler is called - call_next.assert_awaited_once_with(request_mock) - - assert response == response_mock + with patch("schema_registry.telemetry.middleware.time.monotonic", return_value=1): + response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer, meter=meter) + span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value + + tracer.get_tracer.assert_called_once() + tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) + tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) + tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) + + # Check that the request handler is called + call_next.assert_awaited_once_with(request_mock) + + span.add_event.assert_has_calls( + [ + call("Creating metering resources"), + call("Metering requests in progress (increase)"), + call("Calling request handler"), + call("Metering request duration"), + call("Metering total requests"), + call("Metering requests in progress (decrease)"), + ] + ) + + meter.get_meter.assert_has_calls( + [ + call(), + call().create_up_down_counter( + name="karapace_http_requests_in_progress", description="In-progress requests for HTTP/TCP Protocol" + ), + call(), + call().create_histogram( + unit="seconds", + name="karapace_http_requests_duration_seconds", + description="Request Duration for HTTP/TCP Protocol", + ), + call(), + call().create_counter( + name="karapace_http_requests_total", description="Total Request Count for HTTP/TCP Protocol" + ), + call().create_up_down_counter().add(amount=1, attributes={"method": "GET", "path": "/test/inner-path"}), + call().create_histogram().record(amount=0, attributes={"method": "GET", "path": "/test/inner-path"}), + call() + .create_counter() + .add(amount=1, attributes={"method": "GET", "path": "/test/inner-path", "status": 200}), + call().create_up_down_counter().add(amount=-1, attributes={"method": "GET", "path": "/test/inner-path"}), + ] + ) + + assert response == response_mock