diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index f5846592cc..9c3931f316 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -177,6 +177,7 @@ search: - [TestApp](public_api/faststream/nats/TestApp.md) - [TestNatsBroker](public_api/faststream/nats/TestNatsBroker.md) - opentelemetry + - [Baggage](public_api/faststream/opentelemetry/Baggage.md) - [TelemetryMiddleware](public_api/faststream/opentelemetry/TelemetryMiddleware.md) - [TelemetrySettingsProvider](public_api/faststream/opentelemetry/TelemetrySettingsProvider.md) - rabbit @@ -795,8 +796,11 @@ search: - [TestNatsBroker](api/faststream/nats/testing/TestNatsBroker.md) - [build_message](api/faststream/nats/testing/build_message.md) - opentelemetry + - [Baggage](api/faststream/opentelemetry/Baggage.md) - [TelemetryMiddleware](api/faststream/opentelemetry/TelemetryMiddleware.md) - [TelemetrySettingsProvider](api/faststream/opentelemetry/TelemetrySettingsProvider.md) + - baggage + - [Baggage](api/faststream/opentelemetry/baggage/Baggage.md) - consts - [MessageAction](api/faststream/opentelemetry/consts/MessageAction.md) - middleware diff --git a/docs/docs/en/api/faststream/opentelemetry/Baggage.md b/docs/docs/en/api/faststream/opentelemetry/Baggage.md new file mode 100644 index 0000000000..a61cb56d97 --- /dev/null +++ b/docs/docs/en/api/faststream/opentelemetry/Baggage.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.opentelemetry.Baggage diff --git a/docs/docs/en/api/faststream/opentelemetry/baggage/Baggage.md b/docs/docs/en/api/faststream/opentelemetry/baggage/Baggage.md new file mode 100644 index 0000000000..c1c6e4efec --- /dev/null +++ b/docs/docs/en/api/faststream/opentelemetry/baggage/Baggage.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.opentelemetry.baggage.Baggage diff --git a/docs/docs/en/getting-started/opentelemetry/index.md b/docs/docs/en/getting-started/opentelemetry/index.md index 44e7fe9013..051571ac36 100644 --- a/docs/docs/en/getting-started/opentelemetry/index.md +++ b/docs/docs/en/getting-started/opentelemetry/index.md @@ -41,7 +41,7 @@ async def third_handler(msg: str): await asyncio.sleep(0.075) ``` -## FastStream Tracing +### FastStream Tracing **OpenTelemetry** tracing support in **FastStream** adheres to the [semantic conventions for messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/){.external-link target="_blank"}. @@ -98,7 +98,22 @@ To visualize traces, you can send them to a backend system that supports distrib * **Zipkin**: Similar to **Jaeger**, you can run **Zipkin** using **Docker** and configure the **OpenTelemetry** middleware accordingly. For more details, see the [Zipkin documentation](https://zipkin.io/){.external-link target="_blank"}. * **Grafana Tempo**: **Grafana Tempo** is a high-scale distributed tracing backend. You can configure **OpenTelemetry** to export traces to **Tempo**, which can then be visualized using **Grafana**. For more details, see the [Grafana Tempo documentation](https://grafana.com/docs/tempo/latest/){.external-link target="_blank"}. -## Example +### Context propagation + +Quite often it is necessary to communicate with **other** services and to propagate the trace context, you can use the **CurrentSpan** object and follow the example: + +```python linenums="1" hl_lines="5-7" +from opentelemetry import trace, propagate +from faststream.opentelemetry import CurrentSpan + +@broker.subscriber("symbol") +async def handler(msg: str, span: CurrentSpan) -> None: + headers = {} + propagate.inject(headers, context=trace.set_span_in_context(span)) + price = await exchange_client.get_symbol_price(msg, headers=headers) +``` + +### Full example To see how to set up, visualize, and configure tracing for **FastStream** services, go to [example](https://github.com/draincoder/faststream-monitoring){.external-link target="_blank"}. @@ -112,3 +127,60 @@ An example includes: ![HTML-page](../../../assets/img/distributed-trace.png){ loading=lazy } `Visualized via Grafana and Tempo` + +## Baggage + +[OpenTelemetry Baggage](https://opentelemetry.io/docs/concepts/signals/baggage/){.external-link target="_blank"} is a context propagation mechanism that allows you to pass custom metadata or key-value pairs across service boundaries, providing additional context for distributed tracing and observability. + +### FastStream Baggage + +**FastStream** provides a convenient abstraction over baggage that allows you to: + +* **Initialize** the baggage +* **Propagate** baggage through headers +* **Modify** the baggage +* **Stop** propagating baggage + +### Example + +To initialize the baggage and start distributing it, follow this example: + +```python linenums="1" hl_lines="3-4" +from faststream.opentelemetry import Baggage + +headers = Baggage({"hello": "world"}).to_headers({"header-type": "custom"}) +await broker.publish("hello", "first", headers=headers) +``` + +All interaction with baggage at the **consumption level** occurs through the **CurrentBaggage** object, which is automatically substituted from the context: + +```python linenums="1" hl_lines="6-10 17-18 24" +from faststream.opentelemetry import CurrentBaggage + +@broker.subscriber("first") +@broker.publisher("second") +async def response_handler_first(msg: str, baggage: CurrentBaggage): + print(baggage.get_all()) # {'hello': 'world'} + baggage.remove("hello") + baggage.set("user-id", 1) + baggage.set("request-id", "UUID") + print(baggage.get("user-id")) # 1 + return msg + + +@broker.subscriber("second") +@broker.publisher("third") +async def response_handler_second(msg: str, baggage: CurrentBaggage): + print(baggage.get_all()) # {'user-id': '1', 'request-id': 'UUID'} + baggage.clear() + return msg + + +@broker.subscriber("third") +async def response_handler_third(msg: str, baggage: CurrentBaggage): + print(baggage.get_all()) # {} +``` + +!!! note + If you consume messages in **batches**, then the baggage from each message will be merged into the **common baggage** available + through the `get_all` method, but you can still get a list of all the baggage from the batch using the `get_all_batch` method. diff --git a/faststream/opentelemetry/__init__.py b/faststream/opentelemetry/__init__.py index 401c1be077..f6c9fe8f57 100644 --- a/faststream/opentelemetry/__init__.py +++ b/faststream/opentelemetry/__init__.py @@ -1,7 +1,12 @@ +from faststream.opentelemetry.annotations import CurrentBaggage, CurrentSpan +from faststream.opentelemetry.baggage import Baggage from faststream.opentelemetry.middleware import TelemetryMiddleware from faststream.opentelemetry.provider import TelemetrySettingsProvider __all__ = ( + "Baggage", + "CurrentBaggage", + "CurrentSpan", "TelemetryMiddleware", "TelemetrySettingsProvider", ) diff --git a/faststream/opentelemetry/annotations.py b/faststream/opentelemetry/annotations.py new file mode 100644 index 0000000000..cdf2378cc3 --- /dev/null +++ b/faststream/opentelemetry/annotations.py @@ -0,0 +1,8 @@ +from opentelemetry.trace import Span +from typing_extensions import Annotated + +from faststream import Context +from faststream.opentelemetry.baggage import Baggage + +CurrentSpan = Annotated[Span, Context("span")] +CurrentBaggage = Annotated[Baggage, Context("baggage")] diff --git a/faststream/opentelemetry/baggage.py b/faststream/opentelemetry/baggage.py new file mode 100644 index 0000000000..420b0c9081 --- /dev/null +++ b/faststream/opentelemetry/baggage.py @@ -0,0 +1,77 @@ +from typing import TYPE_CHECKING, Any, List, Optional, cast + +from opentelemetry import baggage, context +from opentelemetry.baggage.propagation import W3CBaggagePropagator +from typing_extensions import Self + +if TYPE_CHECKING: + from faststream.broker.message import StreamMessage + from faststream.types import AnyDict + +_BAGGAGE_PROPAGATOR = W3CBaggagePropagator() + + +class Baggage: + __slots__ = ("_baggage", "_batch_baggage") + + def __init__( + self, payload: "AnyDict", batch_payload: Optional[List["AnyDict"]] = None + ) -> None: + self._baggage = dict(payload) + self._batch_baggage = [dict(b) for b in batch_payload] if batch_payload else [] + + def get_all(self) -> "AnyDict": + """Get a copy of the current baggage.""" + return self._baggage.copy() + + def get_all_batch(self) -> List["AnyDict"]: + """Get a copy of all batch baggage if exists.""" + return self._batch_baggage.copy() + + def get(self, key: str) -> Optional[Any]: + """Get a value from the baggage by key.""" + return self._baggage.get(key) + + def remove(self, key: str) -> None: + """Remove a baggage item by key.""" + self._baggage.pop(key, None) + + def set(self, key: str, value: Any) -> None: + """Set a key-value pair in the baggage.""" + self._baggage[key] = value + + def clear(self) -> None: + """Clear the current baggage.""" + self._baggage.clear() + + def to_headers(self, headers: Optional["AnyDict"] = None) -> "AnyDict": + """Convert baggage items to headers format for propagation.""" + current_context = context.get_current() + if headers is None: + headers = {} + + for k, v in self._baggage.items(): + current_context = baggage.set_baggage(k, v, context=current_context) + + _BAGGAGE_PROPAGATOR.inject(headers, current_context) + return headers + + @classmethod + def from_msg(cls, msg: "StreamMessage[Any]") -> Self: + """Create a Baggage instance from a StreamMessage.""" + if len(msg.batch_headers) <= 1: + payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(msg.headers)) + return cls(cast("AnyDict", payload)) + + cumulative_baggage: AnyDict = {} + batch_baggage: List[AnyDict] = [] + + for headers in msg.batch_headers: + payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(headers)) + cumulative_baggage.update(payload) + batch_baggage.append(cast("AnyDict", payload)) + + return cls(cumulative_baggage, batch_baggage) + + def __repr__(self) -> str: + return self._baggage.__repr__() diff --git a/faststream/opentelemetry/middleware.py b/faststream/opentelemetry/middleware.py index 695cef979d..97cb7ddd14 100644 --- a/faststream/opentelemetry/middleware.py +++ b/faststream/opentelemetry/middleware.py @@ -1,7 +1,7 @@ import time from collections import defaultdict from copy import copy -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, cast +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, cast from opentelemetry import baggage, context, metrics, trace from opentelemetry.baggage.propagation import W3CBaggagePropagator @@ -11,6 +11,8 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from faststream import BaseMiddleware +from faststream import context as fs_context +from faststream.opentelemetry.baggage import Baggage from faststream.opentelemetry.consts import ( ERROR_TYPE, MESSAGING_DESTINATION_PUBLISH_NAME, @@ -21,6 +23,7 @@ from faststream.opentelemetry.provider import TelemetrySettingsProvider if TYPE_CHECKING: + from contextvars import Token from types import TracebackType from opentelemetry.metrics import Meter, MeterProvider @@ -118,6 +121,7 @@ def __init__( self._metrics = metrics_container self._current_span: Optional[Span] = None self._origin_context: Optional[Context] = None + self._scope_tokens: List[Tuple[str, Token[Any]]] = [] self.__settings_provider = settings_provider_factory(msg) async def publish_scope( @@ -134,6 +138,10 @@ async def publish_scope( current_context = context.get_current() destination_name = provider.get_publish_destination_name(kwargs) + current_baggage: Optional[Baggage] = fs_context.get_local("baggage") + if current_baggage: + headers.update(current_baggage.to_headers()) + trace_attributes = provider.get_publish_attrs_from_kwargs(kwargs) metrics_attributes = { SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system, @@ -143,6 +151,7 @@ async def publish_scope( # NOTE: if batch with single message? if (msg_count := len((msg, *args))) > 1: trace_attributes[SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT] = msg_count + current_context = _BAGGAGE_PROPAGATOR.extract(headers, current_context) _BAGGAGE_PROPAGATOR.inject( headers, baggage.set_baggage(WITH_BATCH, True, context=current_context) ) @@ -185,6 +194,9 @@ async def publish_scope( duration = time.perf_counter() - start_time self._metrics.observe_publish(metrics_attributes, duration, msg_count) + for key, token in self._scope_tokens: + fs_context.reset_local(key, token) + return result async def consume_scope( @@ -234,6 +246,12 @@ async def consume_scope( SpanAttributes.MESSAGING_OPERATION, MessageAction.PROCESS ) self._current_span = span + + self._scope_tokens.append(("span", fs_context.set_local("span", span))) + self._scope_tokens.append( + ("baggage", fs_context.set_local("baggage", Baggage.from_msg(msg))) + ) + new_context = trace.set_span_in_context(span, current_context) token = context.attach(new_context) result = await call_next(msg) diff --git a/tests/opentelemetry/basic.py b/tests/opentelemetry/basic.py index a5bf228a18..0c5be63a1c 100644 --- a/tests/opentelemetry/basic.py +++ b/tests/opentelemetry/basic.py @@ -12,9 +12,10 @@ from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.semconv.trace import SpanAttributes as SpanAttr -from opentelemetry.trace import SpanKind +from opentelemetry.trace import SpanKind, get_current_span from faststream.broker.core.usecase import BrokerUsecase +from faststream.opentelemetry import Baggage, CurrentBaggage, CurrentSpan from faststream.opentelemetry.consts import ( ERROR_TYPE, MESSAGING_DESTINATION_PUBLISH_NAME, @@ -366,3 +367,171 @@ async def handler(m): assert event.is_set() mock.assert_called_once_with(msg) + + async def test_span_in_context( + self, + event: asyncio.Event, + queue: str, + mock: Mock, + tracer_provider: TracerProvider, + trace_exporter: InMemorySpanExporter, + ): + mid = self.telemetry_middleware_class(tracer_provider=tracer_provider) + broker = self.broker_class(middlewares=(mid,)) + + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) + async def handler(m, span: CurrentSpan): + assert span is get_current_span() + mock(m) + event.set() + + broker = self.patch_broker(broker) + msg = "start" + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task(broker.publish(msg, queue)), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + mock.assert_called_once_with(msg) + + async def test_get_baggage( + self, + event: asyncio.Event, + queue: str, + mock: Mock, + ): + mid = self.telemetry_middleware_class() + broker = self.broker_class(middlewares=(mid,)) + expected_baggage = {"foo": "bar"} + + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) + async def handler1(m, baggage: CurrentBaggage): + assert baggage.get("foo") == "bar" + assert baggage.get_all() == expected_baggage + assert baggage.get_all_batch() == [] + assert baggage.__repr__() == expected_baggage.__repr__() + mock(m) + event.set() + + broker = self.patch_broker(broker) + msg = "start" + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task( + broker.publish( + msg, queue, headers=Baggage({"foo": "bar"}).to_headers() + ) + ), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + mock.assert_called_once_with(msg) + + async def test_clear_baggage( + self, + event: asyncio.Event, + queue: str, + mock: Mock, + ): + mid = self.telemetry_middleware_class() + broker = self.broker_class(middlewares=(mid,)) + + first_queue = queue + "1" + second_queue = queue + "2" + + args, kwargs = self.get_subscriber_params(first_queue) + + @broker.subscriber(*args, **kwargs) + @broker.publisher(second_queue) + async def handler1(m, baggage: CurrentBaggage): + baggage.clear() + assert baggage.get_all() == {} + return m + + args2, kwargs2 = self.get_subscriber_params(second_queue) + + @broker.subscriber(*args2, **kwargs2) + async def handler2(m, baggage: CurrentBaggage): + assert baggage.get_all() == {} + mock(m) + event.set() + + broker = self.patch_broker(broker) + msg = "start" + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task( + broker.publish( + msg, first_queue, headers=Baggage({"foo": "bar"}).to_headers() + ) + ), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + mock.assert_called_once_with(msg) + + async def test_modify_baggage( + self, + event: asyncio.Event, + queue: str, + mock: Mock, + ): + mid = self.telemetry_middleware_class() + broker = self.broker_class(middlewares=(mid,)) + expected_baggage = {"baz": "bar", "bar": "baz"} + + first_queue = queue + "1" + second_queue = queue + "2" + + args, kwargs = self.get_subscriber_params(first_queue) + + @broker.subscriber(*args, **kwargs) + @broker.publisher(second_queue) + async def handler1(m, baggage: CurrentBaggage): + baggage.set("bar", "baz") + baggage.set("baz", "bar") + baggage.remove("foo") + return m + + args2, kwargs2 = self.get_subscriber_params(second_queue) + + @broker.subscriber(*args2, **kwargs2) + async def handler2(m, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + mock(m) + event.set() + + broker = self.patch_broker(broker) + msg = "start" + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task( + broker.publish( + msg, first_queue, headers=Baggage({"foo": "bar"}).to_headers() + ) + ), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + mock.assert_called_once_with(msg) diff --git a/tests/opentelemetry/confluent/test_confluent.py b/tests/opentelemetry/confluent/test_confluent.py index 87bf44ffcc..9eb52d9742 100644 --- a/tests/opentelemetry/confluent/test_confluent.py +++ b/tests/opentelemetry/confluent/test_confluent.py @@ -13,6 +13,7 @@ from faststream.confluent import KafkaBroker from faststream.confluent.opentelemetry import KafkaTelemetryMiddleware +from faststream.opentelemetry import Baggage, CurrentBaggage from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME from faststream.opentelemetry.middleware import MessageAction as Action from tests.brokers.confluent.basic import ConfluentTestcaseConfig @@ -77,11 +78,17 @@ async def test_batch( expected_msg_count = 3 expected_link_count = 1 expected_link_attrs = {"messaging.batch.message_count": 3} + expected_baggage = {"with_batch": "True", "foo": "bar"} + expected_baggage_batch = [ + {"with_batch": "True", "foo": "bar"} + ] * expected_msg_count args, kwargs = self.get_subscriber_params(queue, batch=True) @broker.subscriber(*args, **kwargs) - async def handler(m): + async def handler(m, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert baggage.get_all_batch() == expected_baggage_batch mock(m) event.set() @@ -90,7 +97,15 @@ async def handler(m): async with broker: await broker.start() tasks = ( - asyncio.create_task(broker.publish_batch(1, "hi", 3, topic=queue)), + asyncio.create_task( + broker.publish_batch( + 1, + "hi", + 3, + topic=queue, + headers=Baggage({"foo": "bar"}).to_headers(), + ) + ), asyncio.create_task(event.wait()), ) await asyncio.wait(tasks, timeout=self.timeout) @@ -131,18 +146,23 @@ async def test_batch_publish_with_single_consume( expected_link_count = 1 expected_span_count = 8 expected_pub_batch_count = 1 + expected_baggage = {"with_batch": "True", "foo": "bar"} args, kwargs = self.get_subscriber_params(queue) @broker.subscriber(*args, **kwargs) - async def handler(msg): + async def handler(msg, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert baggage.get_all_batch() == [] await msgs_queue.put(msg) broker = self.patch_broker(broker) async with broker: await broker.start() - await broker.publish_batch(1, "hi", 3, topic=queue) + await broker.publish_batch( + 1, "hi", 3, topic=queue, headers=Baggage({"foo": "bar"}).to_headers() + ) result, _ = await asyncio.wait( ( asyncio.create_task(msgs_queue.get()), @@ -191,11 +211,14 @@ async def test_single_publish_with_batch_consume( expected_link_count = 2 expected_span_count = 6 expected_process_batch_count = 1 + expected_baggage = {"foo": "bar", "bar": "baz"} args, kwargs = self.get_subscriber_params(queue, batch=True) @broker.subscriber(*args, **kwargs) - async def handler(m): + async def handler(m, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert len(baggage.get_all_batch()) == expected_msg_count m.sort() mock(m) event.set() @@ -205,8 +228,16 @@ async def handler(m): async with broker: await broker.start() tasks = ( - asyncio.create_task(broker.publish("hi", topic=queue)), - asyncio.create_task(broker.publish("buy", topic=queue)), + asyncio.create_task( + broker.publish( + "hi", topic=queue, headers=Baggage({"foo": "bar"}).to_headers() + ) + ), + asyncio.create_task( + broker.publish( + "buy", topic=queue, headers=Baggage({"bar": "baz"}).to_headers() + ) + ), asyncio.create_task(event.wait()), ) await asyncio.wait(tasks, timeout=self.timeout) diff --git a/tests/opentelemetry/kafka/test_kafka.py b/tests/opentelemetry/kafka/test_kafka.py index 2b272aa09d..0967069ade 100644 --- a/tests/opentelemetry/kafka/test_kafka.py +++ b/tests/opentelemetry/kafka/test_kafka.py @@ -13,6 +13,7 @@ from faststream.kafka import KafkaBroker from faststream.kafka.opentelemetry import KafkaTelemetryMiddleware +from faststream.opentelemetry import Baggage, CurrentBaggage from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME from faststream.opentelemetry.middleware import MessageAction as Action from tests.brokers.kafka.test_consume import TestConsume @@ -78,11 +79,17 @@ async def test_batch( expected_msg_count = 3 expected_link_count = 1 expected_link_attrs = {"messaging.batch.message_count": 3} + expected_baggage = {"with_batch": "True", "foo": "bar"} + expected_baggage_batch = [ + {"with_batch": "True", "foo": "bar"} + ] * expected_msg_count args, kwargs = self.get_subscriber_params(queue, batch=True) @broker.subscriber(*args, **kwargs) - async def handler(m): + async def handler(m, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert baggage.get_all_batch() == expected_baggage_batch mock(m) event.set() @@ -91,7 +98,15 @@ async def handler(m): async with broker: await broker.start() tasks = ( - asyncio.create_task(broker.publish_batch(1, "hi", 3, topic=queue)), + asyncio.create_task( + broker.publish_batch( + 1, + "hi", + 3, + topic=queue, + headers=Baggage({"foo": "bar"}).to_headers(), + ) + ), asyncio.create_task(event.wait()), ) await asyncio.wait(tasks, timeout=self.timeout) @@ -132,18 +147,23 @@ async def test_batch_publish_with_single_consume( expected_link_count = 1 expected_span_count = 8 expected_pub_batch_count = 1 + expected_baggage = {"with_batch": "True", "foo": "bar"} args, kwargs = self.get_subscriber_params(queue) @broker.subscriber(*args, **kwargs) - async def handler(msg): + async def handler(msg, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert baggage.get_all_batch() == [] await msgs_queue.put(msg) broker = self.patch_broker(broker) async with broker: await broker.start() - await broker.publish_batch(1, "hi", 3, topic=queue) + await broker.publish_batch( + 1, "hi", 3, topic=queue, headers=Baggage({"foo": "bar"}).to_headers() + ) result, _ = await asyncio.wait( ( asyncio.create_task(msgs_queue.get()), @@ -192,11 +212,14 @@ async def test_single_publish_with_batch_consume( expected_link_count = 2 expected_span_count = 6 expected_process_batch_count = 1 + expected_baggage = {"foo": "bar", "bar": "baz"} args, kwargs = self.get_subscriber_params(queue, batch=True) @broker.subscriber(*args, **kwargs) - async def handler(m): + async def handler(m, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert len(baggage.get_all_batch()) == expected_msg_count m.sort() mock(m) event.set() @@ -206,8 +229,16 @@ async def handler(m): async with broker: await broker.start() tasks = ( - asyncio.create_task(broker.publish("hi", topic=queue)), - asyncio.create_task(broker.publish("buy", topic=queue)), + asyncio.create_task( + broker.publish( + "hi", topic=queue, headers=Baggage({"foo": "bar"}).to_headers() + ) + ), + asyncio.create_task( + broker.publish( + "buy", topic=queue, headers=Baggage({"bar": "baz"}).to_headers() + ) + ), asyncio.create_task(event.wait()), ) await asyncio.wait(tasks, timeout=self.timeout) diff --git a/tests/opentelemetry/redis/test_redis.py b/tests/opentelemetry/redis/test_redis.py index 86a4f16e67..9729b0a27f 100644 --- a/tests/opentelemetry/redis/test_redis.py +++ b/tests/opentelemetry/redis/test_redis.py @@ -8,6 +8,7 @@ from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.semconv.trace import SpanAttributes as SpanAttr +from faststream.opentelemetry import Baggage, CurrentBaggage from faststream.redis import ListSub, RedisBroker from faststream.redis.opentelemetry import RedisTelemetryMiddleware from tests.brokers.redis.test_consume import ( @@ -44,11 +45,15 @@ async def test_batch( expected_msg_count = 3 expected_link_count = 1 expected_link_attrs = {"messaging.batch.message_count": 3} + expected_baggage = {"with_batch": "True"} + expected_baggage_batch = [{"with_batch": "True"}] * 3 args, kwargs = self.get_subscriber_params(list=ListSub(queue, batch=True)) @broker.subscriber(*args, **kwargs) - async def handler(m): + async def handler(m, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert baggage.get_all_batch() == expected_baggage_batch mock(m) event.set() @@ -98,11 +103,15 @@ async def test_batch_publish_with_single_consume( expected_link_count = 1 expected_span_count = 8 expected_pub_batch_count = 1 + expected_baggage = {"with_batch": "True"} + expected_baggage_batch = [] args, kwargs = self.get_subscriber_params(list=ListSub(queue)) @broker.subscriber(*args, **kwargs) - async def handler(msg): + async def handler(msg, baggage: CurrentBaggage): + assert baggage.get_all() == expected_baggage + assert baggage.get_all_batch() == expected_baggage_batch await msgs_queue.put(msg) broker = self.patch_broker(broker) @@ -158,11 +167,14 @@ async def test_single_publish_with_batch_consume( expected_link_count = 2 expected_span_count = 6 expected_process_batch_count = 1 + expected_baggage = {"foo": "bar", "bar": "baz"} args, kwargs = self.get_subscriber_params(list=ListSub(queue, batch=True)) @broker.subscriber(*args, **kwargs) - async def handler(m): + async def handler(m, baggage: CurrentBaggage): + assert len(baggage.get_all_batch()) == expected_msg_count + assert baggage.get_all() == expected_baggage m.sort() mock(m) event.set() @@ -171,8 +183,16 @@ async def handler(m): async with broker: tasks = ( - asyncio.create_task(broker.publish("hi", list=queue)), - asyncio.create_task(broker.publish("buy", list=queue)), + asyncio.create_task( + broker.publish( + "hi", list=queue, headers=Baggage({"foo": "bar"}).to_headers() + ) + ), + asyncio.create_task( + broker.publish( + "buy", list=queue, headers=Baggage({"bar": "baz"}).to_headers() + ) + ), ) await asyncio.wait(tasks, timeout=self.timeout) await broker.start()