Skip to content

Commit

Permalink
Merge branch 'opentelemetry' of github.com:draincoder/faststream into…
Browse files Browse the repository at this point in the history
… opentelemetry
  • Loading branch information
Lancetnik committed May 11, 2024
2 parents 724bb84 + 223c28b commit 0eecd2f
Show file tree
Hide file tree
Showing 19 changed files with 223 additions and 86 deletions.
1 change: 0 additions & 1 deletion faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ def __init__(
apply_types=apply_types,
validate=validate,
)

self.client_id = client_id
self._producer = None

Expand Down
1 change: 1 addition & 0 deletions faststream/confluent/opentelemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ def __init__(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
meter=meter,
include_messages_counters=True,
)
5 changes: 3 additions & 2 deletions faststream/confluent/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from faststream.broker.types import MsgType
from faststream.opentelemetry import TelemetrySettingsProvider
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME

if TYPE_CHECKING:
from confluent_kafka import Message
Expand Down Expand Up @@ -57,7 +58,7 @@ def get_consume_attrs_from_message(
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition(),
SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset(),
"messaging.destination_publish.name": msg.raw_message.topic(),
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic(),
}

if (key := msg.raw_message.key()) is not None:
Expand Down Expand Up @@ -89,7 +90,7 @@ def get_consume_attrs_from_message(
bytearray().join(cast(Sequence[bytes], msg.body))
),
SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition(),
"messaging.destination_publish.name": raw_message.topic(),
MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic(),
}

return attrs
Expand Down
1 change: 1 addition & 0 deletions faststream/kafka/opentelemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ def __init__(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
meter=meter,
include_messages_counters=True,
)
5 changes: 3 additions & 2 deletions faststream/kafka/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from faststream.broker.types import MsgType
from faststream.opentelemetry import TelemetrySettingsProvider
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME

if TYPE_CHECKING:
from aiokafka import ConsumerRecord
Expand Down Expand Up @@ -57,7 +58,7 @@ def get_consume_attrs_from_message(
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition,
SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset,
"messaging.destination_publish.name": msg.raw_message.topic,
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic,
}

if msg.raw_message.key is not None:
Expand Down Expand Up @@ -90,7 +91,7 @@ def get_consume_attrs_from_message(
),
SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message),
SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition,
"messaging.destination_publish.name": raw_message.topic,
MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic,
}

return attrs
Expand Down
1 change: 1 addition & 0 deletions faststream/nats/opentelemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ def __init__(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
meter=meter,
include_messages_counters=True,
)
5 changes: 3 additions & 2 deletions faststream/nats/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from faststream.__about__ import SERVICE_NAME
from faststream.broker.types import MsgType
from faststream.opentelemetry import TelemetrySettingsProvider
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME

if TYPE_CHECKING:
from nats.aio.msg import Msg
Expand Down Expand Up @@ -47,7 +48,7 @@ def get_consume_attrs_from_message(
SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
"messaging.destination_publish.name": msg.raw_message.subject,
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.subject,
}

@staticmethod
Expand All @@ -70,7 +71,7 @@ def get_consume_attrs_from_message(
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message),
"messaging.destination_publish.name": msg.raw_message[0].subject,
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message[0].subject,
}

@staticmethod
Expand Down
4 changes: 4 additions & 0 deletions faststream/opentelemetry/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ class MessageAction:
PUBLISH = "publish"
PROCESS = "process"
RECEIVE = "receive"


ERROR_TYPE = "error.type"
MESSAGING_DESTINATION_PUBLISH_NAME = "messaging.destination_publish.name"
165 changes: 113 additions & 52 deletions faststream/opentelemetry/middleware.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import time
from copy import copy
from typing import TYPE_CHECKING, Any, Callable, Optional, Type

from opentelemetry import context, metrics, propagate, trace
from opentelemetry.semconv.trace import SpanAttributes

from faststream import BaseMiddleware
from faststream.opentelemetry.consts import MessageAction
from faststream.opentelemetry.consts import (
ERROR_TYPE,
MESSAGING_DESTINATION_PUBLISH_NAME,
MessageAction,
)
from faststream.opentelemetry.provider import TelemetrySettingsProvider

if TYPE_CHECKING:
Expand All @@ -16,7 +21,7 @@
from opentelemetry.trace import Span, Tracer, TracerProvider

from faststream.broker.message import StreamMessage
from faststream.types import AsyncFunc, AsyncFuncAny
from faststream.types import AnyDict, AsyncFunc, AsyncFuncAny


_OTEL_SCHEMA = "https://opentelemetry.io/schemas/1.11.0"
Expand All @@ -28,33 +33,68 @@ def _create_span_name(destination: str, action: str) -> str:

class _MetricsContainer:
__slots__ = (
"active_requests_counter",
"duration_histogram",
"consumer_message_size_histogram",
"publisher_message_size_histogram",
"include_messages_counters",
"publish_duration",
"publish_counter",
"process_duration",
"process_counter",
)

def __init__(self, meter: "Meter") -> None:
self.active_requests_counter = meter.create_up_down_counter(
name="faststream.consumer.active_requests",
unit="requests",
description="Measures the number of concurrent messages that are currently in-flight.",
def __init__(self, meter: "Meter", include_messages_counters: bool) -> None:
self.include_messages_counters = include_messages_counters

self.publish_duration = meter.create_histogram(
name="messaging.publish.duration",
unit="s",
description="Measures the duration of publish operation.",
)
self.duration_histogram = meter.create_histogram(
name="faststream.consumer.duration",
self.process_duration = meter.create_histogram(
name="messaging.process.duration",
unit="s",
description="Measures the duration of message processing.",
description="Measures the duration of process operation.",
)
self.consumer_message_size_histogram = meter.create_histogram(
name="faststream.consumer.message_size",
unit="By",
description="Measures the size of consumed messages.",

if include_messages_counters:
self.process_counter = meter.create_counter(
name="messaging.process.messages",
unit="message",
description="Measures the number of processed messages.",
)
self.publish_counter = meter.create_counter(
name="messaging.publish.messages",
unit="message",
description="Measures the number of published messages.",
)

def observe_publish(
self, attrs: "AnyDict", duration: float, msg_count: int
) -> None:
self.publish_duration.record(
amount=duration,
attributes=attrs,
)
self.publisher_message_size_histogram = meter.create_histogram(
name="faststream.publisher.message_size",
unit="By",
description="Measures the size of published messages.",
if self.include_messages_counters:
counter_attrs = copy(attrs)
counter_attrs.pop(ERROR_TYPE, None)
self.publish_counter.add(
amount=msg_count,
attributes=counter_attrs,
)

def observe_consume(
self, attrs: "AnyDict", duration: float, msg_count: int
) -> None:
self.process_duration.record(
amount=duration,
attributes=attrs,
)
if self.include_messages_counters:
counter_attrs = copy(attrs)
counter_attrs.pop(ERROR_TYPE, None)
self.process_counter.add(
amount=msg_count,
attributes=counter_attrs,
)


class BaseTelemetryMiddleware(BaseMiddleware):
Expand Down Expand Up @@ -83,12 +123,16 @@ async def publish_scope(
) -> Any:
provider = self.__settings_provider

attributes = provider.get_publish_attrs_from_kwargs(kwargs)

headers = kwargs.pop("headers", {}) or {}
current_context = context.get_current()
destination_name = provider.get_publish_destination_name(kwargs)

headers = kwargs.pop("headers", {}) or {}
trace_attributes = provider.get_publish_attrs_from_kwargs(kwargs)
metrics_attributes = {
SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
SpanAttributes.MESSAGING_DESTINATION_NAME: destination_name,
}

if self._current_span and self._current_span.is_recording():
current_context = trace.set_span_in_context(
self._current_span, current_context
Expand All @@ -99,27 +143,36 @@ async def publish_scope(
create_span = self._tracer.start_span(
name=_create_span_name(destination_name, MessageAction.CREATE),
kind=trace.SpanKind.PRODUCER,
attributes=attributes,
attributes=trace_attributes,
)
current_context = trace.set_span_in_context(create_span)
propagate.inject(headers, context=current_context)
create_span.end()

with self._tracer.start_as_current_span(
name=_create_span_name(destination_name, MessageAction.PUBLISH),
kind=trace.SpanKind.PRODUCER,
attributes=attributes,
context=current_context,
) as span:
span.set_attribute(
SpanAttributes.MESSAGING_OPERATION, MessageAction.PUBLISH
)
result = await call_next(msg, *args, headers=headers, **kwargs)
start_time = time.perf_counter()

self._metrics.publisher_message_size_histogram.record(
len(str(msg) or ""),
attributes,
)
try:
with self._tracer.start_as_current_span(
name=_create_span_name(destination_name, MessageAction.PUBLISH),
kind=trace.SpanKind.PRODUCER,
attributes=trace_attributes,
context=current_context,
) as span:
span.set_attribute(
SpanAttributes.MESSAGING_OPERATION, MessageAction.PUBLISH
)
result = await call_next(msg, *args, headers=headers, **kwargs)

except Exception as e:
metrics_attributes[ERROR_TYPE] = type(e).__name__
raise

finally:
duration = time.perf_counter() - start_time
msg_count = trace_attributes.get(
SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1
)
self._metrics.observe_publish(metrics_attributes, duration, msg_count)

return result

Expand All @@ -130,30 +183,33 @@ async def consume_scope(
) -> Any:
provider = self.__settings_provider

start_time = time.perf_counter()
current_context = propagate.extract(msg.headers)
destination_name = provider.get_consume_destination_name(msg)
attributes = provider.get_consume_attrs_from_message(msg)

trace_attributes = provider.get_consume_attrs_from_message(msg)
metrics_attributes = {
SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
MESSAGING_DESTINATION_PUBLISH_NAME: destination_name,
}

if not len(current_context):
create_span = self._tracer.start_span(
name=_create_span_name(destination_name, MessageAction.CREATE),
kind=trace.SpanKind.CONSUMER,
attributes=attributes,
attributes=trace_attributes,
)
current_context = trace.set_span_in_context(create_span)
create_span.end()

self._origin_context = current_context
self._metrics.active_requests_counter.add(1, attributes)
self._metrics.consumer_message_size_histogram.record(len(msg.body), attributes)
start_time = time.perf_counter()

try:
with self._tracer.start_as_current_span(
name=_create_span_name(destination_name, MessageAction.PROCESS),
kind=trace.SpanKind.CONSUMER,
context=current_context,
attributes=attributes,
attributes=trace_attributes,
end_on_exit=False,
) as span:
span.set_attribute(
Expand All @@ -165,12 +221,16 @@ async def consume_scope(
result = await call_next(msg)
context.detach(token)

total_time = time.perf_counter() - start_time
self._metrics.duration_histogram.record(
amount=total_time, attributes=attributes
)
except Exception as e:
metrics_attributes[ERROR_TYPE] = type(e).__name__
raise

finally:
self._metrics.active_requests_counter.add(-1, attributes)
duration = time.perf_counter() - start_time
msg_count = trace_attributes.get(
SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1
)
self._metrics.observe_consume(metrics_attributes, duration, msg_count)

return result

Expand Down Expand Up @@ -201,10 +261,11 @@ def __init__(
tracer_provider: Optional["TracerProvider"] = None,
meter_provider: Optional["MeterProvider"] = None,
meter: Optional["Meter"] = None,
include_messages_counters: bool = False,
) -> None:
self._tracer = _get_tracer(tracer_provider)
self._meter = _get_meter(meter_provider, meter)
self._metrics = _MetricsContainer(self._meter)
self._metrics = _MetricsContainer(self._meter, include_messages_counters)
self._settings_provider_factory = settings_provider_factory

def __call__(self, msg: Optional[Any]) -> BaseMiddleware:
Expand Down
1 change: 1 addition & 0 deletions faststream/rabbit/opentelemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ def __init__(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
meter=meter,
include_messages_counters=False,
)
3 changes: 2 additions & 1 deletion faststream/rabbit/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from opentelemetry.semconv.trace import SpanAttributes

from faststream.opentelemetry import TelemetrySettingsProvider
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME

if TYPE_CHECKING:
from aio_pika import IncomingMessage
Expand All @@ -28,7 +29,7 @@ def get_consume_attrs_from_message(
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: msg.raw_message.routing_key,
"messaging.rabbitmq.message.delivery_tag": msg.raw_message.delivery_tag,
"messaging.destination_publish.name": msg.raw_message.exchange,
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.exchange,
}

@staticmethod
Expand Down
Loading

0 comments on commit 0eecd2f

Please sign in to comment.