diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 4a92d9649..4f2dc0b95 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -21,6 +21,7 @@ import schema_registry.routers.mode import schema_registry.routers.schemas import schema_registry.routers.subjects +import schema_registry.telemetry.meter import schema_registry.telemetry.middleware import schema_registry.telemetry.setup import schema_registry.telemetry.tracer @@ -34,6 +35,7 @@ __name__, schema_registry.controller, schema_registry.telemetry.tracer, + schema_registry.telemetry.meter, ] ) diff --git a/src/schema_registry/factory.py b/src/schema_registry/factory.py index b02c131c6..472064f94 100644 --- a/src/schema_registry/factory.py +++ b/src/schema_registry/factory.py @@ -17,7 +17,7 @@ from schema_registry.middlewares import setup_middlewares from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.setup import setup_routers -from schema_registry.telemetry.setup import setup_tracing +from schema_registry.telemetry.setup import setup_metering, setup_tracing from typing import AsyncContextManager import logging @@ -59,6 +59,7 @@ def create_karapace_application( app = FastAPI(lifespan=lifespan) # type: ignore[arg-type] setup_tracing() + setup_metering() setup_routers(app=app) setup_exception_handlers(app=app) setup_middlewares(app=app) diff --git a/src/schema_registry/telemetry/container.py b/src/schema_registry/telemetry/container.py index d9d53ea2f..d60bdc102 100644 --- a/src/schema_registry/telemetry/container.py +++ b/src/schema_registry/telemetry/container.py @@ -9,10 +9,11 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.semconv.attributes import telemetry_attributes as T +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer -def create_tracing_resource(config: Config) -> Resource: +def create_telemetry_resource(config: Config) -> Resource: return Resource.create( { "service.name": config.telemetry.resource_service_name, @@ -26,6 +27,9 @@ def create_tracing_resource(config: Config) -> Resource: class TelemetryContainer(containers.DeclarativeContainer): karapace_container = providers.Container(KarapaceContainer) - tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config) - tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource) + + telemetry_resource = providers.Factory(create_telemetry_resource, config=karapace_container.config) + + meter = providers.Singleton(Meter) tracer = providers.Singleton(Tracer) + tracer_provider = providers.Singleton(TracerProvider, resource=telemetry_resource) diff --git a/src/schema_registry/telemetry/middleware.py b/src/schema_registry/telemetry/middleware.py index c6d14bbe9..56f0809ba 100644 --- a/src/schema_registry/telemetry/middleware.py +++ b/src/schema_registry/telemetry/middleware.py @@ -6,11 +6,14 @@ from collections.abc import Awaitable, Callable from dependency_injector.wiring import inject, Provide from fastapi import FastAPI, Request, Response +from opentelemetry.metrics import Counter, Histogram, UpDownCounter from opentelemetry.trace import SpanKind from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer import logging +import time LOG = logging.getLogger(__name__) @@ -20,12 +23,59 @@ async def telemetry_middleware( request: Request, call_next: Callable[[Request], Awaitable[Response]], tracer: Tracer = Provide[TelemetryContainer.tracer], + meter: Meter = Provide[TelemetryContainer.meter], ) -> Response: resource = request.url.path.split("/")[1] with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span: + span.add_event("Creating metering resources") + karapace_http_requests_in_progress: UpDownCounter = meter.get_meter().create_up_down_counter( + name="karapace_http_requests_in_progress", + description="In-progress requests for HTTP/TCP Protocol", + ) + karapace_http_requests_duration_seconds: Histogram = meter.get_meter().create_histogram( + unit="seconds", + name="karapace_http_requests_duration_seconds", + description="Request Duration for HTTP/TCP Protocol", + ) + karapace_http_requests_total: Counter = meter.get_meter().create_counter( + name="karapace_http_requests_total", + description="Total Request Count for HTTP/TCP Protocol", + ) + + # Set start time for request + setattr(request.state, meter.START_TIME_KEY, time.monotonic()) + + # Extract request labels + path = request.url.path + method = request.method + + # Increment requests in progress before response handler + span.add_event("Metering requests in progress (increase)") + karapace_http_requests_in_progress.add(amount=1, attributes={"method": method, "path": path}) + + # Call request handler tracer.update_span_with_request(request=request, span=span) + span.add_event("Calling request handler") response: Response = await call_next(request) tracer.update_span_with_response(response=response, span=span) + + # Instrument request duration + span.add_event("Metering request duration") + karapace_http_requests_duration_seconds.record( + amount=(time.monotonic() - getattr(request.state, meter.START_TIME_KEY)), + attributes={"method": method, "path": path}, + ) + + # Instrument total requests + span.add_event("Metering total requests") + karapace_http_requests_total.add( + amount=1, attributes={"method": method, "path": path, "status": response.status_code} + ) + + # Decrement requests in progress after response handler + span.add_event("Metering requests in progress (decrease)") + karapace_http_requests_in_progress.add(amount=-1, attributes={"method": method, "path": path}) + return response diff --git a/src/schema_registry/telemetry/setup.py b/src/schema_registry/telemetry/setup.py index 30b423902..294f85c50 100644 --- a/src/schema_registry/telemetry/setup.py +++ b/src/schema_registry/telemetry/setup.py @@ -4,9 +4,12 @@ """ from dependency_injector.wiring import inject, Provide -from opentelemetry import trace +from opentelemetry import metrics, trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.tracer import Tracer import logging @@ -22,3 +25,12 @@ def setup_tracing( LOG.info("Setting OTel tracing provider") tracer_provider.add_span_processor(tracer.get_span_processor()) trace.set_tracer_provider(tracer_provider) + + +@inject +def setup_metering( + meter: Meter = Provide[TelemetryContainer.meter], + telemetry_resource: Resource = Provide[TelemetryContainer.telemetry_resource], +) -> None: + LOG.info("Setting OTel meter provider") + metrics.set_meter_provider(MeterProvider(resource=telemetry_resource, metric_readers=[meter.get_metric_reader()])) diff --git a/tests/unit/schema_registry/telemetry/test_middleware.py b/tests/unit/schema_registry/telemetry/test_middleware.py index ecbe79307..bbabe449d 100644 --- a/tests/unit/schema_registry/telemetry/test_middleware.py +++ b/tests/unit/schema_registry/telemetry/test_middleware.py @@ -8,9 +8,10 @@ from _pytest.logging import LogCaptureFixture from fastapi import FastAPI, Request, Response from opentelemetry.trace import SpanKind +from schema_registry.telemetry.meter import Meter from schema_registry.telemetry.middleware import setup_telemetry_middleware, telemetry_middleware from schema_registry.telemetry.tracer import Tracer -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, call, MagicMock, patch import logging @@ -31,10 +32,11 @@ def test_setup_telemetry_middleware(caplog: LogCaptureFixture) -> None: async def test_telemetry_middleware() -> None: tracer = MagicMock(spec=Tracer) + meter = MagicMock(spec=Meter, START_TIME_KEY="start_time") request_mock = AsyncMock(spec=Request) request_mock.method = "GET" - request_mock.url.path = "/test" + request_mock.url.path = "/test/inner-path" response_mock = AsyncMock(spec=Response) response_mock.status_code = 200 @@ -42,15 +44,52 @@ async def test_telemetry_middleware() -> None: call_next = AsyncMock() call_next.return_value = response_mock - response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer) - span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value - - tracer.get_tracer.assert_called_once() - tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) - tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) - tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) - - # Check that the request handler is called - call_next.assert_awaited_once_with(request_mock) - - assert response == response_mock + with patch("schema_registry.telemetry.middleware.time.monotonic", return_value=1): + response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer, meter=meter) + span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value + + tracer.get_tracer.assert_called_once() + tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) + tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) + tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) + + # Check that the request handler is called + call_next.assert_awaited_once_with(request_mock) + + span.add_event.assert_has_calls( + [ + call("Creating metering resources"), + call("Metering requests in progress (increase)"), + call("Calling request handler"), + call("Metering request duration"), + call("Metering total requests"), + call("Metering requests in progress (decrease)"), + ] + ) + + meter.get_meter.assert_has_calls( + [ + call(), + call().create_up_down_counter( + name="karapace_http_requests_in_progress", description="In-progress requests for HTTP/TCP Protocol" + ), + call(), + call().create_histogram( + unit="seconds", + name="karapace_http_requests_duration_seconds", + description="Request Duration for HTTP/TCP Protocol", + ), + call(), + call().create_counter( + name="karapace_http_requests_total", description="Total Request Count for HTTP/TCP Protocol" + ), + call().create_up_down_counter().add(amount=1, attributes={"method": "GET", "path": "/test/inner-path"}), + call().create_histogram().record(amount=0, attributes={"method": "GET", "path": "/test/inner-path"}), + call() + .create_counter() + .add(amount=1, attributes={"method": "GET", "path": "/test/inner-path", "status": 200}), + call().create_up_down_counter().add(amount=-1, attributes={"method": "GET", "path": "/test/inner-path"}), + ] + ) + + assert response == response_mock