From 5d241606ee6e71dbbade6a35e96b537278ca7c4b Mon Sep 17 00:00:00 2001 From: Roma Frolov Date: Fri, 15 Nov 2024 19:46:24 +0300 Subject: [PATCH 1/4] fixed a bug when using one register for several middleware --- faststream/prometheus/middleware.py | 20 +++++++++++++------ tests/prometheus/basic.py | 31 ++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/faststream/prometheus/middleware.py b/faststream/prometheus/middleware.py index 575d846342..422e48ef92 100644 --- a/faststream/prometheus/middleware.py +++ b/faststream/prometheus/middleware.py @@ -1,5 +1,5 @@ import time -from typing import TYPE_CHECKING, Any, Callable, Optional, Sequence +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Optional, Sequence from faststream import BaseMiddleware from faststream.prometheus.consts import ( @@ -167,6 +167,8 @@ async def publish_scope( class BasePrometheusMiddleware: + _container_by_registry: ClassVar[Dict["CollectorRegistry", MetricsContainer]] = {} + __slots__ = ("_metrics_container", "_metrics_manager", "_settings_provider_factory") def __init__( @@ -184,11 +186,17 @@ def __init__( app_name = metrics_prefix self._settings_provider_factory = settings_provider_factory - self._metrics_container = MetricsContainer( - registry, - metrics_prefix=metrics_prefix, - received_messages_size_buckets=received_messages_size_buckets, - ) + + if registry in self._container_by_registry: + self._metrics_container = self._container_by_registry[registry] + else: + self._metrics_container = MetricsContainer( + registry, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) + self._container_by_registry[registry] = self._metrics_container + self._metrics_manager = MetricsManager( self._metrics_container, app_name=app_name, diff --git a/tests/prometheus/basic.py b/tests/prometheus/basic.py index f2d9a5d6cf..4ae11aaea5 100644 --- a/tests/prometheus/basic.py +++ b/tests/prometheus/basic.py @@ -11,6 +11,7 @@ from faststream.prometheus.middleware import ( PROCESSING_STATUS_BY_ACK_STATUS, PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP, + BasePrometheusMiddleware, ) from faststream.prometheus.types import ProcessingStatus from tests.brokers.base.basic import BaseTestcaseConfig @@ -21,7 +22,7 @@ class LocalPrometheusTestcase(BaseTestcaseConfig): def get_broker(self, apply_types=False, **kwargs): raise NotImplementedError - def get_middleware(self, **kwargs): + def get_middleware(self, **kwargs) -> BasePrometheusMiddleware: raise NotImplementedError @staticmethod @@ -202,3 +203,31 @@ def assert_publish_metrics(self, metrics_manager: Any): status="success", ), ] + + async def test_one_registry_for_some_middlewares( + self, event: asyncio.Event, queue: str + ) -> None: + registry = CollectorRegistry() + + middleware_1 = self.get_middleware(registry=registry) + middleware_2 = self.get_middleware(registry=registry) + broker_1 = self.get_broker(apply_types=True, middlewares=(middleware_1,)) + broker_2 = self.get_broker(apply_types=True, middlewares=(middleware_2,)) + + args, kwargs = self.get_subscriber_params(queue) + + @broker_1.subscriber(*args, **kwargs) + async def handler(): + event.set() + + async with broker_1, broker_2: + await broker_1.start() + await broker_2.start() + tasks = ( + asyncio.create_task(broker_2.publish("hello", queue)), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + assert middleware_1._metrics_container is middleware_2._metrics_container From acc77a6957ae56d31d9fc3dab97a9ab2ad9f2dac Mon Sep 17 00:00:00 2001 From: Roma Frolov Date: Mon, 18 Nov 2024 23:02:44 +0300 Subject: [PATCH 2/4] changed the way of metrics registration --- faststream/prometheus/container.py | 50 ++++++++++++++++++++++------- faststream/prometheus/middleware.py | 20 ++++-------- tests/prometheus/basic.py | 25 ++++----------- 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/faststream/prometheus/container.py b/faststream/prometheus/container.py index 6b5f813f63..c0c6dd6e21 100644 --- a/faststream/prometheus/container.py +++ b/faststream/prometheus/container.py @@ -1,6 +1,10 @@ -from typing import Optional, Sequence +from typing import TYPE_CHECKING, Optional, Sequence, Union -from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +from prometheus_client import Counter, Gauge, Histogram + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + from prometheus_client.metrics import MetricWrapperBase class MetricsContainer: @@ -43,58 +47,82 @@ def __init__( self._registry = registry self._metrics_prefix = metrics_prefix - self.received_messages_total = Counter( + self.received_messages_total = self._get_registered_metric( + f"{metrics_prefix}_received_messages_total" + ) or Counter( name=f"{metrics_prefix}_received_messages_total", documentation="Count of received messages by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_messages_size_bytes = Histogram( + self.received_messages_size_bytes = self._get_registered_metric( + f"{metrics_prefix}_received_messages_size_bytes" + ) or Histogram( name=f"{metrics_prefix}_received_messages_size_bytes", documentation="Histogram of received messages size in bytes by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS, ) - self.received_messages_in_process = Gauge( + self.received_messages_in_process = self._get_registered_metric( + f"{metrics_prefix}_received_messages_in_process" + ) or Gauge( name=f"{metrics_prefix}_received_messages_in_process", documentation="Gauge of received messages in process by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_processed_messages_total = Counter( + self.received_processed_messages_total = self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_total" + ) or Counter( name=f"{metrics_prefix}_received_processed_messages_total", documentation="Count of received processed messages by broker, handler and status", labelnames=["app_name", "broker", "handler", "status"], registry=registry, ) - self.received_processed_messages_duration_seconds = Histogram( + self.received_processed_messages_duration_seconds = self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_duration_seconds" + ) or Histogram( name=f"{metrics_prefix}_received_processed_messages_duration_seconds", documentation="Histogram of received processed messages duration in seconds by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_processed_messages_exceptions_total = Counter( + self.received_processed_messages_exceptions_total = self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_exceptions_total" + ) or Counter( name=f"{metrics_prefix}_received_processed_messages_exceptions_total", documentation="Count of received processed messages exceptions by broker, handler and exception_type", labelnames=["app_name", "broker", "handler", "exception_type"], registry=registry, ) - self.published_messages_total = Counter( + self.published_messages_total = self._get_registered_metric( + f"{metrics_prefix}_published_messages_total" + ) or Counter( name=f"{metrics_prefix}_published_messages_total", documentation="Count of published messages by destination and status", labelnames=["app_name", "broker", "destination", "status"], registry=registry, ) - self.published_messages_duration_seconds = Histogram( + self.published_messages_duration_seconds = self._get_registered_metric( + f"{metrics_prefix}_published_messages_duration_seconds" + ) or Histogram( name=f"{metrics_prefix}_published_messages_duration_seconds", documentation="Histogram of published messages duration in seconds by broker and destination", labelnames=["app_name", "broker", "destination"], registry=registry, ) - self.published_messages_exceptions_total = Counter( + + self.published_messages_exceptions_total = self._get_registered_metric( + f"{metrics_prefix}_published_messages_exceptions_total" + ) or Counter( name=f"{metrics_prefix}_published_messages_exceptions_total", documentation="Count of published messages exceptions by broker, destination and exception_type", labelnames=["app_name", "broker", "destination", "exception_type"], registry=registry, ) + + def _get_registered_metric( + self, metric_name: str + ) -> Union["MetricWrapperBase", None]: + return self._registry._names_to_collectors.get(metric_name) diff --git a/faststream/prometheus/middleware.py b/faststream/prometheus/middleware.py index 422e48ef92..575d846342 100644 --- a/faststream/prometheus/middleware.py +++ b/faststream/prometheus/middleware.py @@ -1,5 +1,5 @@ import time -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Optional, Sequence +from typing import TYPE_CHECKING, Any, Callable, Optional, Sequence from faststream import BaseMiddleware from faststream.prometheus.consts import ( @@ -167,8 +167,6 @@ async def publish_scope( class BasePrometheusMiddleware: - _container_by_registry: ClassVar[Dict["CollectorRegistry", MetricsContainer]] = {} - __slots__ = ("_metrics_container", "_metrics_manager", "_settings_provider_factory") def __init__( @@ -186,17 +184,11 @@ def __init__( app_name = metrics_prefix self._settings_provider_factory = settings_provider_factory - - if registry in self._container_by_registry: - self._metrics_container = self._container_by_registry[registry] - else: - self._metrics_container = MetricsContainer( - registry, - metrics_prefix=metrics_prefix, - received_messages_size_buckets=received_messages_size_buckets, - ) - self._container_by_registry[registry] = self._metrics_container - + self._metrics_container = MetricsContainer( + registry, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) self._metrics_manager = MetricsManager( self._metrics_container, app_name=app_name, diff --git a/tests/prometheus/basic.py b/tests/prometheus/basic.py index 4ae11aaea5..1d749af1ee 100644 --- a/tests/prometheus/basic.py +++ b/tests/prometheus/basic.py @@ -211,23 +211,10 @@ async def test_one_registry_for_some_middlewares( middleware_1 = self.get_middleware(registry=registry) middleware_2 = self.get_middleware(registry=registry) - broker_1 = self.get_broker(apply_types=True, middlewares=(middleware_1,)) - broker_2 = self.get_broker(apply_types=True, middlewares=(middleware_2,)) + self.get_broker(middlewares=(middleware_1,)) + self.get_broker(middlewares=(middleware_2,)) - args, kwargs = self.get_subscriber_params(queue) - - @broker_1.subscriber(*args, **kwargs) - async def handler(): - event.set() - - async with broker_1, broker_2: - await broker_1.start() - await broker_2.start() - tasks = ( - asyncio.create_task(broker_2.publish("hello", queue)), - asyncio.create_task(event.wait()), - ) - await asyncio.wait(tasks, timeout=self.timeout) - - assert event.is_set() - assert middleware_1._metrics_container is middleware_2._metrics_container + assert ( + middleware_1._metrics_container.received_messages_total + is middleware_2._metrics_container.received_messages_total + ) From c5ffc88dc1c53467e27124791b0bc07faa256042 Mon Sep 17 00:00:00 2001 From: Roma Frolov Date: Mon, 18 Nov 2024 23:10:14 +0300 Subject: [PATCH 3/4] fixed types --- faststream/prometheus/container.py | 67 ++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/faststream/prometheus/container.py b/faststream/prometheus/container.py index c0c6dd6e21..fd972bdd67 100644 --- a/faststream/prometheus/container.py +++ b/faststream/prometheus/container.py @@ -1,10 +1,10 @@ -from typing import TYPE_CHECKING, Optional, Sequence, Union +from typing import TYPE_CHECKING, Optional, Sequence, Union, cast from prometheus_client import Counter, Gauge, Histogram if TYPE_CHECKING: from prometheus_client import CollectorRegistry - from prometheus_client.metrics import MetricWrapperBase + from prometheus_client.registry import Collector class MetricsContainer: @@ -47,16 +47,20 @@ def __init__( self._registry = registry self._metrics_prefix = metrics_prefix - self.received_messages_total = self._get_registered_metric( - f"{metrics_prefix}_received_messages_total" + self.received_messages_total = cast( + Counter, + self._get_registered_metric(f"{metrics_prefix}_received_messages_total"), ) or Counter( name=f"{metrics_prefix}_received_messages_total", documentation="Count of received messages by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_messages_size_bytes = self._get_registered_metric( - f"{metrics_prefix}_received_messages_size_bytes" + self.received_messages_size_bytes = cast( + Histogram, + self._get_registered_metric( + f"{metrics_prefix}_received_messages_size_bytes" + ), ) or Histogram( name=f"{metrics_prefix}_received_messages_size_bytes", documentation="Histogram of received messages size in bytes by broker and handler", @@ -64,48 +68,64 @@ def __init__( registry=registry, buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS, ) - self.received_messages_in_process = self._get_registered_metric( - f"{metrics_prefix}_received_messages_in_process" + self.received_messages_in_process = cast( + Gauge, + self._get_registered_metric( + f"{metrics_prefix}_received_messages_in_process" + ), ) or Gauge( name=f"{metrics_prefix}_received_messages_in_process", documentation="Gauge of received messages in process by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_processed_messages_total = self._get_registered_metric( - f"{metrics_prefix}_received_processed_messages_total" + self.received_processed_messages_total = cast( + Counter, + self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_total" + ), ) or Counter( name=f"{metrics_prefix}_received_processed_messages_total", documentation="Count of received processed messages by broker, handler and status", labelnames=["app_name", "broker", "handler", "status"], registry=registry, ) - self.received_processed_messages_duration_seconds = self._get_registered_metric( - f"{metrics_prefix}_received_processed_messages_duration_seconds" + self.received_processed_messages_duration_seconds = cast( + Histogram, + self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_duration_seconds" + ), ) or Histogram( name=f"{metrics_prefix}_received_processed_messages_duration_seconds", documentation="Histogram of received processed messages duration in seconds by broker and handler", labelnames=["app_name", "broker", "handler"], registry=registry, ) - self.received_processed_messages_exceptions_total = self._get_registered_metric( - f"{metrics_prefix}_received_processed_messages_exceptions_total" + self.received_processed_messages_exceptions_total = cast( + Counter, + self._get_registered_metric( + f"{metrics_prefix}_received_processed_messages_exceptions_total" + ), ) or Counter( name=f"{metrics_prefix}_received_processed_messages_exceptions_total", documentation="Count of received processed messages exceptions by broker, handler and exception_type", labelnames=["app_name", "broker", "handler", "exception_type"], registry=registry, ) - self.published_messages_total = self._get_registered_metric( - f"{metrics_prefix}_published_messages_total" + self.published_messages_total = cast( + Counter, + self._get_registered_metric(f"{metrics_prefix}_published_messages_total"), ) or Counter( name=f"{metrics_prefix}_published_messages_total", documentation="Count of published messages by destination and status", labelnames=["app_name", "broker", "destination", "status"], registry=registry, ) - self.published_messages_duration_seconds = self._get_registered_metric( - f"{metrics_prefix}_published_messages_duration_seconds" + self.published_messages_duration_seconds = cast( + Histogram, + self._get_registered_metric( + f"{metrics_prefix}_published_messages_duration_seconds" + ), ) or Histogram( name=f"{metrics_prefix}_published_messages_duration_seconds", documentation="Histogram of published messages duration in seconds by broker and destination", @@ -113,8 +133,11 @@ def __init__( registry=registry, ) - self.published_messages_exceptions_total = self._get_registered_metric( - f"{metrics_prefix}_published_messages_exceptions_total" + self.published_messages_exceptions_total = cast( + Counter, + self._get_registered_metric( + f"{metrics_prefix}_published_messages_exceptions_total" + ), ) or Counter( name=f"{metrics_prefix}_published_messages_exceptions_total", documentation="Count of published messages exceptions by broker, destination and exception_type", @@ -122,7 +145,5 @@ def __init__( registry=registry, ) - def _get_registered_metric( - self, metric_name: str - ) -> Union["MetricWrapperBase", None]: + def _get_registered_metric(self, metric_name: str) -> Union["Collector", None]: return self._registry._names_to_collectors.get(metric_name) From 880abab457af2c85956d1e393e026fe8bf8e5636 Mon Sep 17 00:00:00 2001 From: Roma Frolov Date: Mon, 18 Nov 2024 23:15:57 +0300 Subject: [PATCH 4/4] spaces for easy reading --- faststream/prometheus/container.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/faststream/prometheus/container.py b/faststream/prometheus/container.py index fd972bdd67..6ac764866e 100644 --- a/faststream/prometheus/container.py +++ b/faststream/prometheus/container.py @@ -56,6 +56,7 @@ def __init__( labelnames=["app_name", "broker", "handler"], registry=registry, ) + self.received_messages_size_bytes = cast( Histogram, self._get_registered_metric( @@ -68,6 +69,7 @@ def __init__( registry=registry, buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS, ) + self.received_messages_in_process = cast( Gauge, self._get_registered_metric( @@ -79,6 +81,7 @@ def __init__( labelnames=["app_name", "broker", "handler"], registry=registry, ) + self.received_processed_messages_total = cast( Counter, self._get_registered_metric( @@ -90,6 +93,7 @@ def __init__( labelnames=["app_name", "broker", "handler", "status"], registry=registry, ) + self.received_processed_messages_duration_seconds = cast( Histogram, self._get_registered_metric( @@ -101,6 +105,7 @@ def __init__( labelnames=["app_name", "broker", "handler"], registry=registry, ) + self.received_processed_messages_exceptions_total = cast( Counter, self._get_registered_metric( @@ -112,6 +117,7 @@ def __init__( labelnames=["app_name", "broker", "handler", "exception_type"], registry=registry, ) + self.published_messages_total = cast( Counter, self._get_registered_metric(f"{metrics_prefix}_published_messages_total"), @@ -121,6 +127,7 @@ def __init__( labelnames=["app_name", "broker", "destination", "status"], registry=registry, ) + self.published_messages_duration_seconds = cast( Histogram, self._get_registered_metric(