Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed a bug when using one register for several middleware #1921

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 67 additions & 11 deletions faststream/prometheus/container.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Optional, Sequence
from typing import TYPE_CHECKING, Optional, Sequence, Union, cast

from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from prometheus_client import Counter, Gauge, Histogram

if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
from prometheus_client.registry import Collector


class MetricsContainer:
Expand Down Expand Up @@ -43,58 +47,110 @@ def __init__(
self._registry = registry
self._metrics_prefix = metrics_prefix

self.received_messages_total = Counter(
self.received_messages_total = cast(
Counter,
self._get_registered_metric(f"{metrics_prefix}_received_messages_total"),
) or Counter(
name=f"{metrics_prefix}_received_messages_total",
documentation="Count of received messages by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_messages_size_bytes = Histogram(

self.received_messages_size_bytes = cast(
Histogram,
self._get_registered_metric(
f"{metrics_prefix}_received_messages_size_bytes"
),
) or Histogram(
name=f"{metrics_prefix}_received_messages_size_bytes",
documentation="Histogram of received messages size in bytes by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS,
)
self.received_messages_in_process = Gauge(

self.received_messages_in_process = cast(
Gauge,
self._get_registered_metric(
f"{metrics_prefix}_received_messages_in_process"
),
) or Gauge(
name=f"{metrics_prefix}_received_messages_in_process",
documentation="Gauge of received messages in process by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_processed_messages_total = Counter(

self.received_processed_messages_total = cast(
Counter,
self._get_registered_metric(
f"{metrics_prefix}_received_processed_messages_total"
),
) or Counter(
name=f"{metrics_prefix}_received_processed_messages_total",
documentation="Count of received processed messages by broker, handler and status",
labelnames=["app_name", "broker", "handler", "status"],
registry=registry,
)
self.received_processed_messages_duration_seconds = Histogram(

self.received_processed_messages_duration_seconds = cast(
Histogram,
self._get_registered_metric(
f"{metrics_prefix}_received_processed_messages_duration_seconds"
),
) or Histogram(
name=f"{metrics_prefix}_received_processed_messages_duration_seconds",
documentation="Histogram of received processed messages duration in seconds by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_processed_messages_exceptions_total = Counter(

self.received_processed_messages_exceptions_total = cast(
Counter,
self._get_registered_metric(
f"{metrics_prefix}_received_processed_messages_exceptions_total"
),
) or Counter(
name=f"{metrics_prefix}_received_processed_messages_exceptions_total",
documentation="Count of received processed messages exceptions by broker, handler and exception_type",
labelnames=["app_name", "broker", "handler", "exception_type"],
registry=registry,
)
self.published_messages_total = Counter(

self.published_messages_total = cast(
Counter,
self._get_registered_metric(f"{metrics_prefix}_published_messages_total"),
) or Counter(
name=f"{metrics_prefix}_published_messages_total",
documentation="Count of published messages by destination and status",
labelnames=["app_name", "broker", "destination", "status"],
registry=registry,
)
self.published_messages_duration_seconds = Histogram(

self.published_messages_duration_seconds = cast(
Histogram,
self._get_registered_metric(
f"{metrics_prefix}_published_messages_duration_seconds"
),
) or Histogram(
name=f"{metrics_prefix}_published_messages_duration_seconds",
documentation="Histogram of published messages duration in seconds by broker and destination",
labelnames=["app_name", "broker", "destination"],
registry=registry,
)
self.published_messages_exceptions_total = Counter(

self.published_messages_exceptions_total = cast(
Counter,
self._get_registered_metric(
f"{metrics_prefix}_published_messages_exceptions_total"
),
) or Counter(
name=f"{metrics_prefix}_published_messages_exceptions_total",
documentation="Count of published messages exceptions by broker, destination and exception_type",
labelnames=["app_name", "broker", "destination", "exception_type"],
registry=registry,
)

def _get_registered_metric(self, metric_name: str) -> Union["Collector", None]:
return self._registry._names_to_collectors.get(metric_name)
18 changes: 17 additions & 1 deletion tests/prometheus/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from faststream.prometheus.middleware import (
PROCESSING_STATUS_BY_ACK_STATUS,
PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
BasePrometheusMiddleware,
)
from faststream.prometheus.types import ProcessingStatus
from tests.brokers.base.basic import BaseTestcaseConfig
Expand All @@ -21,7 +22,7 @@ class LocalPrometheusTestcase(BaseTestcaseConfig):
def get_broker(self, apply_types=False, **kwargs):
raise NotImplementedError

def get_middleware(self, **kwargs):
def get_middleware(self, **kwargs) -> BasePrometheusMiddleware:
raise NotImplementedError

@staticmethod
Expand Down Expand Up @@ -202,3 +203,18 @@ def assert_publish_metrics(self, metrics_manager: Any):
status="success",
),
]

async def test_one_registry_for_some_middlewares(
Lancetnik marked this conversation as resolved.
Show resolved Hide resolved
self, event: asyncio.Event, queue: str
) -> None:
registry = CollectorRegistry()

middleware_1 = self.get_middleware(registry=registry)
middleware_2 = self.get_middleware(registry=registry)
self.get_broker(middlewares=(middleware_1,))
self.get_broker(middlewares=(middleware_2,))

assert (
middleware_1._metrics_container.received_messages_total
is middleware_2._metrics_container.received_messages_total
)
Loading