Skip to content

Commit

Permalink
In case if core-subscriber receive a JetStream message. (#1751)
Browse files Browse the repository at this point in the history
* In case if core-subscriber receive a JetStream message.

* Remove default value and add tests
  • Loading branch information
sheldygg authored Sep 2, 2024
1 parent 12a03a0 commit 446a5ed
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 4 deletions.
7 changes: 6 additions & 1 deletion faststream/nats/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ async def decode_message(
class NatsParser(NatsBaseParser):
"""A class to parse NATS core messages."""

def __init__(self, *, pattern: str, no_ack: bool) -> None:
super().__init__(pattern=pattern)
self.no_ack = no_ack

async def parse_message(
self,
message: "Msg",
Expand All @@ -62,7 +66,8 @@ async def parse_message(

headers = message.header or {}

message._ackd = True # prevent message from acking
if not self.no_ack:
message._ackd = True # prevent message from acking

return NatsMessage(
raw_message=message,
Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(
) -> None:
self._connection = connection

default = NatsParser(pattern="")
default = NatsParser(pattern="", no_ack=False)
self._parser = resolve_custom_func(parser, default.parse_message)
self._decoder = resolve_custom_func(decoder, default.decode_message)

Expand Down Expand Up @@ -141,7 +141,7 @@ def __init__(
) -> None:
self._connection = connection

default = NatsParser(pattern="")
default = NatsParser(pattern="", no_ack=False)
self._parser = resolve_custom_func(parser, default.parse_message)
self._decoder = resolve_custom_func(decoder, default.decode_message)

Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ def __init__(
description_: Optional[str],
include_in_schema: bool,
) -> None:
parser_ = NatsParser(pattern=subject)
parser_ = NatsParser(pattern=subject, no_ack=no_ack)

self.queue = queue

Expand Down
26 changes: 26 additions & 0 deletions tests/brokers/nats/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,32 @@ async def handler(msg: NatsMessage):

assert event.is_set()

async def test_core_consume_no_ack(
self,
queue: str,
event: asyncio.Event,
stream: JStream,
):
consume_broker = self.get_broker(apply_types=True)

@consume_broker.subscriber(queue, no_ack=True)
async def handler(msg: NatsMessage):
if not msg.raw_message._ackd:
event.set()

async with self.patch_broker(consume_broker) as br:
await br.start()

await asyncio.wait(
(
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(event.wait()),
),
timeout=3,
)

assert event.is_set()

async def test_consume_ack_manual(
self,
queue: str,
Expand Down

0 comments on commit 446a5ed

Please sign in to comment.