diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index b3326591ec..30c97ae298 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -407,7 +407,6 @@ def __init__( apply_types=apply_types, validate=validate, ) - self.client_id = client_id self._producer = None diff --git a/faststream/confluent/opentelemetry/middleware.py b/faststream/confluent/opentelemetry/middleware.py index a6d94f9d3f..d8e5906dd3 100644 --- a/faststream/confluent/opentelemetry/middleware.py +++ b/faststream/confluent/opentelemetry/middleware.py @@ -22,4 +22,5 @@ def __init__( tracer_provider=tracer_provider, meter_provider=meter_provider, meter=meter, + include_messages_counters=True, ) diff --git a/faststream/confluent/opentelemetry/provider.py b/faststream/confluent/opentelemetry/provider.py index 93041e1868..6add7330ca 100644 --- a/faststream/confluent/opentelemetry/provider.py +++ b/faststream/confluent/opentelemetry/provider.py @@ -4,6 +4,7 @@ from faststream.broker.types import MsgType from faststream.opentelemetry import TelemetrySettingsProvider +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME if TYPE_CHECKING: from confluent_kafka import Message @@ -57,7 +58,7 @@ def get_consume_attrs_from_message( SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition(), SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset(), - "messaging.destination_publish.name": msg.raw_message.topic(), + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic(), } if (key := msg.raw_message.key()) is not None: @@ -89,7 +90,7 @@ def get_consume_attrs_from_message( bytearray().join(cast(Sequence[bytes], msg.body)) ), SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition(), - "messaging.destination_publish.name": raw_message.topic(), + MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic(), } return attrs diff --git a/faststream/kafka/opentelemetry/middleware.py b/faststream/kafka/opentelemetry/middleware.py index 99b9074398..2f06486c33 100644 --- a/faststream/kafka/opentelemetry/middleware.py +++ b/faststream/kafka/opentelemetry/middleware.py @@ -22,4 +22,5 @@ def __init__( tracer_provider=tracer_provider, meter_provider=meter_provider, meter=meter, + include_messages_counters=True, ) diff --git a/faststream/kafka/opentelemetry/provider.py b/faststream/kafka/opentelemetry/provider.py index 539166c6ca..b1702b6022 100644 --- a/faststream/kafka/opentelemetry/provider.py +++ b/faststream/kafka/opentelemetry/provider.py @@ -4,6 +4,7 @@ from faststream.broker.types import MsgType from faststream.opentelemetry import TelemetrySettingsProvider +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME if TYPE_CHECKING: from aiokafka import ConsumerRecord @@ -57,7 +58,7 @@ def get_consume_attrs_from_message( SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition, SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset, - "messaging.destination_publish.name": msg.raw_message.topic, + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic, } if msg.raw_message.key is not None: @@ -90,7 +91,7 @@ def get_consume_attrs_from_message( ), SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message), SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition, - "messaging.destination_publish.name": raw_message.topic, + MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic, } return attrs diff --git a/faststream/nats/opentelemetry/middleware.py b/faststream/nats/opentelemetry/middleware.py index e88904f474..cafd8787d8 100644 --- a/faststream/nats/opentelemetry/middleware.py +++ b/faststream/nats/opentelemetry/middleware.py @@ -20,4 +20,5 @@ def __init__( tracer_provider=tracer_provider, meter_provider=meter_provider, meter=meter, + include_messages_counters=True, ) diff --git a/faststream/nats/opentelemetry/provider.py b/faststream/nats/opentelemetry/provider.py index 0bd4b41765..7aefafed2c 100644 --- a/faststream/nats/opentelemetry/provider.py +++ b/faststream/nats/opentelemetry/provider.py @@ -5,6 +5,7 @@ from faststream.__about__ import SERVICE_NAME from faststream.broker.types import MsgType from faststream.opentelemetry import TelemetrySettingsProvider +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME if TYPE_CHECKING: from nats.aio.msg import Msg @@ -47,7 +48,7 @@ def get_consume_attrs_from_message( SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), - "messaging.destination_publish.name": msg.raw_message.subject, + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.subject, } @staticmethod @@ -70,7 +71,7 @@ def get_consume_attrs_from_message( SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message), - "messaging.destination_publish.name": msg.raw_message[0].subject, + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message[0].subject, } @staticmethod diff --git a/faststream/opentelemetry/consts.py b/faststream/opentelemetry/consts.py index ad597f1e99..2436d568ee 100644 --- a/faststream/opentelemetry/consts.py +++ b/faststream/opentelemetry/consts.py @@ -3,3 +3,7 @@ class MessageAction: PUBLISH = "publish" PROCESS = "process" RECEIVE = "receive" + + +ERROR_TYPE = "error.type" +MESSAGING_DESTINATION_PUBLISH_NAME = "messaging.destination_publish.name" diff --git a/faststream/opentelemetry/middleware.py b/faststream/opentelemetry/middleware.py index a656d39cdf..f509b2ae5f 100644 --- a/faststream/opentelemetry/middleware.py +++ b/faststream/opentelemetry/middleware.py @@ -1,11 +1,16 @@ import time +from copy import copy from typing import TYPE_CHECKING, Any, Callable, Optional, Type from opentelemetry import context, metrics, propagate, trace from opentelemetry.semconv.trace import SpanAttributes from faststream import BaseMiddleware -from faststream.opentelemetry.consts import MessageAction +from faststream.opentelemetry.consts import ( + ERROR_TYPE, + MESSAGING_DESTINATION_PUBLISH_NAME, + MessageAction, +) from faststream.opentelemetry.provider import TelemetrySettingsProvider if TYPE_CHECKING: @@ -16,7 +21,7 @@ from opentelemetry.trace import Span, Tracer, TracerProvider from faststream.broker.message import StreamMessage - from faststream.types import AsyncFunc, AsyncFuncAny + from faststream.types import AnyDict, AsyncFunc, AsyncFuncAny _OTEL_SCHEMA = "https://opentelemetry.io/schemas/1.11.0" @@ -28,33 +33,68 @@ def _create_span_name(destination: str, action: str) -> str: class _MetricsContainer: __slots__ = ( - "active_requests_counter", - "duration_histogram", - "consumer_message_size_histogram", - "publisher_message_size_histogram", + "include_messages_counters", + "publish_duration", + "publish_counter", + "process_duration", + "process_counter", ) - def __init__(self, meter: "Meter") -> None: - self.active_requests_counter = meter.create_up_down_counter( - name="faststream.consumer.active_requests", - unit="requests", - description="Measures the number of concurrent messages that are currently in-flight.", + def __init__(self, meter: "Meter", include_messages_counters: bool) -> None: + self.include_messages_counters = include_messages_counters + + self.publish_duration = meter.create_histogram( + name="messaging.publish.duration", + unit="s", + description="Measures the duration of publish operation.", ) - self.duration_histogram = meter.create_histogram( - name="faststream.consumer.duration", + self.process_duration = meter.create_histogram( + name="messaging.process.duration", unit="s", - description="Measures the duration of message processing.", + description="Measures the duration of process operation.", ) - self.consumer_message_size_histogram = meter.create_histogram( - name="faststream.consumer.message_size", - unit="By", - description="Measures the size of consumed messages.", + + if include_messages_counters: + self.process_counter = meter.create_counter( + name="messaging.process.messages", + unit="message", + description="Measures the number of processed messages.", + ) + self.publish_counter = meter.create_counter( + name="messaging.publish.messages", + unit="message", + description="Measures the number of published messages.", + ) + + def observe_publish( + self, attrs: "AnyDict", duration: float, msg_count: int + ) -> None: + self.publish_duration.record( + amount=duration, + attributes=attrs, ) - self.publisher_message_size_histogram = meter.create_histogram( - name="faststream.publisher.message_size", - unit="By", - description="Measures the size of published messages.", + if self.include_messages_counters: + counter_attrs = copy(attrs) + counter_attrs.pop(ERROR_TYPE, None) + self.publish_counter.add( + amount=msg_count, + attributes=counter_attrs, + ) + + def observe_consume( + self, attrs: "AnyDict", duration: float, msg_count: int + ) -> None: + self.process_duration.record( + amount=duration, + attributes=attrs, ) + if self.include_messages_counters: + counter_attrs = copy(attrs) + counter_attrs.pop(ERROR_TYPE, None) + self.process_counter.add( + amount=msg_count, + attributes=counter_attrs, + ) class BaseTelemetryMiddleware(BaseMiddleware): @@ -83,12 +123,16 @@ async def publish_scope( ) -> Any: provider = self.__settings_provider - attributes = provider.get_publish_attrs_from_kwargs(kwargs) - + headers = kwargs.pop("headers", {}) or {} current_context = context.get_current() destination_name = provider.get_publish_destination_name(kwargs) - headers = kwargs.pop("headers", {}) or {} + trace_attributes = provider.get_publish_attrs_from_kwargs(kwargs) + metrics_attributes = { + SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system, + SpanAttributes.MESSAGING_DESTINATION_NAME: destination_name, + } + if self._current_span and self._current_span.is_recording(): current_context = trace.set_span_in_context( self._current_span, current_context @@ -99,27 +143,36 @@ async def publish_scope( create_span = self._tracer.start_span( name=_create_span_name(destination_name, MessageAction.CREATE), kind=trace.SpanKind.PRODUCER, - attributes=attributes, + attributes=trace_attributes, ) current_context = trace.set_span_in_context(create_span) propagate.inject(headers, context=current_context) create_span.end() - with self._tracer.start_as_current_span( - name=_create_span_name(destination_name, MessageAction.PUBLISH), - kind=trace.SpanKind.PRODUCER, - attributes=attributes, - context=current_context, - ) as span: - span.set_attribute( - SpanAttributes.MESSAGING_OPERATION, MessageAction.PUBLISH - ) - result = await call_next(msg, *args, headers=headers, **kwargs) + start_time = time.perf_counter() - self._metrics.publisher_message_size_histogram.record( - len(str(msg) or ""), - attributes, - ) + try: + with self._tracer.start_as_current_span( + name=_create_span_name(destination_name, MessageAction.PUBLISH), + kind=trace.SpanKind.PRODUCER, + attributes=trace_attributes, + context=current_context, + ) as span: + span.set_attribute( + SpanAttributes.MESSAGING_OPERATION, MessageAction.PUBLISH + ) + result = await call_next(msg, *args, headers=headers, **kwargs) + + except Exception as e: + metrics_attributes[ERROR_TYPE] = type(e).__name__ + raise + + finally: + duration = time.perf_counter() - start_time + msg_count = trace_attributes.get( + SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1 + ) + self._metrics.observe_publish(metrics_attributes, duration, msg_count) return result @@ -130,30 +183,33 @@ async def consume_scope( ) -> Any: provider = self.__settings_provider - start_time = time.perf_counter() current_context = propagate.extract(msg.headers) destination_name = provider.get_consume_destination_name(msg) - attributes = provider.get_consume_attrs_from_message(msg) + + trace_attributes = provider.get_consume_attrs_from_message(msg) + metrics_attributes = { + SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system, + MESSAGING_DESTINATION_PUBLISH_NAME: destination_name, + } if not len(current_context): create_span = self._tracer.start_span( name=_create_span_name(destination_name, MessageAction.CREATE), kind=trace.SpanKind.CONSUMER, - attributes=attributes, + attributes=trace_attributes, ) current_context = trace.set_span_in_context(create_span) create_span.end() self._origin_context = current_context - self._metrics.active_requests_counter.add(1, attributes) - self._metrics.consumer_message_size_histogram.record(len(msg.body), attributes) + start_time = time.perf_counter() try: with self._tracer.start_as_current_span( name=_create_span_name(destination_name, MessageAction.PROCESS), kind=trace.SpanKind.CONSUMER, context=current_context, - attributes=attributes, + attributes=trace_attributes, end_on_exit=False, ) as span: span.set_attribute( @@ -165,12 +221,16 @@ async def consume_scope( result = await call_next(msg) context.detach(token) - total_time = time.perf_counter() - start_time - self._metrics.duration_histogram.record( - amount=total_time, attributes=attributes - ) + except Exception as e: + metrics_attributes[ERROR_TYPE] = type(e).__name__ + raise + finally: - self._metrics.active_requests_counter.add(-1, attributes) + duration = time.perf_counter() - start_time + msg_count = trace_attributes.get( + SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1 + ) + self._metrics.observe_consume(metrics_attributes, duration, msg_count) return result @@ -201,10 +261,11 @@ def __init__( tracer_provider: Optional["TracerProvider"] = None, meter_provider: Optional["MeterProvider"] = None, meter: Optional["Meter"] = None, + include_messages_counters: bool = False, ) -> None: self._tracer = _get_tracer(tracer_provider) self._meter = _get_meter(meter_provider, meter) - self._metrics = _MetricsContainer(self._meter) + self._metrics = _MetricsContainer(self._meter, include_messages_counters) self._settings_provider_factory = settings_provider_factory def __call__(self, msg: Optional[Any]) -> BaseMiddleware: diff --git a/faststream/rabbit/opentelemetry/middleware.py b/faststream/rabbit/opentelemetry/middleware.py index 72dba1ff4f..29a553a7f0 100644 --- a/faststream/rabbit/opentelemetry/middleware.py +++ b/faststream/rabbit/opentelemetry/middleware.py @@ -20,4 +20,5 @@ def __init__( tracer_provider=tracer_provider, meter_provider=meter_provider, meter=meter, + include_messages_counters=False, ) diff --git a/faststream/rabbit/opentelemetry/provider.py b/faststream/rabbit/opentelemetry/provider.py index 1e6acb4b41..da62338e70 100644 --- a/faststream/rabbit/opentelemetry/provider.py +++ b/faststream/rabbit/opentelemetry/provider.py @@ -3,6 +3,7 @@ from opentelemetry.semconv.trace import SpanAttributes from faststream.opentelemetry import TelemetrySettingsProvider +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME if TYPE_CHECKING: from aio_pika import IncomingMessage @@ -28,7 +29,7 @@ def get_consume_attrs_from_message( SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: msg.raw_message.routing_key, "messaging.rabbitmq.message.delivery_tag": msg.raw_message.delivery_tag, - "messaging.destination_publish.name": msg.raw_message.exchange, + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.exchange, } @staticmethod diff --git a/faststream/redis/opentelemetry/middleware.py b/faststream/redis/opentelemetry/middleware.py index 343fe0d29d..1ab6a0ea14 100644 --- a/faststream/redis/opentelemetry/middleware.py +++ b/faststream/redis/opentelemetry/middleware.py @@ -20,4 +20,5 @@ def __init__( tracer_provider=tracer_provider, meter_provider=meter_provider, meter=meter, + include_messages_counters=False, ) diff --git a/faststream/redis/opentelemetry/provider.py b/faststream/redis/opentelemetry/provider.py index d93cf878b6..3524b04465 100644 --- a/faststream/redis/opentelemetry/provider.py +++ b/faststream/redis/opentelemetry/provider.py @@ -3,6 +3,7 @@ from opentelemetry.semconv.trace import SpanAttributes from faststream.opentelemetry import TelemetrySettingsProvider +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME if TYPE_CHECKING: from faststream.broker.message import StreamMessage @@ -24,7 +25,7 @@ def get_consume_attrs_from_message( SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), - "messaging.destination_publish.name": msg.raw_message["channel"], + MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message["channel"], } def get_consume_destination_name( diff --git a/tests/opentelemetry/basic.py b/tests/opentelemetry/basic.py index 1ec43b858f..d854562ebb 100644 --- a/tests/opentelemetry/basic.py +++ b/tests/opentelemetry/basic.py @@ -3,7 +3,7 @@ from unittest.mock import Mock import pytest -from dirty_equals import IsUUID +from dirty_equals import IsFloat, IsUUID from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics._internal.point import Metric from opentelemetry.sdk.metrics.export import InMemoryMetricReader @@ -15,6 +15,10 @@ from opentelemetry.trace import SpanKind from faststream.broker.core.usecase import BrokerUsecase +from faststream.opentelemetry.consts import ( + ERROR_TYPE, + MESSAGING_DESTINATION_PUBLISH_NAME, +) from faststream.opentelemetry.middleware import MessageAction as Action from faststream.opentelemetry.middleware import TelemetryMiddleware @@ -22,6 +26,7 @@ @pytest.mark.asyncio() class LocalTelemetryTestcase: messaging_system: str + include_messages_counters: bool broker_class: Type[BrokerUsecase] timeout: int = 3 subscriber_kwargs: ClassVar[Dict[str, Any]] = {} @@ -43,20 +48,19 @@ def get_spans(exporter: InMemorySpanExporter) -> List[Span]: @staticmethod def get_metrics( reader: InMemoryMetricReader, - ) -> Tuple[Metric, Metric, Metric, Metric]: + ) -> List[Metric]: """Get sorted metrics. Return order: - - faststream.consumer.active_requests - - faststream.consumer.duration - - faststream.consumer.message_size - - faststream.publisher.message_size + - messaging.process.duration + - messaging.process.messages + - messaging.publish.duration + - messaging.publish.messages """ metrics = reader.get_metrics_data() metrics = metrics.resource_metrics[0].scope_metrics[0].metrics metrics = sorted(metrics, key=lambda m: m.name) - requests, cons_mes_size, duration, pub_mes_size = metrics - return requests, duration, cons_mes_size, pub_mes_size + return cast(List[Metric], metrics) @pytest.fixture() def tracer_provider(self) -> TracerProvider: @@ -101,8 +105,8 @@ def assert_span( ] if span.kind == SpanKind.CONSUMER and action in (Action.CREATE, Action.PROCESS): - assert attrs["messaging.destination_publish.name"] == queue, attrs[ - "messaging.destination_publish.name" + assert attrs[MESSAGING_DESTINATION_PUBLISH_NAME] == queue, attrs[ + MESSAGING_DESTINATION_PUBLISH_NAME ] assert attrs[SpanAttr.MESSAGING_MESSAGE_ID] == IsUUID, attrs[ SpanAttr.MESSAGING_MESSAGE_ID @@ -124,6 +128,32 @@ def assert_span( if parent_span_id: assert span.parent.span_id == parent_span_id, span.parent.span_id + def assert_metrics( + self, + metrics: List[Metric], + count: int, + error_type: Optional[str] = None, + ) -> None: + if self.include_messages_counters: + assert len(metrics) == 4 + proc_dur, proc_msg, pub_dur, pub_msg = metrics + + assert proc_msg.data.data_points[0].value == count + assert pub_msg.data.data_points[0].value == count + + else: + assert len(metrics) == 2 + proc_dur, pub_dur = metrics + + if error_type: + assert proc_dur.data.data_points[0].attributes[ERROR_TYPE] == error_type + + assert proc_dur.data.data_points[0].count == 1 + assert proc_dur.data.data_points[0].sum == IsFloat + + assert pub_dur.data.data_points[0].count == 1 + assert pub_dur.data.data_points[0].sum == IsFloat + async def test_subscriber_create_publish_process_span( self, event: asyncio.Event, @@ -254,7 +284,7 @@ async def handler(m): assert event.is_set() mock.assert_called_once_with(msg) - async def test_subscriber_metrics( + async def test_metrics( self, event: asyncio.Event, queue: str, @@ -262,10 +292,6 @@ async def test_subscriber_metrics( meter_provider: MeterProvider, metric_reader: InMemoryMetricReader, ): - expected_requests_in_flight = 0 - expected_publishing_count = 1 - expected_consuming_count = 1 - mid = self.telemetry_middleware_class(meter_provider=meter_provider) broker = self.broker_class(middlewares=(mid,)) @@ -286,17 +312,46 @@ async def handler(m): await asyncio.wait(tasks, timeout=self.timeout) metrics = self.get_metrics(metric_reader) - requests, cons_mes_size, duration, pub_mes_size = metrics - assert requests.data.data_points[0].value == expected_requests_in_flight + self.assert_metrics(metrics, 1) + + assert event.is_set() + mock.assert_called_once_with(msg) + + async def test_error_metrics( + self, + event: asyncio.Event, + queue: str, + mock: Mock, + meter_provider: MeterProvider, + metric_reader: InMemoryMetricReader, + ): + mid = self.telemetry_middleware_class(meter_provider=meter_provider) + broker = self.broker_class(middlewares=(mid,)) + expected_value_type = "ValueError" + + @broker.subscriber(queue, **self.subscriber_kwargs) + async def handler(m): + try: + raise ValueError + finally: + mock(m) + event.set() + + broker = self.patch_broker(broker) + msg = "start" - assert cons_mes_size.data.data_points[0].count == expected_consuming_count - assert cons_mes_size.data.data_points[0].sum == len(msg) + async with broker: + await broker.start() + tasks = ( + asyncio.create_task(broker.publish(msg, queue)), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) - assert duration.data.data_points[0].count == expected_consuming_count + metrics = self.get_metrics(metric_reader) - assert pub_mes_size.data.data_points[0].count == expected_publishing_count - assert pub_mes_size.data.data_points[0].sum == len(msg) + self.assert_metrics(metrics, 1, expected_value_type) assert event.is_set() mock.assert_called_once_with(msg) diff --git a/tests/opentelemetry/confluent/test_confluent.py b/tests/opentelemetry/confluent/test_confluent.py index 5ef6597c22..901496814d 100644 --- a/tests/opentelemetry/confluent/test_confluent.py +++ b/tests/opentelemetry/confluent/test_confluent.py @@ -8,6 +8,7 @@ from faststream.confluent import KafkaBroker from faststream.confluent.opentelemetry import KafkaTelemetryMiddleware +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME from faststream.opentelemetry.middleware import MessageAction as Action from tests.brokers.confluent.test_consume import TestConsume from tests.brokers.confluent.test_publish import TestPublish @@ -18,6 +19,7 @@ @pytest.mark.confluent() class TestTelemetry(LocalTelemetryTestcase): messaging_system = "kafka" + include_messages_counters = True timeout: int = 10 subscriber_kwargs: ClassVar[Dict[str, Any]] = {"auto_offset_reset": "earliest"} broker_class = KafkaBroker @@ -41,7 +43,7 @@ def assert_span( assert attrs[SpanAttr.MESSAGING_DESTINATION_NAME] == queue if span.kind == SpanKind.CONSUMER and action in (Action.CREATE, Action.PROCESS): - assert attrs["messaging.destination_publish.name"] == queue + assert attrs[MESSAGING_DESTINATION_PUBLISH_NAME] == queue assert attrs[SpanAttr.MESSAGING_MESSAGE_ID] == IsStr(regex=r"0-.+") assert attrs[SpanAttr.MESSAGING_KAFKA_DESTINATION_PARTITION] == 0 assert attrs[SpanAttr.MESSAGING_KAFKA_MESSAGE_OFFSET] == 0 diff --git a/tests/opentelemetry/kafka/test_kafka.py b/tests/opentelemetry/kafka/test_kafka.py index 120c31d7c7..ff04b0ffce 100644 --- a/tests/opentelemetry/kafka/test_kafka.py +++ b/tests/opentelemetry/kafka/test_kafka.py @@ -8,6 +8,7 @@ from faststream.kafka import KafkaBroker from faststream.kafka.opentelemetry import KafkaTelemetryMiddleware +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME from faststream.opentelemetry.middleware import MessageAction as Action from tests.brokers.kafka.test_consume import TestConsume from tests.brokers.kafka.test_publish import TestPublish @@ -18,6 +19,7 @@ @pytest.mark.kafka() class TestTelemetry(LocalTelemetryTestcase): messaging_system = "kafka" + include_messages_counters = True broker_class = KafkaBroker telemetry_middleware_class = KafkaTelemetryMiddleware @@ -39,7 +41,7 @@ def assert_span( assert attrs[SpanAttr.MESSAGING_DESTINATION_NAME] == queue if span.kind == SpanKind.CONSUMER and action in (Action.CREATE, Action.PROCESS): - assert attrs["messaging.destination_publish.name"] == queue + assert attrs[MESSAGING_DESTINATION_PUBLISH_NAME] == queue assert attrs[SpanAttr.MESSAGING_MESSAGE_ID] == IsStr(regex=r"0-.+") assert attrs[SpanAttr.MESSAGING_KAFKA_DESTINATION_PARTITION] == 0 assert attrs[SpanAttr.MESSAGING_KAFKA_MESSAGE_OFFSET] == 0 diff --git a/tests/opentelemetry/nats/test_nats.py b/tests/opentelemetry/nats/test_nats.py index ededecbf6b..4a76cdb300 100644 --- a/tests/opentelemetry/nats/test_nats.py +++ b/tests/opentelemetry/nats/test_nats.py @@ -16,6 +16,7 @@ def stream(queue): @pytest.mark.nats() class TestTelemetry(LocalTelemetryTestcase): messaging_system = "nats" + include_messages_counters = True broker_class = NatsBroker telemetry_middleware_class = NatsTelemetryMiddleware diff --git a/tests/opentelemetry/rabbit/test_rabbit.py b/tests/opentelemetry/rabbit/test_rabbit.py index f145fc49eb..120ac3cd1c 100644 --- a/tests/opentelemetry/rabbit/test_rabbit.py +++ b/tests/opentelemetry/rabbit/test_rabbit.py @@ -6,6 +6,7 @@ from opentelemetry.semconv.trace import SpanAttributes as SpanAttr from opentelemetry.trace import SpanKind +from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME from faststream.opentelemetry.middleware import MessageAction as Action from faststream.rabbit import RabbitBroker, RabbitExchange from faststream.rabbit.opentelemetry import RabbitTelemetryMiddleware @@ -23,6 +24,7 @@ def exchange(queue): @pytest.mark.rabbit() class TestTelemetry(LocalTelemetryTestcase): messaging_system = "rabbitmq" + include_messages_counters = False broker_class = RabbitBroker telemetry_middleware_class = RabbitTelemetryMiddleware @@ -48,7 +50,7 @@ def assert_span( assert attrs[SpanAttr.MESSAGING_DESTINATION_NAME] == "" if span.kind == SpanKind.CONSUMER and action in (Action.CREATE, Action.PROCESS): - assert attrs["messaging.destination_publish.name"] == "" + assert attrs[MESSAGING_DESTINATION_PUBLISH_NAME] == "" assert attrs["messaging.rabbitmq.message.delivery_tag"] == IsInt assert attrs[SpanAttr.MESSAGING_MESSAGE_ID] == IsUUID diff --git a/tests/opentelemetry/redis/test_redis.py b/tests/opentelemetry/redis/test_redis.py index 30a9da84aa..26ae4238a8 100644 --- a/tests/opentelemetry/redis/test_redis.py +++ b/tests/opentelemetry/redis/test_redis.py @@ -15,6 +15,7 @@ @pytest.mark.redis() class TestTelemetry(LocalTelemetryTestcase): messaging_system = "redis" + include_messages_counters = False broker_class = RedisBroker telemetry_middleware_class = RedisTelemetryMiddleware