Skip to content

Commit

Permalink
Merge branch 'main' into issue-1386
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik authored Jun 17, 2024
2 parents a9f6781 + dfec397 commit 68f33e6
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 31 deletions.
19 changes: 9 additions & 10 deletions faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,20 +1014,19 @@ async def _create_subscription( # type: ignore[override]
declare=self.obj_watch.declare,
)

self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](
await self.bucket.watch(
ignore_deletes=self.obj_watch.ignore_deletes,
include_history=self.obj_watch.include_history,
meta_only=self.obj_watch.meta_only,
)
)

self.add_task(self._consume_watch())

async def _consume_watch(self) -> None:
assert self.subscription, "You should call `create_subscription` at first." # nosec B101
assert self.bucket, "You should call `create_subscription` at first." # nosec B101

# Should be created inside task to avoid nats-py lock
obj_watch = await self.bucket.watch(
ignore_deletes=self.obj_watch.ignore_deletes,
include_history=self.obj_watch.include_history,
meta_only=self.obj_watch.meta_only,
)

obj_watch = self.subscription.obj
self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](obj_watch)

while self.running:
with suppress(TimeoutError):
Expand Down
8 changes: 6 additions & 2 deletions faststream/rabbit/opentelemetry/provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union

from opentelemetry.semconv.trace import SpanAttributes

Expand All @@ -9,6 +9,7 @@
from aio_pika import IncomingMessage

from faststream.broker.message import StreamMessage
from faststream.rabbit.schemas.exchange import RabbitExchange
from faststream.types import AnyDict


Expand Down Expand Up @@ -44,9 +45,12 @@ def get_publish_attrs_from_kwargs(
self,
kwargs: "AnyDict",
) -> "AnyDict":
exchange: Union[None, str, RabbitExchange] = kwargs.get("exchange")
return {
SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs.get("exchange") or "",
SpanAttributes.MESSAGING_DESTINATION_NAME: getattr(
exchange, "name", exchange or ""
),
SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: kwargs[
"routing_key"
],
Expand Down
22 changes: 4 additions & 18 deletions tests/docs/nats/js/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,9 @@
async def test_basic():
from docs.docs_src.nats.js.object import app, broker, handler

async with TestNatsBroker(broker, with_real=True):
await broker.start()

os = await broker.object_storage("example-bucket")
try:
existed_files = await os.list()
except Exception:
existed_files = ()

call = True
for file in existed_files:
if file.name == "file.txt":
call = False

if call:
async with TestApp(app):
pass

async with (
TestNatsBroker(broker, with_real=True, connect_only=True),
TestApp(app),
):
await handler.wait_call(3.0)
handler.mock.assert_called_once_with("file.txt")
2 changes: 1 addition & 1 deletion tests/opentelemetry/nats/test_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def test_batch(
@broker.subscriber(
queue,
stream=stream,
pull_sub=PullSub(3, batch=True),
pull_sub=PullSub(3, batch=True, timeout=30.0),
**self.subscriber_kwargs,
)
async def handler(m):
Expand Down

0 comments on commit 68f33e6

Please sign in to comment.