Skip to content

Commit

Permalink
edit message count in process & fix settings provider for Nats KV and…
Browse files Browse the repository at this point in the history
… Nats OS
  • Loading branch information
roma-frolov committed Sep 17, 2024
1 parent e6db1c8 commit 2d12339
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
10 changes: 8 additions & 2 deletions faststream/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ async def consume_scope(
call_next: "AsyncFuncAny",
msg: "StreamMessage[Any]",
) -> Any:
if self._settings_provider is None:
return await call_next(msg)

messaging_system = self._settings_provider.messaging_system
consume_attrs = self._settings_provider.get_consume_attrs_from_message(msg)
destination_name = consume_attrs["destination_name"]
Expand All @@ -116,7 +119,7 @@ async def consume_scope(
self._metrics.received_messages_in_process.labels(
broker=messaging_system,
handler=destination_name,
).inc()
).inc(consume_attrs["messages_count"])

start_time = time.perf_counter()

Expand All @@ -137,7 +140,7 @@ async def consume_scope(
self._metrics.received_messages_in_process.labels(
broker=messaging_system,
handler=destination_name,
).dec()
).dec(consume_attrs["messages_count"])

status = ProcessingStatus.acked

Expand Down Expand Up @@ -170,6 +173,9 @@ async def publish_scope(
*args: Any,
**kwargs: Any,
) -> Any:
if self._settings_provider is None:
return await call_next(msg, *args, **kwargs)

err: Optional[Exception] = None
start_time = time.perf_counter()

Expand Down
8 changes: 5 additions & 3 deletions tests/prometheus/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ def assert_consume_metrics(
broker=settings_provider.messaging_system,
handler=consume_attrs["destination_name"],
),
call().inc(),
call().inc(consume_attrs["messages_count"]),
call(
broker=settings_provider.messaging_system,
handler=consume_attrs["destination_name"],
),
call().dec(),
call().dec(consume_attrs["messages_count"]),
]

assert metrics.received_messages_processing_time.labels.mock_calls == [
Expand All @@ -132,12 +132,14 @@ def assert_consume_metrics(
call().observe(ANY),
]

status = ProcessingStatus.acked

if exception_class:
status = (
PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(exception_class)
or ProcessingStatus.error
)
else:
elif message.committed:
status = PROCESSING_STATUS_BY_ACK_STATUS[message.committed]

assert metrics.received_processed_messages.labels.mock_calls == [
Expand Down

0 comments on commit 2d12339

Please sign in to comment.