From c445a117165df491fb1ee2a360baa2d2d2d86614 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Fri, 14 Jun 2024 21:57:09 +0300 Subject: [PATCH] fix: correct NATS ObjectStorage watch processing --- faststream/nats/subscriber/usecase.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/faststream/nats/subscriber/usecase.py b/faststream/nats/subscriber/usecase.py index d64cc2cf2d..52a8f43560 100644 --- a/faststream/nats/subscriber/usecase.py +++ b/faststream/nats/subscriber/usecase.py @@ -1014,20 +1014,19 @@ async def _create_subscription( # type: ignore[override] declare=self.obj_watch.declare, ) - self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"]( - await self.bucket.watch( - ignore_deletes=self.obj_watch.ignore_deletes, - include_history=self.obj_watch.include_history, - meta_only=self.obj_watch.meta_only, - ) - ) - self.add_task(self._consume_watch()) async def _consume_watch(self) -> None: - assert self.subscription, "You should call `create_subscription` at first." # nosec B101 + assert self.bucket, "You should call `create_subscription` at first." # nosec B101 + + # Should be created inside task to avoid nats-py lock + obj_watch = await self.bucket.watch( + ignore_deletes=self.obj_watch.ignore_deletes, + include_history=self.obj_watch.include_history, + meta_only=self.obj_watch.meta_only, + ) - obj_watch = self.subscription.obj + self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](obj_watch) while self.running: with suppress(TimeoutError):