diff --git a/faststream/nats/message.py b/faststream/nats/message.py index 0f104a3310..5e5d89fd86 100644 --- a/faststream/nats/message.py +++ b/faststream/nats/message.py @@ -17,6 +17,11 @@ async def ack(self) -> None: await self.raw_message.ack() await super().ack() + async def ack_sync(self) -> None: + if not self.raw_message._ackd: + await self.raw_message.ack_sync() + await super().ack() + async def nack( self, delay: Union[int, float, None] = None, diff --git a/tests/brokers/nats/test_consume.py b/tests/brokers/nats/test_consume.py index a8b9778e4d..eececb8b18 100644 --- a/tests/brokers/nats/test_consume.py +++ b/tests/brokers/nats/test_consume.py @@ -214,6 +214,34 @@ async def handler(msg: NatsMessage): assert event.is_set() + async def test_consume_ack_sync_manual( + self, + queue: str, + event: asyncio.Event, + stream: JStream, + ): + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber(queue, stream=stream) + async def handler(msg: NatsMessage): + await msg.ack_sync() + event.set() + + async with self.patch_broker(consume_broker) as br: + await br.start() + + with patch.object(Msg, "ack_sync", spy_decorator(Msg.ack_sync)) as m: + await asyncio.wait( + ( + asyncio.create_task(br.publish("hello", queue)), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + m.mock.assert_called_once() + + assert event.is_set() + async def test_consume_ack_raise( self, queue: str,