Skip to content

Commit

Permalink
fix: correct NATS ObjectStorage watch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Jun 14, 2024
1 parent 6171692 commit c445a11
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,20 +1014,19 @@ async def _create_subscription( # type: ignore[override]
declare=self.obj_watch.declare,
)

self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](
await self.bucket.watch(
ignore_deletes=self.obj_watch.ignore_deletes,
include_history=self.obj_watch.include_history,
meta_only=self.obj_watch.meta_only,
)
)

self.add_task(self._consume_watch())

async def _consume_watch(self) -> None:
assert self.subscription, "You should call `create_subscription` at first." # nosec B101
assert self.bucket, "You should call `create_subscription` at first." # nosec B101

# Should be created inside task to avoid nats-py lock
obj_watch = await self.bucket.watch(
ignore_deletes=self.obj_watch.ignore_deletes,
include_history=self.obj_watch.include_history,
meta_only=self.obj_watch.meta_only,
)

obj_watch = self.subscription.obj
self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](obj_watch)

while self.running:
with suppress(TimeoutError):
Expand Down

0 comments on commit c445a11

Please sign in to comment.