diff --git a/faststream/nats/subscriber/usecase.py b/faststream/nats/subscriber/usecase.py index d64cc2cf2d..52a8f43560 100644 --- a/faststream/nats/subscriber/usecase.py +++ b/faststream/nats/subscriber/usecase.py @@ -1014,20 +1014,19 @@ async def _create_subscription( # type: ignore[override] declare=self.obj_watch.declare, ) - self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"]( - await self.bucket.watch( - ignore_deletes=self.obj_watch.ignore_deletes, - include_history=self.obj_watch.include_history, - meta_only=self.obj_watch.meta_only, - ) - ) - self.add_task(self._consume_watch()) async def _consume_watch(self) -> None: - assert self.subscription, "You should call `create_subscription` at first." # nosec B101 + assert self.bucket, "You should call `create_subscription` at first." # nosec B101 + + # Should be created inside task to avoid nats-py lock + obj_watch = await self.bucket.watch( + ignore_deletes=self.obj_watch.ignore_deletes, + include_history=self.obj_watch.include_history, + meta_only=self.obj_watch.meta_only, + ) - obj_watch = self.subscription.obj + self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](obj_watch) while self.running: with suppress(TimeoutError): diff --git a/faststream/rabbit/opentelemetry/provider.py b/faststream/rabbit/opentelemetry/provider.py index 7ba8c1900e..6971810ff2 100644 --- a/faststream/rabbit/opentelemetry/provider.py +++ b/faststream/rabbit/opentelemetry/provider.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Union from opentelemetry.semconv.trace import SpanAttributes @@ -9,6 +9,7 @@ from aio_pika import IncomingMessage from faststream.broker.message import StreamMessage + from faststream.rabbit.schemas.exchange import RabbitExchange from faststream.types import AnyDict @@ -44,9 +45,12 @@ def get_publish_attrs_from_kwargs( self, kwargs: "AnyDict", ) -> "AnyDict": + exchange: Union[None, str, RabbitExchange] = kwargs.get("exchange") return { SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, - SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs.get("exchange") or "", + SpanAttributes.MESSAGING_DESTINATION_NAME: getattr( + exchange, "name", exchange or "" + ), SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: kwargs[ "routing_key" ], diff --git a/tests/docs/nats/js/test_object.py b/tests/docs/nats/js/test_object.py index b65905d4c6..fbe9183035 100644 --- a/tests/docs/nats/js/test_object.py +++ b/tests/docs/nats/js/test_object.py @@ -9,23 +9,9 @@ async def test_basic(): from docs.docs_src.nats.js.object import app, broker, handler - async with TestNatsBroker(broker, with_real=True): - await broker.start() - - os = await broker.object_storage("example-bucket") - try: - existed_files = await os.list() - except Exception: - existed_files = () - - call = True - for file in existed_files: - if file.name == "file.txt": - call = False - - if call: - async with TestApp(app): - pass - + async with ( + TestNatsBroker(broker, with_real=True, connect_only=True), + TestApp(app), + ): await handler.wait_call(3.0) handler.mock.assert_called_once_with("file.txt") diff --git a/tests/opentelemetry/nats/test_nats.py b/tests/opentelemetry/nats/test_nats.py index b886e46d8f..db9b4ba48b 100644 --- a/tests/opentelemetry/nats/test_nats.py +++ b/tests/opentelemetry/nats/test_nats.py @@ -48,7 +48,7 @@ async def test_batch( @broker.subscriber( queue, stream=stream, - pull_sub=PullSub(3, batch=True), + pull_sub=PullSub(3, batch=True, timeout=30.0), **self.subscriber_kwargs, ) async def handler(m):