Skip to content

Commit

Permalink
metering: add middleware for HTTP metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nosahama committed Dec 19, 2024
1 parent cb6a7dc commit acb6f40
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/schema_registry/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import schema_registry.routers.mode
import schema_registry.routers.schemas
import schema_registry.routers.subjects
import schema_registry.telemetry.meter
import schema_registry.telemetry.middleware
import schema_registry.telemetry.setup
import schema_registry.telemetry.tracer
Expand All @@ -30,6 +31,7 @@
__name__,
schema_registry.controller,
schema_registry.telemetry.tracer,
schema_registry.telemetry.meter,
]
)

Expand Down
3 changes: 2 additions & 1 deletion src/schema_registry/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from schema_registry.middlewares import setup_middlewares
from schema_registry.registry import KarapaceSchemaRegistry
from schema_registry.routers.setup import setup_routers
from schema_registry.telemetry.setup import setup_tracing
from schema_registry.telemetry.setup import setup_metering, setup_tracing
from typing import AsyncContextManager

import logging
Expand Down Expand Up @@ -59,6 +59,7 @@ def create_karapace_application(
app = FastAPI(lifespan=lifespan) # type: ignore[arg-type]

setup_tracing()
setup_metering()
setup_routers(app=app)
setup_exception_handlers(app=app)
setup_middlewares(app=app)
Expand Down
10 changes: 7 additions & 3 deletions src/schema_registry/telemetry/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.semconv.attributes import telemetry_attributes as T
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.tracer import Tracer


def create_tracing_resource(config: Config) -> Resource:
def create_telemetry_resource(config: Config) -> Resource:
return Resource.create(
{
"service.name": config.telemetry.resource_service_name,
Expand All @@ -26,6 +27,9 @@ def create_tracing_resource(config: Config) -> Resource:

class TelemetryContainer(containers.DeclarativeContainer):
karapace_container = providers.Container(KarapaceContainer)
tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config)
tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource)

telemetry_resource = providers.Factory(create_telemetry_resource, config=karapace_container.config)

meter = providers.Singleton(Meter)
tracer = providers.Singleton(Tracer)
tracer_provider = providers.Singleton(TracerProvider, resource=telemetry_resource)
50 changes: 50 additions & 0 deletions src/schema_registry/telemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from collections.abc import Awaitable, Callable
from dependency_injector.wiring import inject, Provide
from fastapi import FastAPI, Request, Response
from opentelemetry.metrics import Counter, Histogram, UpDownCounter
from opentelemetry.trace import SpanKind
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.tracer import Tracer

import logging
import time

LOG = logging.getLogger(__name__)

async def telemetry_middleware(
    request: Request,
    call_next: Callable[[Request], Awaitable[Response]],
    tracer: Tracer = Provide[TelemetryContainer.tracer],
    meter: Meter = Provide[TelemetryContainer.meter],
) -> Response:
    """Trace and meter one HTTP request around the downstream handler.

    A server span named ``"<METHOD>: /<first path segment>"`` wraps the whole
    request. Inside it three instruments are obtained from ``meter`` and
    updated: an in-progress up/down counter, a duration histogram (seconds,
    measured with ``time.monotonic``) and a total-requests counter that also
    carries the response status code.

    Args:
        request: The incoming request; its ``state`` receives the start time
            under ``meter.START_TIME_KEY``.
        call_next: Awaitable that invokes the next handler and yields the response.
        tracer: Injected tracer wrapper used to open and annotate the span.
        meter: Injected meter wrapper used to create the instruments.

    Returns:
        The response produced by ``call_next``.

    Raises:
        Whatever ``call_next`` (or the span helpers) raise. The in-progress
        counter is still decremented in that case so it cannot drift upwards.
    """
    resource = request.url.path.split("/")[1]
    with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span:
        span.add_event("Creating metering resources")
        # NOTE(review): instruments are requested from the meter on every call;
        # consider creating them once at setup time - confirm how the SDK handles
        # repeated creation of an instrument with the same name.
        karapace_http_requests_in_progress: UpDownCounter = meter.get_meter().create_up_down_counter(
            name="karapace_http_requests_in_progress",
            description="In-progress requests for HTTP/TCP Protocol",
        )
        karapace_http_requests_duration_seconds: Histogram = meter.get_meter().create_histogram(
            unit="seconds",
            name="karapace_http_requests_duration_seconds",
            description="Request Duration for HTTP/TCP Protocol",
        )
        karapace_http_requests_total: Counter = meter.get_meter().create_counter(
            name="karapace_http_requests_total",
            description="Total Request Count for HTTP/TCP Protocol",
        )

        # Set start time for request
        setattr(request.state, meter.START_TIME_KEY, time.monotonic())

        # Labels shared by every measurement of this request
        labels = {"method": request.method, "path": request.url.path}

        # Increment requests in progress before response handler
        span.add_event("Metering requests in progress (increase)")
        karapace_http_requests_in_progress.add(amount=1, attributes=labels)

        try:
            # Call request handler
            tracer.update_span_with_request(request=request, span=span)
            span.add_event("Calling request handler")
            response: Response = await call_next(request)
            tracer.update_span_with_response(response=response, span=span)

            # Instrument request duration
            span.add_event("Metering request duration")
            karapace_http_requests_duration_seconds.record(
                amount=(time.monotonic() - getattr(request.state, meter.START_TIME_KEY)),
                attributes=labels,
            )

            # Instrument total requests
            span.add_event("Metering total requests")
            karapace_http_requests_total.add(amount=1, attributes={**labels, "status": response.status_code})
        finally:
            # Decrement requests in progress after response handler. This lives in
            # ``finally`` so a failing handler does not leave the count stuck one
            # too high for the rest of the process lifetime.
            span.add_event("Metering requests in progress (decrease)")
            karapace_http_requests_in_progress.add(amount=-1, attributes=labels)

        return response


Expand Down
14 changes: 13 additions & 1 deletion src/schema_registry/telemetry/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
"""

from dependency_injector.wiring import inject, Provide
from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.tracer import Tracer

import logging
Expand All @@ -22,3 +25,12 @@ def setup_tracing(
LOG.info("Setting OTel tracing provider")
tracer_provider.add_span_processor(tracer.get_span_processor())
trace.set_tracer_provider(tracer_provider)


@inject
def setup_metering(
    meter: Meter = Provide[TelemetryContainer.meter],
    telemetry_resource: Resource = Provide[TelemetryContainer.telemetry_resource],
) -> None:
    """Register the OpenTelemetry meter provider.

    Builds an SDK ``MeterProvider`` from the injected telemetry resource and the
    metric reader returned by ``meter.get_metric_reader()``, then hands it to
    ``metrics.set_meter_provider``.

    Args:
        meter: Injected meter wrapper that supplies the metric reader.
        telemetry_resource: Injected resource attached to the provider.
    """
    LOG.info("Setting OTel meter provider")
    metric_reader = meter.get_metric_reader()
    meter_provider = MeterProvider(
        resource=telemetry_resource,
        metric_readers=[metric_reader],
    )
    metrics.set_meter_provider(meter_provider)
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest
import re
import schema_registry.controller
import schema_registry.telemetry.meter
import schema_registry.telemetry.middleware
import schema_registry.telemetry.setup
import schema_registry.telemetry.tracer
Expand Down
18 changes: 9 additions & 9 deletions tests/unit/schema_registry/telemetry/test_meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@ def test_meter(karapace_container: KarapaceContainer):
mock_metrics.get_meter_provider.return_value.get_meter.assert_called_once_with("Karapace.meter")


def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContainer) -> None:
def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceContainer) -> None:
with (
patch("schema_registry.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter,
patch("schema_registry.telemetry.meter.ConsoleMetricExporter") as mock_console_exporter,
patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader,
):
karapace_container.config().telemetry.otel_endpoint_url = "http://otel:4317"
reader = Meter.get_metric_reader(config=karapace_container.config())
mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317")
mock_console_exporter.assert_called_once()
mock_periodic_exporting_metric_reader.assert_called_once_with(
exporter=mock_otlp_exporter.return_value,
exporter=mock_console_exporter.return_value,
export_interval_millis=10000,
)
assert reader is mock_periodic_exporting_metric_reader.return_value


def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceContainer) -> None:
def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContainer) -> None:
with (
patch("schema_registry.telemetry.meter.ConsoleMetricExporter") as mock_console_exporter,
patch("schema_registry.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter,
patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader,
):
karapace_container.config().telemetry.otel_endpoint_url = "http://otel:4317"
reader = Meter.get_metric_reader(config=karapace_container.config())
mock_console_exporter.assert_called_once()
mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317")
mock_periodic_exporting_metric_reader.assert_called_once_with(
exporter=mock_console_exporter.return_value,
exporter=mock_otlp_exporter.return_value,
export_interval_millis=10000,
)
assert reader is mock_periodic_exporting_metric_reader.return_value
67 changes: 53 additions & 14 deletions tests/unit/schema_registry/telemetry/test_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from _pytest.logging import LogCaptureFixture
from fastapi import FastAPI, Request, Response
from opentelemetry.trace import SpanKind
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.middleware import setup_telemetry_middleware, telemetry_middleware
from schema_registry.telemetry.tracer import Tracer
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, call, MagicMock, patch

import logging

Expand All @@ -31,26 +32,64 @@ def test_setup_telemetry_middleware(caplog: LogCaptureFixture) -> None:

async def test_telemetry_middleware() -> None:
tracer = MagicMock(spec=Tracer)
meter = MagicMock(spec=Meter, START_TIME_KEY="start_time")

request_mock = AsyncMock(spec=Request)
request_mock.method = "GET"
request_mock.url.path = "/test"
request_mock.url.path = "/test/inner-path"

response_mock = AsyncMock(spec=Response)
response_mock.status_code = 200

call_next = AsyncMock()
call_next.return_value = response_mock

response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer)
span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value

tracer.get_tracer.assert_called_once()
tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER)
tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span)
tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span)

# Check that the request handler is called
call_next.assert_awaited_once_with(request_mock)

assert response == response_mock
with patch("schema_registry.telemetry.middleware.time.monotonic", return_value=1):
response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer, meter=meter)
span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value

tracer.get_tracer.assert_called_once()
tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER)
tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span)
tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span)

# Check that the request handler is called
call_next.assert_awaited_once_with(request_mock)

span.add_event.assert_has_calls(
[
call("Creating metering resources"),
call("Metering requests in progress (increase)"),
call("Calling request handler"),
call("Metering request duration"),
call("Metering total requests"),
call("Metering requests in progress (decrease)"),
]
)

meter.get_meter.assert_has_calls(
[
call(),
call().create_up_down_counter(
name="karapace_http_requests_in_progress", description="In-progress requests for HTTP/TCP Protocol"
),
call(),
call().create_histogram(
unit="seconds",
name="karapace_http_requests_duration_seconds",
description="Request Duration for HTTP/TCP Protocol",
),
call(),
call().create_counter(
name="karapace_http_requests_total", description="Total Request Count for HTTP/TCP Protocol"
),
call().create_up_down_counter().add(amount=1, attributes={"method": "GET", "path": "/test/inner-path"}),
call().create_histogram().record(amount=0, attributes={"method": "GET", "path": "/test/inner-path"}),
call()
.create_counter()
.add(amount=1, attributes={"method": "GET", "path": "/test/inner-path", "status": 200}),
call().create_up_down_counter().add(amount=-1, attributes={"method": "GET", "path": "/test/inner-path"}),
]
)

assert response == response_mock

0 comments on commit acb6f40

Please sign in to comment.