diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 1c8f0b39e0..03088c528d 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -65,6 +65,7 @@ - [Direct](nats/examples/direct.md) - [Pattern](nats/examples/pattern.md) - [JetStream](nats/jetstream/index.md) + - [Pull Subscriber](nats/jetstream/pull.md) - [Key-Value Storage](nats/jetstream/key-value.md) - [Object Storage](nats/jetstream/object.md) - [Acknowledgement](nats/jetstream/ack.md) diff --git a/docs/docs/en/nats/jetstream/pull.md b/docs/docs/en/nats/jetstream/pull.md new file mode 100644 index 0000000000..df8fb04cb8 --- /dev/null +++ b/docs/docs/en/nats/jetstream/pull.md @@ -0,0 +1,27 @@ +# Pull Subscriber + +## Overview + +**NATS JetStream** supports two various way to consume messages: [**Push** and **Pull**](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#push-and-pull-consumers){.external-link targer="_blank} consumers. + +The **Push** consumer is used by default to consume messages with the **FastStream**. It means that the **NATS** server delivers messages to your consumer as far as possible by itself. However, it also means that **NATS** should control all current consumer connections and increase server load. + +Thus, the **Pull** consumer is the recommended way to consume JetStream messages by the *NATS TEAM*. Using it, you simply ask **NATS** for new messages at some interval. It may sound a little less convenient than automatic message delivery, but it provides several advantages, such as: + +* Consumer scaling without a *queue group* +* Handling messages in batches +* Reducing **NATS** server load + +So, if you want to consume a large flow of messages without strict time limitations, the **Pull** consumer is the right choice for you. + +## FastStream Details + +The **Pull** consumer is just a regular *Stream* consumer, but with the `pull_sub` argument, which controls consuming messages with batch size and block interval. + +```python linenums="1" hl_lines="10-11" +{!> docs_src/nats/js/pull_sub.py !} +``` + +The batch size doesn't mean that your `msg` argument is a list of messages, but it means that you consume up to `10` messages for one request to **NATS** and call your handler for each message in an `asyncio.gather` pool. + +So, your subject will be processed much faster, without blocking for each message processing. However, if your subject has fewer than `10` messages, your request to **NATS** will be blocked for `timeout` (5 seconds by default) while trying to collect the required number of messages. Therefor, you should choose `batch_size` and `timeout` accurately to optimize your consumer efficiency. diff --git a/docs/docs/summary_template.txt b/docs/docs/summary_template.txt index 5ae6fd0fd1..7927242b03 100644 --- a/docs/docs/summary_template.txt +++ b/docs/docs/summary_template.txt @@ -65,6 +65,7 @@ - [Direct](nats/examples/direct.md) - [Pattern](nats/examples/pattern.md) - [JetStream](nats/jetstream/index.md) + - [Pull Subscriber](nats/jetstream/pull.md) - [Key-Value Storage](nats/jetstream/key-value.md) - [Object Storage](nats/jetstream/object.md) - [Acknowledgement](nats/jetstream/ack.md) diff --git a/docs/docs_src/nats/js/pull_sub.py b/docs/docs_src/nats/js/pull_sub.py new file mode 100644 index 0000000000..887d7dab91 --- /dev/null +++ b/docs/docs_src/nats/js/pull_sub.py @@ -0,0 +1,14 @@ +from faststream import FastStream, Logger +from faststream.nats import NatsBroker, PullSub + +broker = NatsBroker() +app = FastStream(broker) + + +@broker.subscriber( + subject="test", + stream="stream", + pull_sub=PullSub(batch_size=10), +) +async def handle(msg, logger: Logger): + logger.info(msg) diff --git a/examples/nats/e09_pull_sub.py b/examples/nats/e09_pull_sub.py new file mode 100644 index 0000000000..887d7dab91 --- /dev/null +++ b/examples/nats/e09_pull_sub.py @@ -0,0 +1,14 @@ +from faststream import FastStream, Logger +from faststream.nats import NatsBroker, PullSub + +broker = NatsBroker() +app = FastStream(broker) + + +@broker.subscriber( + subject="test", + stream="stream", + pull_sub=PullSub(batch_size=10), +) +async def handle(msg, logger: Logger): + logger.info(msg) diff --git a/faststream/__about__.py b/faststream/__about__.py index 844c7183db..368244d5bc 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,5 +1,5 @@ """Simple and fast framework to create message brokers based microservices""" -__version__ = "0.2.11" +__version__ = "0.2.12" INSTALL_YAML = """ diff --git a/faststream/nats/__init__.py b/faststream/nats/__init__.py index 9efebcb5f7..43d7a0c1a5 100644 --- a/faststream/nats/__init__.py +++ b/faststream/nats/__init__.py @@ -17,6 +17,7 @@ from faststream.nats.annotations import NatsMessage from faststream.nats.broker import NatsBroker from faststream.nats.js_stream import JStream +from faststream.nats.pull_sub import PullSub from faststream.nats.router import NatsRouter from faststream.nats.shared.router import NatsRoute from faststream.nats.test import TestNatsBroker @@ -29,6 +30,7 @@ "NatsRouter", "NatsRoute", "JStream", + "PullSub", # Nats imports "ConsumerConfig", "DeliverPolicy", diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index 906adb4651..7dc520e515 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -38,6 +38,7 @@ from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer +from faststream.nats.pull_sub import PullSub from faststream.nats.shared.logging import NatsLoggingMixin from faststream.types import AnyDict, DecodedMessage from faststream.utils.context.main import context @@ -268,6 +269,9 @@ def subscriber( # type: ignore[override] flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # custom ack_first: bool = False, stream: Union[str, JStream, None] = None, @@ -287,6 +291,9 @@ def subscriber( # type: ignore[override] ]: stream = stream_builder.stream(stream) + if pull_sub is not None and stream is None: + raise ValueError("Pull subscriber can be used only with a stream") + self._setup_log_context( queue=queue, subject=subject, @@ -315,14 +322,24 @@ def subscriber( # type: ignore[override] "durable": durable, "stream": stream.name, "config": config, - "ordered_consumer": ordered_consumer, - "idle_heartbeat": idle_heartbeat, - "flow_control": flow_control, - "deliver_policy": deliver_policy, - "headers_only": headers_only, - "manual_ack": not ack_first, } ) + + if pull_sub is not None: + extra_options.update({"inbox_prefix": inbox_prefix}) + + else: + extra_options.update( + { + "ordered_consumer": ordered_consumer, + "idle_heartbeat": idle_heartbeat, + "flow_control": flow_control, + "deliver_policy": deliver_policy, + "headers_only": headers_only, + "manual_ack": not ack_first, + } + ) + else: extra_options.update( { @@ -337,6 +354,7 @@ def subscriber( # type: ignore[override] subject=subject, queue=queue, stream=stream, + pull_sub=pull_sub, extra_options=extra_options, title=title, description=description, diff --git a/faststream/nats/broker.pyi b/faststream/nats/broker.pyi index 8015a286e6..5e3c44df6f 100644 --- a/faststream/nats/broker.pyi +++ b/faststream/nats/broker.pyi @@ -47,6 +47,7 @@ from faststream.nats.asyncapi import Handler, Publisher from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer +from faststream.nats.pull_sub import PullSub from faststream.nats.shared.logging import NatsLoggingMixin from faststream.types import DecodedMessage, SendableMessage @@ -234,6 +235,9 @@ class NatsBroker( flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # broker arguments dependencies: Sequence[Depends] = (), parser: Optional[CustomParser[Msg, NatsMessage]] = None, diff --git a/faststream/nats/fastapi.pyi b/faststream/nats/fastapi.pyi index f65a96e111..d9da1afcd2 100644 --- a/faststream/nats/fastapi.pyi +++ b/faststream/nats/fastapi.pyi @@ -58,6 +58,7 @@ from faststream.nats.asyncapi import Publisher from faststream.nats.broker import NatsBroker from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage +from faststream.nats.pull_sub import PullSub class NatsRouter(StreamRouter[Msg]): broker_class = NatsBroker @@ -197,6 +198,9 @@ class NatsRouter(StreamRouter[Msg]): flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # broker arguments dependencies: Sequence[Depends] = (), parser: Optional[CustomParser[Msg, NatsMessage]] = None, diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index f2e5f3decb..524272430c 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -1,9 +1,12 @@ -from typing import Any, Callable, Dict, Optional, Sequence, Union +import asyncio +from contextlib import suppress +from typing import Any, Callable, Dict, Optional, Sequence, Union, cast from fast_depends.core import CallModel from nats.aio.client import Client from nats.aio.msg import Msg from nats.aio.subscription import Subscription +from nats.errors import TimeoutError from nats.js import JetStreamContext from faststream._compat import override @@ -22,12 +25,19 @@ from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage from faststream.nats.parser import JsParser, Parser +from faststream.nats.pull_sub import PullSub from faststream.types import AnyDict from faststream.utils.context.path import compile_path class LogicNatsHandler(AsyncHandler[Msg]): - subscription: Optional[Union[Subscription, JetStreamContext.PushSubscription]] + subscription: Union[ + None, + Subscription, + JetStreamContext.PushSubscription, + JetStreamContext.PullSubscription, + ] + task: Optional["asyncio.Task[Any]"] = None def __init__( self, @@ -35,6 +45,7 @@ def __init__( log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]], queue: str = "", stream: Optional[JStream] = None, + pull_sub: Optional[PullSub] = None, extra_options: Optional[AnyDict] = None, # AsyncAPI information description: Optional[str] = None, @@ -47,6 +58,7 @@ def __init__( self.queue = queue self.stream = stream + self.pull_sub = pull_sub self.extra_options = extra_options or {} super().__init__( @@ -55,6 +67,7 @@ def __init__( title=title, ) + self.task = None self.subscription = None def add_call( @@ -79,18 +92,48 @@ def add_call( @override async def start(self, connection: Union[Client, JetStreamContext]) -> None: # type: ignore[override] - self.subscription = await connection.subscribe( - subject=self.subject, - queue=self.queue, - cb=self.consume, # type: ignore[arg-type] - **self.extra_options, - ) + if self.pull_sub is not None: + connection = cast(JetStreamContext, connection) + + if self.stream is None: + raise ValueError("Pull subscriber can be used only with a stream") + + self.subscription = await connection.pull_subscribe( + subject=self.subject, + **self.extra_options, + ) + self.task = asyncio.create_task(self._consume()) + + else: + self.subscription = await connection.subscribe( + subject=self.subject, + queue=self.queue, + cb=self.consume, # type: ignore[arg-type] + **self.extra_options, + ) async def close(self) -> None: if self.subscription is not None: await self.subscription.unsubscribe() self.subscription = None + if self.task is not None: + self.task.cancel() + self.task = None + + async def _consume(self) -> None: + assert self.pull_sub # nosec B101 + + sub = cast(JetStreamContext.PullSubscription, self.subscription) + + while self.subscription is not None: + with suppress(TimeoutError): + messages = await sub.fetch( + batch=self.pull_sub.batch_size, + timeout=self.pull_sub.timeout, + ) + await asyncio.gather(*map(self.consume, messages)) + @staticmethod def get_routing_hash(subject: str) -> str: return subject diff --git a/faststream/nats/pull_sub.py b/faststream/nats/pull_sub.py new file mode 100644 index 0000000000..4454a0e0e5 --- /dev/null +++ b/faststream/nats/pull_sub.py @@ -0,0 +1,18 @@ +from typing import Optional + +from pydantic import BaseModel, Field + + +class PullSub(BaseModel): + batch_size: int = Field(default=1) + timeout: Optional[float] = Field(default=5.0) + + def __init__( + self, + batch_size: int = 1, + timeout: Optional[float] = 5.0, + ) -> None: + super().__init__( + batch_size=batch_size, + timeout=timeout, + ) diff --git a/faststream/nats/router.pyi b/faststream/nats/router.pyi index 7dab1e732c..50d5fa0d28 100644 --- a/faststream/nats/router.pyi +++ b/faststream/nats/router.pyi @@ -18,6 +18,7 @@ from faststream.broker.wrapper import HandlerCallWrapper from faststream.nats.asyncapi import Publisher from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage +from faststream.nats.pull_sub import PullSub from faststream.nats.shared.router import NatsRoute from faststream.nats.shared.router import NatsRouter as BaseRouter @@ -73,6 +74,9 @@ class NatsRouter(BaseRouter): flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # broker arguments dependencies: Sequence[Depends] = (), parser: Optional[CustomParser[Msg, NatsMessage]] = None, diff --git a/tests/brokers/nats/test_consume.py b/tests/brokers/nats/test_consume.py index b7a5e1cdc2..268303f3ca 100644 --- a/tests/brokers/nats/test_consume.py +++ b/tests/brokers/nats/test_consume.py @@ -5,7 +5,7 @@ from nats.aio.msg import Msg from faststream.exceptions import AckMessage -from faststream.nats import JStream, NatsBroker +from faststream.nats import JStream, NatsBroker, PullSub from faststream.nats.annotations import NatsMessage from tests.brokers.base.consume import BrokerRealConsumeTestcase from tests.tools import spy_decorator @@ -37,6 +37,33 @@ def subscriber(m): assert event.is_set() + async def test_consume_pull( + self, + queue: str, + consume_broker: NatsBroker, + stream: JStream, + event: asyncio.Event, + mock, + ): + @consume_broker.subscriber(queue, stream=stream, pull_sub=PullSub(1)) + def subscriber(m): + mock(m) + event.set() + + await consume_broker.start() + await asyncio.wait( + ( + asyncio.create_task( + consume_broker.publish("hello", queue, stream=stream.name) + ), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + mock.assert_called_once_with("hello") + @pytest.mark.asyncio async def test_consume_ack( self, diff --git a/tests/brokers/nats/test_test_client.py b/tests/brokers/nats/test_test_client.py index 165f936e82..832a82e1eb 100644 --- a/tests/brokers/nats/test_test_client.py +++ b/tests/brokers/nats/test_test_client.py @@ -3,7 +3,7 @@ import pytest from faststream import BaseMiddleware -from faststream.nats import JStream, NatsBroker, TestNatsBroker +from faststream.nats import JStream, NatsBroker, PullSub, TestNatsBroker from tests.brokers.base.testclient import BrokerTestclientTestcase @@ -135,3 +135,17 @@ def subscriber(): await test_broker.start() await test_broker.publish("hello", "test.a.subj.b.c") subscriber.mock.assert_called_once_with("hello") + + async def test_consume_pull( + self, + queue: str, + test_broker: NatsBroker, + stream: JStream, + ): + @test_broker.subscriber(queue, stream=stream, pull_sub=PullSub(1)) + def subscriber(m): + ... + + await test_broker.start() + await test_broker.publish("hello", queue) + subscriber.mock.assert_called_once_with("hello") diff --git a/tests/docs/nats/js/test_pull_sub.py b/tests/docs/nats/js/test_pull_sub.py new file mode 100644 index 0000000000..635a805802 --- /dev/null +++ b/tests/docs/nats/js/test_pull_sub.py @@ -0,0 +1,13 @@ +import pytest + +from faststream.nats import TestApp, TestNatsBroker + + +@pytest.mark.asyncio +async def test_basic(): + from docs.docs_src.nats.js.pull_sub import app, broker, handle + + async with TestNatsBroker(broker): + async with TestApp(app): + await broker.publish("Hi!", "test") + handle.mock.assert_called_once_with("Hi!") diff --git a/tests/examples/nats/test_e09_pull_sub.py b/tests/examples/nats/test_e09_pull_sub.py new file mode 100644 index 0000000000..a13f2f6566 --- /dev/null +++ b/tests/examples/nats/test_e09_pull_sub.py @@ -0,0 +1,13 @@ +import pytest + +from faststream.nats import TestApp, TestNatsBroker + + +@pytest.mark.asyncio +async def test_basic(): + from examples.nats.e09_pull_sub import app, broker, handle + + async with TestNatsBroker(broker): + async with TestApp(app): + await broker.publish("Hi!", "test") + handle.mock.assert_called_once_with("Hi!")