Skip to content

Commit

Permalink
Add OTel baggage support (#1692)
Browse files Browse the repository at this point in the history
* feat: add current span in faststream context

* feat: add custom baggage

* test: add baggage test

* feat: refactor baggage

* test: fix confluent tests

* docs: generate API References

* feat: refactor

* fix: linters

* fix: mypy

* docs: add baggage and CurrentSpan docs

* docs: fix trailing whitespaces

---------

Co-authored-by: draincoder <[email protected]>
  • Loading branch information
draincoder and draincoder authored Sep 9, 2024
1 parent 7aaafdd commit e2e3023
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 23 deletions.
4 changes: 4 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ search:
- [TestApp](public_api/faststream/nats/TestApp.md)
- [TestNatsBroker](public_api/faststream/nats/TestNatsBroker.md)
- opentelemetry
- [Baggage](public_api/faststream/opentelemetry/Baggage.md)
- [TelemetryMiddleware](public_api/faststream/opentelemetry/TelemetryMiddleware.md)
- [TelemetrySettingsProvider](public_api/faststream/opentelemetry/TelemetrySettingsProvider.md)
- rabbit
Expand Down Expand Up @@ -795,8 +796,11 @@ search:
- [TestNatsBroker](api/faststream/nats/testing/TestNatsBroker.md)
- [build_message](api/faststream/nats/testing/build_message.md)
- opentelemetry
- [Baggage](api/faststream/opentelemetry/Baggage.md)
- [TelemetryMiddleware](api/faststream/opentelemetry/TelemetryMiddleware.md)
- [TelemetrySettingsProvider](api/faststream/opentelemetry/TelemetrySettingsProvider.md)
- baggage
- [Baggage](api/faststream/opentelemetry/baggage/Baggage.md)
- consts
- [MessageAction](api/faststream/opentelemetry/consts/MessageAction.md)
- middleware
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/opentelemetry/Baggage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.opentelemetry.Baggage
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/opentelemetry/baggage/Baggage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.opentelemetry.baggage.Baggage
76 changes: 74 additions & 2 deletions docs/docs/en/getting-started/opentelemetry/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def third_handler(msg: str):
await asyncio.sleep(0.075)
```

## FastStream Tracing
### FastStream Tracing

**OpenTelemetry** tracing support in **FastStream** adheres to the [semantic conventions for messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/){.external-link target="_blank"}.

Expand Down Expand Up @@ -98,7 +98,22 @@ To visualize traces, you can send them to a backend system that supports distrib
* **Zipkin**: Similar to **Jaeger**, you can run **Zipkin** using **Docker** and configure the **OpenTelemetry** middleware accordingly. For more details, see the [Zipkin documentation](https://zipkin.io/){.external-link target="_blank"}.
* **Grafana Tempo**: **Grafana Tempo** is a high-scale distributed tracing backend. You can configure **OpenTelemetry** to export traces to **Tempo**, which can then be visualized using **Grafana**. For more details, see the [Grafana Tempo documentation](https://grafana.com/docs/tempo/latest/){.external-link target="_blank"}.

## Example
### Context propagation

Quite often it is necessary to communicate with **other** services and to propagate the trace context, you can use the **CurrentSpan** object and follow the example:

```python linenums="1" hl_lines="5-7"
from opentelemetry import trace, propagate
from faststream.opentelemetry import CurrentSpan
@broker.subscriber("symbol")
async def handler(msg: str, span: CurrentSpan) -> None:
headers = {}
propagate.inject(headers, context=trace.set_span_in_context(span))
price = await exchange_client.get_symbol_price(msg, headers=headers)
```

### Full example

To see how to set up, visualize, and configure tracing for **FastStream** services, go to [example](https://github.com/draincoder/faststream-monitoring){.external-link target="_blank"}.

Expand All @@ -112,3 +127,60 @@ An example includes:

![HTML-page](../../../assets/img/distributed-trace.png){ loading=lazy }
`Visualized via Grafana and Tempo`

## Baggage

[OpenTelemetry Baggage](https://opentelemetry.io/docs/concepts/signals/baggage/){.external-link target="_blank"} is a context propagation mechanism that allows you to pass custom metadata or key-value pairs across service boundaries, providing additional context for distributed tracing and observability.

### FastStream Baggage

**FastStream** provides a convenient abstraction over baggage that allows you to:

* **Initialize** the baggage
* **Propagate** baggage through headers
* **Modify** the baggage
* **Stop** propagating baggage

### Example

To initialize the baggage and start distributing it, follow this example:

```python linenums="1" hl_lines="3-4"
from faststream.opentelemetry import Baggage
headers = Baggage({"hello": "world"}).to_headers({"header-type": "custom"})
await broker.publish("hello", "first", headers=headers)
```

All interaction with baggage at the **consumption level** occurs through the **CurrentBaggage** object, which is automatically substituted from the context:

```python linenums="1" hl_lines="6-10 17-18 24"
from faststream.opentelemetry import CurrentBaggage
@broker.subscriber("first")
@broker.publisher("second")
async def response_handler_first(msg: str, baggage: CurrentBaggage):
print(baggage.get_all()) # {'hello': 'world'}
baggage.remove("hello")
baggage.set("user-id", 1)
baggage.set("request-id", "UUID")
print(baggage.get("user-id")) # 1
return msg
@broker.subscriber("second")
@broker.publisher("third")
async def response_handler_second(msg: str, baggage: CurrentBaggage):
print(baggage.get_all()) # {'user-id': '1', 'request-id': 'UUID'}
baggage.clear()
return msg
@broker.subscriber("third")
async def response_handler_third(msg: str, baggage: CurrentBaggage):
print(baggage.get_all()) # {}
```

!!! note
If you consume messages in **batches**, then the baggage from each message will be merged into the **common baggage** available
through the `get_all` method, but you can still get a list of all the baggage from the batch using the `get_all_batch` method.
5 changes: 5 additions & 0 deletions faststream/opentelemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from faststream.opentelemetry.annotations import CurrentBaggage, CurrentSpan
from faststream.opentelemetry.baggage import Baggage
from faststream.opentelemetry.middleware import TelemetryMiddleware
from faststream.opentelemetry.provider import TelemetrySettingsProvider

__all__ = (
"Baggage",
"CurrentBaggage",
"CurrentSpan",
"TelemetryMiddleware",
"TelemetrySettingsProvider",
)
8 changes: 8 additions & 0 deletions faststream/opentelemetry/annotations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from opentelemetry.trace import Span
from typing_extensions import Annotated

from faststream import Context
from faststream.opentelemetry.baggage import Baggage

CurrentSpan = Annotated[Span, Context("span")]
CurrentBaggage = Annotated[Baggage, Context("baggage")]
77 changes: 77 additions & 0 deletions faststream/opentelemetry/baggage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import TYPE_CHECKING, Any, List, Optional, cast

from opentelemetry import baggage, context
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from typing_extensions import Self

if TYPE_CHECKING:
from faststream.broker.message import StreamMessage
from faststream.types import AnyDict

_BAGGAGE_PROPAGATOR = W3CBaggagePropagator()


class Baggage:
__slots__ = ("_baggage", "_batch_baggage")

def __init__(
self, payload: "AnyDict", batch_payload: Optional[List["AnyDict"]] = None
) -> None:
self._baggage = dict(payload)
self._batch_baggage = [dict(b) for b in batch_payload] if batch_payload else []

def get_all(self) -> "AnyDict":
"""Get a copy of the current baggage."""
return self._baggage.copy()

def get_all_batch(self) -> List["AnyDict"]:
"""Get a copy of all batch baggage if exists."""
return self._batch_baggage.copy()

def get(self, key: str) -> Optional[Any]:
"""Get a value from the baggage by key."""
return self._baggage.get(key)

def remove(self, key: str) -> None:
"""Remove a baggage item by key."""
self._baggage.pop(key, None)

def set(self, key: str, value: Any) -> None:
"""Set a key-value pair in the baggage."""
self._baggage[key] = value

def clear(self) -> None:
"""Clear the current baggage."""
self._baggage.clear()

def to_headers(self, headers: Optional["AnyDict"] = None) -> "AnyDict":
"""Convert baggage items to headers format for propagation."""
current_context = context.get_current()
if headers is None:
headers = {}

for k, v in self._baggage.items():
current_context = baggage.set_baggage(k, v, context=current_context)

_BAGGAGE_PROPAGATOR.inject(headers, current_context)
return headers

@classmethod
def from_msg(cls, msg: "StreamMessage[Any]") -> Self:
"""Create a Baggage instance from a StreamMessage."""
if len(msg.batch_headers) <= 1:
payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(msg.headers))
return cls(cast("AnyDict", payload))

cumulative_baggage: AnyDict = {}
batch_baggage: List[AnyDict] = []

for headers in msg.batch_headers:
payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(headers))
cumulative_baggage.update(payload)
batch_baggage.append(cast("AnyDict", payload))

return cls(cumulative_baggage, batch_baggage)

def __repr__(self) -> str:
return self._baggage.__repr__()
20 changes: 19 additions & 1 deletion faststream/opentelemetry/middleware.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
from collections import defaultdict
from copy import copy
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, cast
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, cast

from opentelemetry import baggage, context, metrics, trace
from opentelemetry.baggage.propagation import W3CBaggagePropagator
Expand All @@ -11,6 +11,8 @@
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from faststream import BaseMiddleware
from faststream import context as fs_context
from faststream.opentelemetry.baggage import Baggage
from faststream.opentelemetry.consts import (
ERROR_TYPE,
MESSAGING_DESTINATION_PUBLISH_NAME,
Expand All @@ -21,6 +23,7 @@
from faststream.opentelemetry.provider import TelemetrySettingsProvider

if TYPE_CHECKING:
from contextvars import Token
from types import TracebackType

from opentelemetry.metrics import Meter, MeterProvider
Expand Down Expand Up @@ -118,6 +121,7 @@ def __init__(
self._metrics = metrics_container
self._current_span: Optional[Span] = None
self._origin_context: Optional[Context] = None
self._scope_tokens: List[Tuple[str, Token[Any]]] = []
self.__settings_provider = settings_provider_factory(msg)

async def publish_scope(
Expand All @@ -134,6 +138,10 @@ async def publish_scope(
current_context = context.get_current()
destination_name = provider.get_publish_destination_name(kwargs)

current_baggage: Optional[Baggage] = fs_context.get_local("baggage")
if current_baggage:
headers.update(current_baggage.to_headers())

trace_attributes = provider.get_publish_attrs_from_kwargs(kwargs)
metrics_attributes = {
SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
Expand All @@ -143,6 +151,7 @@ async def publish_scope(
# NOTE: if batch with single message?
if (msg_count := len((msg, *args))) > 1:
trace_attributes[SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT] = msg_count
current_context = _BAGGAGE_PROPAGATOR.extract(headers, current_context)
_BAGGAGE_PROPAGATOR.inject(
headers, baggage.set_baggage(WITH_BATCH, True, context=current_context)
)
Expand Down Expand Up @@ -185,6 +194,9 @@ async def publish_scope(
duration = time.perf_counter() - start_time
self._metrics.observe_publish(metrics_attributes, duration, msg_count)

for key, token in self._scope_tokens:
fs_context.reset_local(key, token)

return result

async def consume_scope(
Expand Down Expand Up @@ -234,6 +246,12 @@ async def consume_scope(
SpanAttributes.MESSAGING_OPERATION, MessageAction.PROCESS
)
self._current_span = span

self._scope_tokens.append(("span", fs_context.set_local("span", span)))
self._scope_tokens.append(
("baggage", fs_context.set_local("baggage", Baggage.from_msg(msg)))
)

new_context = trace.set_span_in_context(span, current_context)
token = context.attach(new_context)
result = await call_next(msg)
Expand Down
Loading

0 comments on commit e2e3023

Please sign in to comment.