Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed a bug when using one register for several middleware #1921

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions faststream/prometheus/middleware.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from typing import TYPE_CHECKING, Any, Callable, Optional, Sequence
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Optional, Sequence

from faststream import BaseMiddleware
from faststream.prometheus.consts import (
Expand Down Expand Up @@ -167,6 +167,8 @@ async def publish_scope(


class BasePrometheusMiddleware:
_container_by_registry: ClassVar[Dict["CollectorRegistry", MetricsContainer]] = {}

__slots__ = ("_metrics_container", "_metrics_manager", "_settings_provider_factory")

def __init__(
Expand All @@ -184,11 +186,17 @@ def __init__(
app_name = metrics_prefix

self._settings_provider_factory = settings_provider_factory
self._metrics_container = MetricsContainer(
registry,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
)

if registry in self._container_by_registry:
self._metrics_container = self._container_by_registry[registry]
else:
self._metrics_container = MetricsContainer(
registry,
metrics_prefix=metrics_prefix,
received_messages_size_buckets=received_messages_size_buckets,
)
self._container_by_registry[registry] = self._metrics_container

self._metrics_manager = MetricsManager(
self._metrics_container,
app_name=app_name,
Expand Down
31 changes: 30 additions & 1 deletion tests/prometheus/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from faststream.prometheus.middleware import (
PROCESSING_STATUS_BY_ACK_STATUS,
PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
BasePrometheusMiddleware,
)
from faststream.prometheus.types import ProcessingStatus
from tests.brokers.base.basic import BaseTestcaseConfig
Expand All @@ -21,7 +22,7 @@ class LocalPrometheusTestcase(BaseTestcaseConfig):
def get_broker(self, apply_types=False, **kwargs):
raise NotImplementedError

def get_middleware(self, **kwargs):
def get_middleware(self, **kwargs) -> BasePrometheusMiddleware:
raise NotImplementedError

@staticmethod
Expand Down Expand Up @@ -202,3 +203,31 @@ def assert_publish_metrics(self, metrics_manager: Any):
status="success",
),
]

async def test_one_registry_for_some_middlewares(
Lancetnik marked this conversation as resolved.
Show resolved Hide resolved
self, event: asyncio.Event, queue: str
) -> None:
registry = CollectorRegistry()

middleware_1 = self.get_middleware(registry=registry)
middleware_2 = self.get_middleware(registry=registry)
broker_1 = self.get_broker(apply_types=True, middlewares=(middleware_1,))
broker_2 = self.get_broker(apply_types=True, middlewares=(middleware_2,))

args, kwargs = self.get_subscriber_params(queue)

@broker_1.subscriber(*args, **kwargs)
async def handler():
event.set()

async with broker_1, broker_2:
await broker_1.start()
await broker_2.start()
tasks = (
asyncio.create_task(broker_2.publish("hello", queue)),
asyncio.create_task(event.wait()),
)
await asyncio.wait(tasks, timeout=self.timeout)

assert event.is_set()
assert middleware_1._metrics_container is middleware_2._metrics_container
Loading