From 9ad4d6d68c6016ebe99e70dfc0b04461b36f2e9c Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 4 Nov 2023 00:43:38 +0200 Subject: [PATCH 01/18] draft for pull_subscribe --- faststream/nats/broker.py | 23 +++++++++------- faststream/nats/handler.py | 23 +++++++++++----- faststream/nats/pull_sub.py | 55 +++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 17 deletions(-) create mode 100644 faststream/nats/pull_sub.py diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index 906adb4651..7bd065343b 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -36,6 +36,7 @@ from faststream.nats.asyncapi import Handler, Publisher from faststream.nats.helpers import stream_builder from faststream.nats.js_stream import JStream +from faststream.nats.pull_sub import PullSub from faststream.nats.message import NatsMessage from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer from faststream.nats.shared.logging import NatsLoggingMixin @@ -261,6 +262,7 @@ def subscriber( # type: ignore[override] # Core arguments max_msgs: int = 0, # JS arguments + pull_sub: Optional[PullSub] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, ordered_consumer: bool = False, @@ -296,17 +298,17 @@ def subscriber( # type: ignore[override] extra_options: AnyDict = { "pending_msgs_limit": pending_msgs_limit - or ( - DEFAULT_JS_SUB_PENDING_MSGS_LIMIT - if stream - else DEFAULT_SUB_PENDING_MSGS_LIMIT - ), + or ( + DEFAULT_JS_SUB_PENDING_MSGS_LIMIT + if stream + else DEFAULT_SUB_PENDING_MSGS_LIMIT + ), "pending_bytes_limit": pending_bytes_limit - or ( - DEFAULT_JS_SUB_PENDING_BYTES_LIMIT - if stream - else DEFAULT_SUB_PENDING_BYTES_LIMIT - ), + or ( + DEFAULT_JS_SUB_PENDING_BYTES_LIMIT + if stream + else DEFAULT_SUB_PENDING_BYTES_LIMIT + ), } if stream: @@ -337,6 +339,7 @@ def subscriber( # type: ignore[override] subject=subject, queue=queue, stream=stream, + pull_sub=pull_sub, extra_options=extra_options, title=title, description=description, diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index f2e5f3decb..5ecb4663ac 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -20,6 +20,7 @@ ) from faststream.broker.wrapper import HandlerCallWrapper from faststream.nats.js_stream import JStream +from faststream.nats.pull_sub import PullSub from faststream.nats.message import NatsMessage from faststream.nats.parser import JsParser, Parser from faststream.types import AnyDict @@ -27,7 +28,7 @@ class LogicNatsHandler(AsyncHandler[Msg]): - subscription: Optional[Union[Subscription, JetStreamContext.PushSubscription]] + subscription: Optional[Union[Subscription, JetStreamContext.PushSubscription, JetStreamContext.PullSubscription]] def __init__( self, @@ -35,6 +36,7 @@ def __init__( log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]], queue: str = "", stream: Optional[JStream] = None, + pull_sub: Optional[PullSub] = None, extra_options: Optional[AnyDict] = None, # AsyncAPI information description: Optional[str] = None, @@ -47,6 +49,7 @@ def __init__( self.queue = queue self.stream = stream + self.pull_sub = pull_sub self.extra_options = extra_options or {} super().__init__( @@ -79,12 +82,18 @@ def add_call( @override async def start(self, connection: Union[Client, JetStreamContext]) -> None: # type: ignore[override] - self.subscription = await connection.subscribe( - subject=self.subject, - queue=self.queue, - cb=self.consume, # type: ignore[arg-type] - **self.extra_options, - ) + if self.pull_sub is not None: + self.subscription = await connection.pull_subscribe( + subject=self.subject, + ) + # pull_subscribe doesn't have cb parameter + else: + self.subscription = await connection.subscribe( + subject=self.subject, + queue=self.queue, + cb=self.consume, # type: ignore[arg-type] + **self.extra_options, + ) async def close(self) -> None: if self.subscription is not None: diff --git a/faststream/nats/pull_sub.py b/faststream/nats/pull_sub.py new file mode 100644 index 0000000000..c22661839e --- /dev/null +++ b/faststream/nats/pull_sub.py @@ -0,0 +1,55 @@ +from typing import Any, List, Optional + +from nats.js.api import ( + DiscardPolicy, + ExternalStream, + Placement, + RePublish, + RetentionPolicy, + StorageType, + StreamConfig, + StreamSource, +) +from pydantic import Field + +from faststream.broker.schemas import NameRequired + +__all__ = ( + "PullSub", + # import to prevent Pydantic ForwardRef error + "RetentionPolicy", + "DiscardPolicy", + "StorageType", + "Placement", + "StreamSource", + "ExternalStream", + "RePublish", + "Optional", +) + + +class PullSub(NameRequired): + config: StreamConfig + + subjects: List[str] = Field(default_factory=list) + declare: bool = Field(default=True) + + # idk which parameters must be here.... + + def __init__( + self, + name: str, + *args: Any, + declare: bool = True, + **kwargs: Any, + ) -> None: + super().__init__( + name=name, + declare=declare, + subjects=[], + config=StreamConfig( + *args, + name=name, + **kwargs, # type: ignore[misc] + ), + ) From 19306827e7c5b621b4e0f98fcb0883060513265b Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 4 Nov 2023 01:13:23 +0200 Subject: [PATCH 02/18] task for fetch messages --- faststream/nats/broker.py | 22 +++++++-------- faststream/nats/handler.py | 13 ++++++++- faststream/nats/pull_sub.py | 2 ++ faststream/nats/pull_sub.pyi | 53 ++++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 12 deletions(-) create mode 100644 faststream/nats/pull_sub.pyi diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index 7bd065343b..45b296a3d7 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -36,9 +36,9 @@ from faststream.nats.asyncapi import Handler, Publisher from faststream.nats.helpers import stream_builder from faststream.nats.js_stream import JStream -from faststream.nats.pull_sub import PullSub from faststream.nats.message import NatsMessage from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer +from faststream.nats.pull_sub import PullSub from faststream.nats.shared.logging import NatsLoggingMixin from faststream.types import AnyDict, DecodedMessage from faststream.utils.context.main import context @@ -298,17 +298,17 @@ def subscriber( # type: ignore[override] extra_options: AnyDict = { "pending_msgs_limit": pending_msgs_limit - or ( - DEFAULT_JS_SUB_PENDING_MSGS_LIMIT - if stream - else DEFAULT_SUB_PENDING_MSGS_LIMIT - ), + or ( + DEFAULT_JS_SUB_PENDING_MSGS_LIMIT + if stream + else DEFAULT_SUB_PENDING_MSGS_LIMIT + ), "pending_bytes_limit": pending_bytes_limit - or ( - DEFAULT_JS_SUB_PENDING_BYTES_LIMIT - if stream - else DEFAULT_SUB_PENDING_BYTES_LIMIT - ), + or ( + DEFAULT_JS_SUB_PENDING_BYTES_LIMIT + if stream + else DEFAULT_SUB_PENDING_BYTES_LIMIT + ), } if stream: diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index 5ecb4663ac..014227f066 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -1,3 +1,5 @@ +import asyncio + from typing import Any, Callable, Dict, Optional, Sequence, Union from fast_depends.core import CallModel @@ -29,6 +31,7 @@ class LogicNatsHandler(AsyncHandler[Msg]): subscription: Optional[Union[Subscription, JetStreamContext.PushSubscription, JetStreamContext.PullSubscription]] + task: Optional["asyncio.Task[Any]"] = None def __init__( self, @@ -58,6 +61,7 @@ def __init__( title=title, ) + self.task = None self.subscription = None def add_call( @@ -85,8 +89,9 @@ async def start(self, connection: Union[Client, JetStreamContext]) -> None: # t if self.pull_sub is not None: self.subscription = await connection.pull_subscribe( subject=self.subject, + **self.extra_options ) - # pull_subscribe doesn't have cb parameter + asyncio.create_task(self._consume()) else: self.subscription = await connection.subscribe( subject=self.subject, @@ -100,6 +105,12 @@ async def close(self) -> None: await self.subscription.unsubscribe() self.subscription = None + async def _consume(self) -> None: + while self.subscription: + messages = await self.subscription.fetch(self.pull_sub.batch_size) + for message in messages: + await self.consume(message) + @staticmethod def get_routing_hash(subject: str) -> str: return subject diff --git a/faststream/nats/pull_sub.py b/faststream/nats/pull_sub.py index c22661839e..75719f61d0 100644 --- a/faststream/nats/pull_sub.py +++ b/faststream/nats/pull_sub.py @@ -39,6 +39,7 @@ class PullSub(NameRequired): def __init__( self, name: str, + batch_size: int, *args: Any, declare: bool = True, **kwargs: Any, @@ -47,6 +48,7 @@ def __init__( name=name, declare=declare, subjects=[], + batch_size=batch_size, config=StreamConfig( *args, name=name, diff --git a/faststream/nats/pull_sub.pyi b/faststream/nats/pull_sub.pyi new file mode 100644 index 0000000000..059c7b336e --- /dev/null +++ b/faststream/nats/pull_sub.pyi @@ -0,0 +1,53 @@ +from typing import List, Optional + +from nats.js.api import ( + DiscardPolicy, + Placement, + RePublish, + RetentionPolicy, + StorageType, + StreamConfig, + StreamSource, +) +from pydantic import Field + +from faststream.broker.schemas import NameRequired + +class PullSub(NameRequired): + config: StreamConfig + + subjects: List[str] = Field(default_factory=list) + declare: bool = Field(default=True) + + def __init__( + self, + name: str, + batch_size: int, + description: Optional[str] = None, + subjects: Optional[List[str]] = None, + retention: Optional[RetentionPolicy] = None, + max_consumers: Optional[int] = None, + max_msgs: Optional[int] = None, + max_bytes: Optional[int] = None, + discard: Optional[DiscardPolicy] = DiscardPolicy.OLD, + max_age: Optional[float] = None, # in seconds + max_msgs_per_subject: int = -1, + max_msg_size: Optional[int] = -1, + storage: Optional[StorageType] = None, + num_replicas: Optional[int] = None, + no_ack: bool = False, + template_owner: Optional[str] = None, + duplicate_window: float = 0, + placement: Optional[Placement] = None, + mirror: Optional[StreamSource] = None, + sources: Optional[List[StreamSource]] = None, + sealed: bool = False, + deny_delete: bool = False, + deny_purge: bool = False, + allow_rollup_hdrs: bool = False, + republish: Optional[RePublish] = None, + allow_direct: Optional[bool] = None, + mirror_direct: Optional[bool] = None, + # custom + declare: bool = True, + ) -> None: ... From 589122a1f8e66b54e974d157adf40d8e2892e606 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 4 Nov 2023 11:20:06 +0200 Subject: [PATCH 03/18] handle messages with `asyncio.gather` --- faststream/nats/handler.py | 3 +-- faststream/nats/pull_sub.py | 8 ++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index 014227f066..dd067da467 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -108,8 +108,7 @@ async def close(self) -> None: async def _consume(self) -> None: while self.subscription: messages = await self.subscription.fetch(self.pull_sub.batch_size) - for message in messages: - await self.consume(message) + await asyncio.gather(*[self.consume(message)] for message in messages) @staticmethod def get_routing_hash(subject: str) -> str: diff --git a/faststream/nats/pull_sub.py b/faststream/nats/pull_sub.py index 75719f61d0..71ac1389c2 100644 --- a/faststream/nats/pull_sub.py +++ b/faststream/nats/pull_sub.py @@ -10,9 +10,8 @@ StreamConfig, StreamSource, ) -from pydantic import Field +from pydantic import Field, BaseModel -from faststream.broker.schemas import NameRequired __all__ = ( "PullSub", @@ -28,7 +27,7 @@ ) -class PullSub(NameRequired): +class PullSub(BaseModel): config: StreamConfig subjects: List[str] = Field(default_factory=list) @@ -38,20 +37,17 @@ class PullSub(NameRequired): def __init__( self, - name: str, batch_size: int, *args: Any, declare: bool = True, **kwargs: Any, ) -> None: super().__init__( - name=name, declare=declare, subjects=[], batch_size=batch_size, config=StreamConfig( *args, - name=name, **kwargs, # type: ignore[misc] ), ) From 5df60653db7f2fdb5cc8efd90d4bffbfcdcae763 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 4 Nov 2023 15:48:40 +0200 Subject: [PATCH 04/18] object and kv storage watch --- faststream/nats/broker.py | 6 ++++ faststream/nats/handler.py | 30 +++++++++++++++++- faststream/nats/kv_watch.py | 60 ++++++++++++++++++++++++++++++++++++ faststream/nats/obj_watch.py | 56 +++++++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 faststream/nats/kv_watch.py create mode 100644 faststream/nats/obj_watch.py diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index 45b296a3d7..b94a2e68a0 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -39,6 +39,8 @@ from faststream.nats.message import NatsMessage from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer from faststream.nats.pull_sub import PullSub +from faststream.nats.kv_watch import KvWatch +from faststream.nats.obj_watch import ObjWatch from faststream.nats.shared.logging import NatsLoggingMixin from faststream.types import AnyDict, DecodedMessage from faststream.utils.context.main import context @@ -263,6 +265,8 @@ def subscriber( # type: ignore[override] max_msgs: int = 0, # JS arguments pull_sub: Optional[PullSub] = None, + kv_watch: Optional[KvWatch] = None, + obj_watch: Optional[ObjWatch] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, ordered_consumer: bool = False, @@ -340,6 +344,8 @@ def subscriber( # type: ignore[override] queue=queue, stream=stream, pull_sub=pull_sub, + kv_watch=kv_watch, + obj_watch=obj_watch, extra_options=extra_options, title=title, description=description, diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index dd067da467..b4282c5430 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -7,6 +7,8 @@ from nats.aio.msg import Msg from nats.aio.subscription import Subscription from nats.js import JetStreamContext +from nats.js.kv import KeyValue +from nats.js.object_store import ObjectStore from faststream._compat import override from faststream.broker.handler import AsyncHandler @@ -23,6 +25,8 @@ from faststream.broker.wrapper import HandlerCallWrapper from faststream.nats.js_stream import JStream from faststream.nats.pull_sub import PullSub +from faststream.nats.kv_watch import KvWatch +from faststream.nats.obj_watch import ObjWatch from faststream.nats.message import NatsMessage from faststream.nats.parser import JsParser, Parser from faststream.types import AnyDict @@ -40,6 +44,8 @@ def __init__( queue: str = "", stream: Optional[JStream] = None, pull_sub: Optional[PullSub] = None, + kv_watch: Optional[KvWatch] = None, + obj_watch: Optional[ObjWatch] = None, extra_options: Optional[AnyDict] = None, # AsyncAPI information description: Optional[str] = None, @@ -53,6 +59,8 @@ def __init__( self.stream = stream self.pull_sub = pull_sub + self.kv_watch = kv_watch + self.obj_watch = obj_watch self.extra_options = extra_options or {} super().__init__( @@ -91,7 +99,18 @@ async def start(self, connection: Union[Client, JetStreamContext]) -> None: # t subject=self.subject, **self.extra_options ) - asyncio.create_task(self._consume()) + self.task = asyncio.create_task(self._consume()) + + elif self.kv_watch is not None: + bucket = await connection.key_value(self.kv_watch.bucket) + watcher = await bucket.watch(self.kv_watch.keys, **self.extra_options) + self.task = asyncio.create_task(self._cosume_watch(watcher)) + + elif self.obj_watch is not None: + bucket = await connection.object_store(self.obj_watch.bucket) + watcher = await bucket.watch(**self.extra_options) + self.task = asyncio.create_task(self._cosume_watch(watcher)) + else: self.subscription = await connection.subscribe( subject=self.subject, @@ -105,11 +124,20 @@ async def close(self) -> None: await self.subscription.unsubscribe() self.subscription = None + if self.task is not None: + self.task.cancel() + self.task = None + async def _consume(self) -> None: while self.subscription: messages = await self.subscription.fetch(self.pull_sub.batch_size) await asyncio.gather(*[self.consume(message)] for message in messages) + async def _cosume_watch(self, watcher: Union[KeyValue.KeyWatcher, ObjectStore.ObjectWatcher]) -> None: + while self.subscription: + message = await watcher.updates() + await self.consume(message) + @staticmethod def get_routing_hash(subject: str) -> str: return subject diff --git a/faststream/nats/kv_watch.py b/faststream/nats/kv_watch.py new file mode 100644 index 0000000000..13a9d62f11 --- /dev/null +++ b/faststream/nats/kv_watch.py @@ -0,0 +1,60 @@ +from typing import Any, List, Optional + +from nats.js.api import ( + DiscardPolicy, + ExternalStream, + Placement, + RePublish, + RetentionPolicy, + StorageType, + StreamConfig, + StreamSource, +) +from pydantic import Field, BaseModel + +__all__ = ( + "KvWatch", + # import to prevent Pydantic ForwardRef error + "RetentionPolicy", + "DiscardPolicy", + "StorageType", + "Placement", + "StreamSource", + "ExternalStream", + "RePublish", + "Optional", +) + + +class KvWatch(BaseModel): + config: StreamConfig + + subjects: List[str] = Field(default_factory=list) + declare: bool = Field(default=True) + + def __init__( + self, + bucket: str, + keys, + headers_only: bool = False, + include_history: bool = False, + ignore_deletes: bool = False, + meta_only: bool = False, + *args: Any, + declare: bool = True, + **kwargs: Any, + ) -> None: + super().__init__( + declare=declare, + subjects=[], + keys=keys, + bucket=bucket, + headers_only=headers_only, + include_history=include_history, + ignore_deletes=ignore_deletes, + meta_only=meta_only, + config=StreamConfig( + *args, + **kwargs, # type: ignore[misc] + ), + ) diff --git a/faststream/nats/obj_watch.py b/faststream/nats/obj_watch.py new file mode 100644 index 0000000000..a5f28d6f2e --- /dev/null +++ b/faststream/nats/obj_watch.py @@ -0,0 +1,56 @@ +from typing import Any, List, Optional + +from nats.js.api import ( + DiscardPolicy, + ExternalStream, + Placement, + RePublish, + RetentionPolicy, + StorageType, + StreamConfig, + StreamSource, +) +from pydantic import Field, BaseModel + +__all__ = ( + "ObjWatch", + # import to prevent Pydantic ForwardRef error + "RetentionPolicy", + "DiscardPolicy", + "StorageType", + "Placement", + "StreamSource", + "ExternalStream", + "RePublish", + "Optional", +) + + +class ObjWatch(BaseModel): + config: StreamConfig + + subjects: List[str] = Field(default_factory=list) + declare: bool = Field(default=True) + + def __init__( + self, + bucket: str, + ignore_deletes: bool = False, + include_history: bool = False, + meta_only: bool = False, + *args: Any, + declare: bool = True, + **kwargs: Any, + ) -> None: + super().__init__( + declare=declare, + subjects=[], + bucket=bucket, + ignore_deletes=ignore_deletes, + include_history=include_history, + meta_only=meta_only, + config=StreamConfig( + *args, + **kwargs, # type: ignore[misc] + ), + ) From 6f535202edd2184b135fd641312da3e06fe845f5 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sun, 5 Nov 2023 11:15:59 +0300 Subject: [PATCH 05/18] fix: correct NATS pull subscriber --- faststream/nats/__init__.py | 2 ++ faststream/nats/broker.py | 7 ++++-- faststream/nats/broker.pyi | 2 ++ faststream/nats/fastapi.pyi | 2 ++ faststream/nats/handler.py | 19 +++++++++----- faststream/nats/kv_watch.py | 2 +- faststream/nats/obj_watch.py | 2 +- faststream/nats/pull_sub.py | 49 ++++++------------------------------ faststream/nats/router.pyi | 2 ++ faststream/nats/schemas.py | 15 +++++++++++ 10 files changed, 50 insertions(+), 52 deletions(-) create mode 100644 faststream/nats/schemas.py diff --git a/faststream/nats/__init__.py b/faststream/nats/__init__.py index 9efebcb5f7..43d7a0c1a5 100644 --- a/faststream/nats/__init__.py +++ b/faststream/nats/__init__.py @@ -17,6 +17,7 @@ from faststream.nats.annotations import NatsMessage from faststream.nats.broker import NatsBroker from faststream.nats.js_stream import JStream +from faststream.nats.pull_sub import PullSub from faststream.nats.router import NatsRouter from faststream.nats.shared.router import NatsRoute from faststream.nats.test import TestNatsBroker @@ -29,6 +30,7 @@ "NatsRouter", "NatsRoute", "JStream", + "PullSub", # Nats imports "ConsumerConfig", "DeliverPolicy", diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index b94a2e68a0..38cb5bd5fe 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -36,11 +36,11 @@ from faststream.nats.asyncapi import Handler, Publisher from faststream.nats.helpers import stream_builder from faststream.nats.js_stream import JStream +from faststream.nats.kv_watch import KvWatch from faststream.nats.message import NatsMessage +from faststream.nats.obj_watch import ObjWatch from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer from faststream.nats.pull_sub import PullSub -from faststream.nats.kv_watch import KvWatch -from faststream.nats.obj_watch import ObjWatch from faststream.nats.shared.logging import NatsLoggingMixin from faststream.types import AnyDict, DecodedMessage from faststream.utils.context.main import context @@ -293,6 +293,9 @@ def subscriber( # type: ignore[override] ]: stream = stream_builder.stream(stream) + if pull_sub is not None and stream is None: + raise ValueError("Pull subscriber can be used only with a stream") + self._setup_log_context( queue=queue, subject=subject, diff --git a/faststream/nats/broker.pyi b/faststream/nats/broker.pyi index 8015a286e6..2c32deabb9 100644 --- a/faststream/nats/broker.pyi +++ b/faststream/nats/broker.pyi @@ -47,6 +47,7 @@ from faststream.nats.asyncapi import Handler, Publisher from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer +from faststream.nats.pull_sub import PullSub from faststream.nats.shared.logging import NatsLoggingMixin from faststream.types import DecodedMessage, SendableMessage @@ -226,6 +227,7 @@ class NatsBroker( max_msgs: int = 0, ack_first: bool = False, # JS arguments + pull_sub: Optional[PullSub] = None, stream: Union[str, JStream, None] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, diff --git a/faststream/nats/fastapi.pyi b/faststream/nats/fastapi.pyi index f65a96e111..d2d36980b6 100644 --- a/faststream/nats/fastapi.pyi +++ b/faststream/nats/fastapi.pyi @@ -58,6 +58,7 @@ from faststream.nats.asyncapi import Publisher from faststream.nats.broker import NatsBroker from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage +from faststream.nats.pull_sub import PullSub class NatsRouter(StreamRouter[Msg]): broker_class = NatsBroker @@ -189,6 +190,7 @@ class NatsRouter(StreamRouter[Msg]): max_msgs: int = 0, ack_first: bool = False, # JS arguments + pull_sub: Optional[PullSub] = None, stream: Union[str, JStream, None] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index b4282c5430..ffbc00455b 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -1,5 +1,4 @@ import asyncio - from typing import Any, Callable, Dict, Optional, Sequence, Union from fast_depends.core import CallModel @@ -24,13 +23,15 @@ ) from faststream.broker.wrapper import HandlerCallWrapper from faststream.nats.js_stream import JStream -from faststream.nats.pull_sub import PullSub from faststream.nats.kv_watch import KvWatch -from faststream.nats.obj_watch import ObjWatch from faststream.nats.message import NatsMessage +from faststream.nats.obj_watch import ObjWatch from faststream.nats.parser import JsParser, Parser +from faststream.nats.pull_sub import PullSub +from faststream.nats.schemas import PullSubKwargs from faststream.types import AnyDict from faststream.utils.context.path import compile_path +from faststream.utils.data import filter_by_dict class LogicNatsHandler(AsyncHandler[Msg]): @@ -95,9 +96,11 @@ def add_call( @override async def start(self, connection: Union[Client, JetStreamContext]) -> None: # type: ignore[override] if self.pull_sub is not None: + if self.stream is None: + raise ValueError("Pull subscriber can be used only with a stream") self.subscription = await connection.pull_subscribe( subject=self.subject, - **self.extra_options + **filter_by_dict(PullSubKwargs, self.extra_options) ) self.task = asyncio.create_task(self._consume()) @@ -130,8 +133,12 @@ async def close(self) -> None: async def _consume(self) -> None: while self.subscription: - messages = await self.subscription.fetch(self.pull_sub.batch_size) - await asyncio.gather(*[self.consume(message)] for message in messages) + assert self.pull_sub + messages = await self.subscription.fetch( + batch=self.pull_sub.batch_size, + timeout=self.pull_sub.timeout, + ) + await asyncio.gather(*(self.consume(m) for m in messages)) async def _cosume_watch(self, watcher: Union[KeyValue.KeyWatcher, ObjectStore.ObjectWatcher]) -> None: while self.subscription: diff --git a/faststream/nats/kv_watch.py b/faststream/nats/kv_watch.py index 13a9d62f11..0c3d5e03ba 100644 --- a/faststream/nats/kv_watch.py +++ b/faststream/nats/kv_watch.py @@ -10,7 +10,7 @@ StreamConfig, StreamSource, ) -from pydantic import Field, BaseModel +from pydantic import BaseModel, Field __all__ = ( "KvWatch", diff --git a/faststream/nats/obj_watch.py b/faststream/nats/obj_watch.py index a5f28d6f2e..8c598edfcf 100644 --- a/faststream/nats/obj_watch.py +++ b/faststream/nats/obj_watch.py @@ -10,7 +10,7 @@ StreamConfig, StreamSource, ) -from pydantic import Field, BaseModel +from pydantic import BaseModel, Field __all__ = ( "ObjWatch", diff --git a/faststream/nats/pull_sub.py b/faststream/nats/pull_sub.py index 71ac1389c2..4454a0e0e5 100644 --- a/faststream/nats/pull_sub.py +++ b/faststream/nats/pull_sub.py @@ -1,53 +1,18 @@ -from typing import Any, List, Optional +from typing import Optional -from nats.js.api import ( - DiscardPolicy, - ExternalStream, - Placement, - RePublish, - RetentionPolicy, - StorageType, - StreamConfig, - StreamSource, -) -from pydantic import Field, BaseModel - - -__all__ = ( - "PullSub", - # import to prevent Pydantic ForwardRef error - "RetentionPolicy", - "DiscardPolicy", - "StorageType", - "Placement", - "StreamSource", - "ExternalStream", - "RePublish", - "Optional", -) +from pydantic import BaseModel, Field class PullSub(BaseModel): - config: StreamConfig - - subjects: List[str] = Field(default_factory=list) - declare: bool = Field(default=True) - - # idk which parameters must be here.... + batch_size: int = Field(default=1) + timeout: Optional[float] = Field(default=5.0) def __init__( self, - batch_size: int, - *args: Any, - declare: bool = True, - **kwargs: Any, + batch_size: int = 1, + timeout: Optional[float] = 5.0, ) -> None: super().__init__( - declare=declare, - subjects=[], batch_size=batch_size, - config=StreamConfig( - *args, - **kwargs, # type: ignore[misc] - ), + timeout=timeout, ) diff --git a/faststream/nats/router.pyi b/faststream/nats/router.pyi index 7dab1e732c..508c1d631d 100644 --- a/faststream/nats/router.pyi +++ b/faststream/nats/router.pyi @@ -18,6 +18,7 @@ from faststream.broker.wrapper import HandlerCallWrapper from faststream.nats.asyncapi import Publisher from faststream.nats.js_stream import JStream from faststream.nats.message import NatsMessage +from faststream.nats.pull_sub import PullSub from faststream.nats.shared.router import NatsRoute from faststream.nats.shared.router import NatsRouter as BaseRouter @@ -65,6 +66,7 @@ class NatsRouter(BaseRouter): max_msgs: int = 0, ack_first: bool = False, # JS arguments + pull_sub: Optional[PullSub] = None, stream: Union[str, JStream, None] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, diff --git a/faststream/nats/schemas.py b/faststream/nats/schemas.py new file mode 100644 index 0000000000..3856569912 --- /dev/null +++ b/faststream/nats/schemas.py @@ -0,0 +1,15 @@ +from typing import Optional + +from nats.js import api + +from faststream._compat import TypedDict + + +class PullSubKwargs(TypedDict, total=False): + subject: str + durable: Optional[str] + stream: Optional[str] + config: Optional[api.ConsumerConfig] + pending_msgs_limit: int + pending_bytes_limit: int + inbox_prefix: bytes From 0ad12bf7521d6c8d5c97b59bcb76d1041904b60b Mon Sep 17 00:00:00 2001 From: sheldy Date: Sun, 5 Nov 2023 14:53:49 +0200 Subject: [PATCH 06/18] fix: correct `PullSub` stub file --- faststream/nats/pull_sub.pyi | 53 ++++-------------------------------- 1 file changed, 6 insertions(+), 47 deletions(-) diff --git a/faststream/nats/pull_sub.pyi b/faststream/nats/pull_sub.pyi index 059c7b336e..05ebb821d9 100644 --- a/faststream/nats/pull_sub.pyi +++ b/faststream/nats/pull_sub.pyi @@ -1,53 +1,12 @@ -from typing import List, Optional +from typing import Optional -from nats.js.api import ( - DiscardPolicy, - Placement, - RePublish, - RetentionPolicy, - StorageType, - StreamConfig, - StreamSource, -) -from pydantic import Field - -from faststream.broker.schemas import NameRequired - -class PullSub(NameRequired): - config: StreamConfig - - subjects: List[str] = Field(default_factory=list) - declare: bool = Field(default=True) +from pydantic import BaseModel, Field +class PullSub(BaseModel): + batch_size: int = Field(default=1) + timeout: Optional[float] = Field(default=5.0) def __init__( self, - name: str, batch_size: int, - description: Optional[str] = None, - subjects: Optional[List[str]] = None, - retention: Optional[RetentionPolicy] = None, - max_consumers: Optional[int] = None, - max_msgs: Optional[int] = None, - max_bytes: Optional[int] = None, - discard: Optional[DiscardPolicy] = DiscardPolicy.OLD, - max_age: Optional[float] = None, # in seconds - max_msgs_per_subject: int = -1, - max_msg_size: Optional[int] = -1, - storage: Optional[StorageType] = None, - num_replicas: Optional[int] = None, - no_ack: bool = False, - template_owner: Optional[str] = None, - duplicate_window: float = 0, - placement: Optional[Placement] = None, - mirror: Optional[StreamSource] = None, - sources: Optional[List[StreamSource]] = None, - sealed: bool = False, - deny_delete: bool = False, - deny_purge: bool = False, - allow_rollup_hdrs: bool = False, - republish: Optional[RePublish] = None, - allow_direct: Optional[bool] = None, - mirror_direct: Optional[bool] = None, - # custom - declare: bool = True, + timeout: float ) -> None: ... From 589b0f851d3d07b8ab223386f8fc23881600e443 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sun, 5 Nov 2023 15:00:00 +0200 Subject: [PATCH 07/18] fix: missed Optional in NATS `PullSub` stub file --- faststream/nats/pull_sub.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/nats/pull_sub.pyi b/faststream/nats/pull_sub.pyi index 05ebb821d9..ecbe65c5ca 100644 --- a/faststream/nats/pull_sub.pyi +++ b/faststream/nats/pull_sub.pyi @@ -8,5 +8,5 @@ class PullSub(BaseModel): def __init__( self, batch_size: int, - timeout: float + timeout: Optional[float], ) -> None: ... From f719b022ca0e397fe7dac814ff001780503860c2 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sun, 5 Nov 2023 18:51:48 +0300 Subject: [PATCH 08/18] fix: process NATS pull timeout --- faststream/nats/broker.py | 28 ++++++++++++++----- faststream/nats/broker.pyi | 4 ++- faststream/nats/fastapi.pyi | 4 ++- faststream/nats/handler.py | 21 ++++++++------ faststream/nats/pull_sub.pyi | 53 ------------------------------------ faststream/nats/router.pyi | 4 ++- faststream/nats/schemas.py | 15 ---------- 7 files changed, 42 insertions(+), 87 deletions(-) delete mode 100644 faststream/nats/pull_sub.pyi delete mode 100644 faststream/nats/schemas.py diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index 38cb5bd5fe..84b6cbee82 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -264,7 +264,6 @@ def subscriber( # type: ignore[override] # Core arguments max_msgs: int = 0, # JS arguments - pull_sub: Optional[PullSub] = None, kv_watch: Optional[KvWatch] = None, obj_watch: Optional[ObjWatch] = None, durable: Optional[str] = None, @@ -274,6 +273,9 @@ def subscriber( # type: ignore[override] flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # custom ack_first: bool = False, stream: Union[str, JStream, None] = None, @@ -324,14 +326,26 @@ def subscriber( # type: ignore[override] "durable": durable, "stream": stream.name, "config": config, - "ordered_consumer": ordered_consumer, - "idle_heartbeat": idle_heartbeat, - "flow_control": flow_control, - "deliver_policy": deliver_policy, - "headers_only": headers_only, - "manual_ack": not ack_first, } ) + + if pull_sub is not None: + extra_options.update({ + "inbox_prefix": inbox_prefix + }) + + else: + extra_options.update( + { + "ordered_consumer": ordered_consumer, + "idle_heartbeat": idle_heartbeat, + "flow_control": flow_control, + "deliver_policy": deliver_policy, + "headers_only": headers_only, + "manual_ack": not ack_first, + } + ) + else: extra_options.update( { diff --git a/faststream/nats/broker.pyi b/faststream/nats/broker.pyi index 2c32deabb9..5e3c44df6f 100644 --- a/faststream/nats/broker.pyi +++ b/faststream/nats/broker.pyi @@ -227,7 +227,6 @@ class NatsBroker( max_msgs: int = 0, ack_first: bool = False, # JS arguments - pull_sub: Optional[PullSub] = None, stream: Union[str, JStream, None] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, @@ -236,6 +235,9 @@ class NatsBroker( flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # broker arguments dependencies: Sequence[Depends] = (), parser: Optional[CustomParser[Msg, NatsMessage]] = None, diff --git a/faststream/nats/fastapi.pyi b/faststream/nats/fastapi.pyi index d2d36980b6..d9da1afcd2 100644 --- a/faststream/nats/fastapi.pyi +++ b/faststream/nats/fastapi.pyi @@ -190,7 +190,6 @@ class NatsRouter(StreamRouter[Msg]): max_msgs: int = 0, ack_first: bool = False, # JS arguments - pull_sub: Optional[PullSub] = None, stream: Union[str, JStream, None] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, @@ -199,6 +198,9 @@ class NatsRouter(StreamRouter[Msg]): flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # broker arguments dependencies: Sequence[Depends] = (), parser: Optional[CustomParser[Msg, NatsMessage]] = None, diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index ffbc00455b..26b0b533c0 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -1,10 +1,12 @@ import asyncio +from contextlib import suppress from typing import Any, Callable, Dict, Optional, Sequence, Union from fast_depends.core import CallModel from nats.aio.client import Client from nats.aio.msg import Msg from nats.aio.subscription import Subscription +from nats.errors import TimeoutError from nats.js import JetStreamContext from nats.js.kv import KeyValue from nats.js.object_store import ObjectStore @@ -28,10 +30,8 @@ from faststream.nats.obj_watch import ObjWatch from faststream.nats.parser import JsParser, Parser from faststream.nats.pull_sub import PullSub -from faststream.nats.schemas import PullSubKwargs from faststream.types import AnyDict from faststream.utils.context.path import compile_path -from faststream.utils.data import filter_by_dict class LogicNatsHandler(AsyncHandler[Msg]): @@ -98,9 +98,10 @@ async def start(self, connection: Union[Client, JetStreamContext]) -> None: # t if self.pull_sub is not None: if self.stream is None: raise ValueError("Pull subscriber can be used only with a stream") + self.subscription = await connection.pull_subscribe( subject=self.subject, - **filter_by_dict(PullSubKwargs, self.extra_options) + **self.extra_options, ) self.task = asyncio.create_task(self._consume()) @@ -132,13 +133,15 @@ async def close(self) -> None: self.task = None async def _consume(self) -> None: + assert self.pull_sub + while self.subscription: - assert self.pull_sub - messages = await self.subscription.fetch( - batch=self.pull_sub.batch_size, - timeout=self.pull_sub.timeout, - ) - await asyncio.gather(*(self.consume(m) for m in messages)) + with suppress(TimeoutError): + messages = await self.subscription.fetch( + batch=self.pull_sub.batch_size, + timeout=self.pull_sub.timeout, + ) + await asyncio.gather(*(self.consume(m) for m in messages)) async def _cosume_watch(self, watcher: Union[KeyValue.KeyWatcher, ObjectStore.ObjectWatcher]) -> None: while self.subscription: diff --git a/faststream/nats/pull_sub.pyi b/faststream/nats/pull_sub.pyi deleted file mode 100644 index 059c7b336e..0000000000 --- a/faststream/nats/pull_sub.pyi +++ /dev/null @@ -1,53 +0,0 @@ -from typing import List, Optional - -from nats.js.api import ( - DiscardPolicy, - Placement, - RePublish, - RetentionPolicy, - StorageType, - StreamConfig, - StreamSource, -) -from pydantic import Field - -from faststream.broker.schemas import NameRequired - -class PullSub(NameRequired): - config: StreamConfig - - subjects: List[str] = Field(default_factory=list) - declare: bool = Field(default=True) - - def __init__( - self, - name: str, - batch_size: int, - description: Optional[str] = None, - subjects: Optional[List[str]] = None, - retention: Optional[RetentionPolicy] = None, - max_consumers: Optional[int] = None, - max_msgs: Optional[int] = None, - max_bytes: Optional[int] = None, - discard: Optional[DiscardPolicy] = DiscardPolicy.OLD, - max_age: Optional[float] = None, # in seconds - max_msgs_per_subject: int = -1, - max_msg_size: Optional[int] = -1, - storage: Optional[StorageType] = None, - num_replicas: Optional[int] = None, - no_ack: bool = False, - template_owner: Optional[str] = None, - duplicate_window: float = 0, - placement: Optional[Placement] = None, - mirror: Optional[StreamSource] = None, - sources: Optional[List[StreamSource]] = None, - sealed: bool = False, - deny_delete: bool = False, - deny_purge: bool = False, - allow_rollup_hdrs: bool = False, - republish: Optional[RePublish] = None, - allow_direct: Optional[bool] = None, - mirror_direct: Optional[bool] = None, - # custom - declare: bool = True, - ) -> None: ... diff --git a/faststream/nats/router.pyi b/faststream/nats/router.pyi index 508c1d631d..50d5fa0d28 100644 --- a/faststream/nats/router.pyi +++ b/faststream/nats/router.pyi @@ -66,7 +66,6 @@ class NatsRouter(BaseRouter): max_msgs: int = 0, ack_first: bool = False, # JS arguments - pull_sub: Optional[PullSub] = None, stream: Union[str, JStream, None] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, @@ -75,6 +74,9 @@ class NatsRouter(BaseRouter): flow_control: bool = False, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, + # pull arguments + pull_sub: Optional[PullSub] = None, + inbox_prefix: bytes = api.INBOX_PREFIX, # broker arguments dependencies: Sequence[Depends] = (), parser: Optional[CustomParser[Msg, NatsMessage]] = None, diff --git a/faststream/nats/schemas.py b/faststream/nats/schemas.py deleted file mode 100644 index 3856569912..0000000000 --- a/faststream/nats/schemas.py +++ /dev/null @@ -1,15 +0,0 @@ -from typing import Optional - -from nats.js import api - -from faststream._compat import TypedDict - - -class PullSubKwargs(TypedDict, total=False): - subject: str - durable: Optional[str] - stream: Optional[str] - config: Optional[api.ConsumerConfig] - pending_msgs_limit: int - pending_bytes_limit: int - inbox_prefix: bytes From 12edb3bd642ae1ef5919dfdcc9c76a5969a2d597 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sun, 5 Nov 2023 19:06:56 +0300 Subject: [PATCH 09/18] test: add PullSub tests --- examples/nats/e09_pull_sub.py | 14 ++++++++++++ tests/brokers/nats/test_consume.py | 29 +++++++++++++++++++++++- tests/brokers/nats/test_test_client.py | 16 ++++++++++++- tests/examples/nats/test_e09_pull_sub.py | 13 +++++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 examples/nats/e09_pull_sub.py create mode 100644 tests/examples/nats/test_e09_pull_sub.py diff --git a/examples/nats/e09_pull_sub.py b/examples/nats/e09_pull_sub.py new file mode 100644 index 0000000000..887d7dab91 --- /dev/null +++ b/examples/nats/e09_pull_sub.py @@ -0,0 +1,14 @@ +from faststream import FastStream, Logger +from faststream.nats import NatsBroker, PullSub + +broker = NatsBroker() +app = FastStream(broker) + + +@broker.subscriber( + subject="test", + stream="stream", + pull_sub=PullSub(batch_size=10), +) +async def handle(msg, logger: Logger): + logger.info(msg) diff --git a/tests/brokers/nats/test_consume.py b/tests/brokers/nats/test_consume.py index b7a5e1cdc2..268303f3ca 100644 --- a/tests/brokers/nats/test_consume.py +++ b/tests/brokers/nats/test_consume.py @@ -5,7 +5,7 @@ from nats.aio.msg import Msg from faststream.exceptions import AckMessage -from faststream.nats import JStream, NatsBroker +from faststream.nats import JStream, NatsBroker, PullSub from faststream.nats.annotations import NatsMessage from tests.brokers.base.consume import BrokerRealConsumeTestcase from tests.tools import spy_decorator @@ -37,6 +37,33 @@ def subscriber(m): assert event.is_set() + async def test_consume_pull( + self, + queue: str, + consume_broker: NatsBroker, + stream: JStream, + event: asyncio.Event, + mock, + ): + @consume_broker.subscriber(queue, stream=stream, pull_sub=PullSub(1)) + def subscriber(m): + mock(m) + event.set() + + await consume_broker.start() + await asyncio.wait( + ( + asyncio.create_task( + consume_broker.publish("hello", queue, stream=stream.name) + ), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + mock.assert_called_once_with("hello") + @pytest.mark.asyncio async def test_consume_ack( self, diff --git a/tests/brokers/nats/test_test_client.py b/tests/brokers/nats/test_test_client.py index 165f936e82..21afc44fbf 100644 --- a/tests/brokers/nats/test_test_client.py +++ b/tests/brokers/nats/test_test_client.py @@ -3,7 +3,7 @@ import pytest from faststream import BaseMiddleware -from faststream.nats import JStream, NatsBroker, TestNatsBroker +from faststream.nats import JStream, NatsBroker, TestNatsBroker, PullSub from tests.brokers.base.testclient import BrokerTestclientTestcase @@ -135,3 +135,17 @@ def subscriber(): await test_broker.start() await test_broker.publish("hello", "test.a.subj.b.c") subscriber.mock.assert_called_once_with("hello") + + async def test_consume_pull( + self, + queue: str, + test_broker: NatsBroker, + stream: JStream, + ): + @test_broker.subscriber(queue, stream=stream, pull_sub=PullSub(1)) + def subscriber(m): + ... + + await test_broker.start() + await test_broker.publish("hello", queue) + subscriber.mock.assert_called_once_with("hello") diff --git a/tests/examples/nats/test_e09_pull_sub.py b/tests/examples/nats/test_e09_pull_sub.py new file mode 100644 index 0000000000..a13f2f6566 --- /dev/null +++ b/tests/examples/nats/test_e09_pull_sub.py @@ -0,0 +1,13 @@ +import pytest + +from faststream.nats import TestApp, TestNatsBroker + + +@pytest.mark.asyncio +async def test_basic(): + from examples.nats.e09_pull_sub import app, broker, handle + + async with TestNatsBroker(broker): + async with TestApp(app): + await broker.publish("Hi!", "test") + handle.mock.assert_called_once_with("Hi!") From f0519a820e58f71eebc5369cf7040462e4541fd9 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sun, 5 Nov 2023 19:52:35 +0300 Subject: [PATCH 10/18] refactore: remove watch subscriber --- faststream/nats/broker.py | 6 ---- faststream/nats/handler.py | 16 ---------- faststream/nats/kv_watch.py | 60 ------------------------------------ faststream/nats/obj_watch.py | 56 --------------------------------- 4 files changed, 138 deletions(-) delete mode 100644 faststream/nats/kv_watch.py delete mode 100644 faststream/nats/obj_watch.py diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index 84b6cbee82..3ffab185e7 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -36,9 +36,7 @@ from faststream.nats.asyncapi import Handler, Publisher from faststream.nats.helpers import stream_builder from faststream.nats.js_stream import JStream -from faststream.nats.kv_watch import KvWatch from faststream.nats.message import NatsMessage -from faststream.nats.obj_watch import ObjWatch from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer from faststream.nats.pull_sub import PullSub from faststream.nats.shared.logging import NatsLoggingMixin @@ -264,8 +262,6 @@ def subscriber( # type: ignore[override] # Core arguments max_msgs: int = 0, # JS arguments - kv_watch: Optional[KvWatch] = None, - obj_watch: Optional[ObjWatch] = None, durable: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, ordered_consumer: bool = False, @@ -361,8 +357,6 @@ def subscriber( # type: ignore[override] queue=queue, stream=stream, pull_sub=pull_sub, - kv_watch=kv_watch, - obj_watch=obj_watch, extra_options=extra_options, title=title, description=description, diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index 26b0b533c0..3bf036963b 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -25,9 +25,7 @@ ) from faststream.broker.wrapper import HandlerCallWrapper from faststream.nats.js_stream import JStream -from faststream.nats.kv_watch import KvWatch from faststream.nats.message import NatsMessage -from faststream.nats.obj_watch import ObjWatch from faststream.nats.parser import JsParser, Parser from faststream.nats.pull_sub import PullSub from faststream.types import AnyDict @@ -45,8 +43,6 @@ def __init__( queue: str = "", stream: Optional[JStream] = None, pull_sub: Optional[PullSub] = None, - kv_watch: Optional[KvWatch] = None, - obj_watch: Optional[ObjWatch] = None, extra_options: Optional[AnyDict] = None, # AsyncAPI information description: Optional[str] = None, @@ -60,8 +56,6 @@ def __init__( self.stream = stream self.pull_sub = pull_sub - self.kv_watch = kv_watch - self.obj_watch = obj_watch self.extra_options = extra_options or {} super().__init__( @@ -105,16 +99,6 @@ async def start(self, connection: Union[Client, JetStreamContext]) -> None: # t ) self.task = asyncio.create_task(self._consume()) - elif self.kv_watch is not None: - bucket = await connection.key_value(self.kv_watch.bucket) - watcher = await bucket.watch(self.kv_watch.keys, **self.extra_options) - self.task = asyncio.create_task(self._cosume_watch(watcher)) - - elif self.obj_watch is not None: - bucket = await connection.object_store(self.obj_watch.bucket) - watcher = await bucket.watch(**self.extra_options) - self.task = asyncio.create_task(self._cosume_watch(watcher)) - else: self.subscription = await connection.subscribe( subject=self.subject, diff --git a/faststream/nats/kv_watch.py b/faststream/nats/kv_watch.py deleted file mode 100644 index 0c3d5e03ba..0000000000 --- a/faststream/nats/kv_watch.py +++ /dev/null @@ -1,60 +0,0 @@ -from typing import Any, List, Optional - -from nats.js.api import ( - DiscardPolicy, - ExternalStream, - Placement, - RePublish, - RetentionPolicy, - StorageType, - StreamConfig, - StreamSource, -) -from pydantic import BaseModel, Field - -__all__ = ( - "KvWatch", - # import to prevent Pydantic ForwardRef error - "RetentionPolicy", - "DiscardPolicy", - "StorageType", - "Placement", - "StreamSource", - "ExternalStream", - "RePublish", - "Optional", -) - - -class KvWatch(BaseModel): - config: StreamConfig - - subjects: List[str] = Field(default_factory=list) - declare: bool = Field(default=True) - - def __init__( - self, - bucket: str, - keys, - headers_only: bool = False, - include_history: bool = False, - ignore_deletes: bool = False, - meta_only: bool = False, - *args: Any, - declare: bool = True, - **kwargs: Any, - ) -> None: - super().__init__( - declare=declare, - subjects=[], - keys=keys, - bucket=bucket, - headers_only=headers_only, - include_history=include_history, - ignore_deletes=ignore_deletes, - meta_only=meta_only, - config=StreamConfig( - *args, - **kwargs, # type: ignore[misc] - ), - ) diff --git a/faststream/nats/obj_watch.py b/faststream/nats/obj_watch.py deleted file mode 100644 index 8c598edfcf..0000000000 --- a/faststream/nats/obj_watch.py +++ /dev/null @@ -1,56 +0,0 @@ -from typing import Any, List, Optional - -from nats.js.api import ( - DiscardPolicy, - ExternalStream, - Placement, - RePublish, - RetentionPolicy, - StorageType, - StreamConfig, - StreamSource, -) -from pydantic import BaseModel, Field - -__all__ = ( - "ObjWatch", - # import to prevent Pydantic ForwardRef error - "RetentionPolicy", - "DiscardPolicy", - "StorageType", - "Placement", - "StreamSource", - "ExternalStream", - "RePublish", - "Optional", -) - - -class ObjWatch(BaseModel): - config: StreamConfig - - subjects: List[str] = Field(default_factory=list) - declare: bool = Field(default=True) - - def __init__( - self, - bucket: str, - ignore_deletes: bool = False, - include_history: bool = False, - meta_only: bool = False, - *args: Any, - declare: bool = True, - **kwargs: Any, - ) -> None: - super().__init__( - declare=declare, - subjects=[], - bucket=bucket, - ignore_deletes=ignore_deletes, - include_history=include_history, - meta_only=meta_only, - config=StreamConfig( - *args, - **kwargs, # type: ignore[misc] - ), - ) From 551d8ebca99fe5507d37bda576d8c142ee1d4331 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sun, 5 Nov 2023 19:53:50 +0300 Subject: [PATCH 11/18] chore: bump version --- faststream/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/__about__.py b/faststream/__about__.py index 844c7183db..368244d5bc 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,5 +1,5 @@ """Simple and fast framework to create message brokers based microservices""" -__version__ = "0.2.11" +__version__ = "0.2.12" INSTALL_YAML = """ From 4b64e4e076b07c1c262a8eaf400f4daf4852c0b8 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sun, 5 Nov 2023 20:01:59 +0300 Subject: [PATCH 12/18] lint: fix mypy --- docs/docs/SUMMARY.md | 1 + docs/docs/en/nats/jetstream/pull.md | 0 docs/docs/summary_template.txt | 1 + faststream/nats/broker.py | 4 +--- faststream/nats/handler.py | 25 ++++++++++++++----------- tests/brokers/nats/test_test_client.py | 2 +- 6 files changed, 18 insertions(+), 15 deletions(-) create mode 100644 docs/docs/en/nats/jetstream/pull.md diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 1c8f0b39e0..89d851c87e 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -65,6 +65,7 @@ - [Direct](nats/examples/direct.md) - [Pattern](nats/examples/pattern.md) - [JetStream](nats/jetstream/index.md) + - [Pull Subscriber](nast/jetstream/pull.md) - [Key-Value Storage](nats/jetstream/key-value.md) - [Object Storage](nats/jetstream/object.md) - [Acknowledgement](nats/jetstream/ack.md) diff --git a/docs/docs/en/nats/jetstream/pull.md b/docs/docs/en/nats/jetstream/pull.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/docs/summary_template.txt b/docs/docs/summary_template.txt index 5ae6fd0fd1..7ff5687846 100644 --- a/docs/docs/summary_template.txt +++ b/docs/docs/summary_template.txt @@ -65,6 +65,7 @@ - [Direct](nats/examples/direct.md) - [Pattern](nats/examples/pattern.md) - [JetStream](nats/jetstream/index.md) + - [Pull Subscriber](nast/jetstream/pull.md) - [Key-Value Storage](nats/jetstream/key-value.md) - [Object Storage](nats/jetstream/object.md) - [Acknowledgement](nats/jetstream/ack.md) diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index 3ffab185e7..7dc520e515 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -326,9 +326,7 @@ def subscriber( # type: ignore[override] ) if pull_sub is not None: - extra_options.update({ - "inbox_prefix": inbox_prefix - }) + extra_options.update({"inbox_prefix": inbox_prefix}) else: extra_options.update( diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index 3bf036963b..89903528cc 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -1,6 +1,6 @@ import asyncio from contextlib import suppress -from typing import Any, Callable, Dict, Optional, Sequence, Union +from typing import Any, Callable, Dict, Optional, Sequence, Union, cast from fast_depends.core import CallModel from nats.aio.client import Client @@ -8,8 +8,6 @@ from nats.aio.subscription import Subscription from nats.errors import TimeoutError from nats.js import JetStreamContext -from nats.js.kv import KeyValue -from nats.js.object_store import ObjectStore from faststream._compat import override from faststream.broker.handler import AsyncHandler @@ -33,7 +31,13 @@ class LogicNatsHandler(AsyncHandler[Msg]): - subscription: Optional[Union[Subscription, JetStreamContext.PushSubscription, JetStreamContext.PullSubscription]] + subscription: Optional[ + Union[ + Subscription, + JetStreamContext.PushSubscription, + JetStreamContext.PullSubscription, + ] + ] task: Optional["asyncio.Task[Any]"] = None def __init__( @@ -90,6 +94,8 @@ def add_call( @override async def start(self, connection: Union[Client, JetStreamContext]) -> None: # type: ignore[override] if self.pull_sub is not None: + connection = cast(JetStreamContext, connection) + if self.stream is None: raise ValueError("Pull subscriber can be used only with a stream") @@ -119,19 +125,16 @@ async def close(self) -> None: async def _consume(self) -> None: assert self.pull_sub - while self.subscription: + sub = cast(JetStreamContext.PullSubscription, self.subscription) + + while self.subscription is not None: with suppress(TimeoutError): - messages = await self.subscription.fetch( + messages = await sub.fetch( batch=self.pull_sub.batch_size, timeout=self.pull_sub.timeout, ) await asyncio.gather(*(self.consume(m) for m in messages)) - async def _cosume_watch(self, watcher: Union[KeyValue.KeyWatcher, ObjectStore.ObjectWatcher]) -> None: - while self.subscription: - message = await watcher.updates() - await self.consume(message) - @staticmethod def get_routing_hash(subject: str) -> str: return subject diff --git a/tests/brokers/nats/test_test_client.py b/tests/brokers/nats/test_test_client.py index 21afc44fbf..832a82e1eb 100644 --- a/tests/brokers/nats/test_test_client.py +++ b/tests/brokers/nats/test_test_client.py @@ -3,7 +3,7 @@ import pytest from faststream import BaseMiddleware -from faststream.nats import JStream, NatsBroker, TestNatsBroker, PullSub +from faststream.nats import JStream, NatsBroker, PullSub, TestNatsBroker from tests.brokers.base.testclient import BrokerTestclientTestcase From b1bee643ddd7b11ffb8d31fa1e0d654cd42f50a3 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sun, 5 Nov 2023 21:16:19 +0300 Subject: [PATCH 13/18] docs: add NATS Pull page --- docs/docs/SUMMARY.md | 2 +- docs/docs/en/nats/jetstream/pull.md | 3 +++ docs/docs/summary_template.txt | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 89d851c87e..03088c528d 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -65,7 +65,7 @@ - [Direct](nats/examples/direct.md) - [Pattern](nats/examples/pattern.md) - [JetStream](nats/jetstream/index.md) - - [Pull Subscriber](nast/jetstream/pull.md) + - [Pull Subscriber](nats/jetstream/pull.md) - [Key-Value Storage](nats/jetstream/key-value.md) - [Object Storage](nats/jetstream/object.md) - [Acknowledgement](nats/jetstream/ack.md) diff --git a/docs/docs/en/nats/jetstream/pull.md b/docs/docs/en/nats/jetstream/pull.md index e69de29bb2..2d8283e51e 100644 --- a/docs/docs/en/nats/jetstream/pull.md +++ b/docs/docs/en/nats/jetstream/pull.md @@ -0,0 +1,3 @@ +# Pull Subscriber + +**NATS JetStream** supports two various way to consume messages: **Push** and [**Pull**](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#push-and-pull-consumers){.external-link targer="_blank} consumers. diff --git a/docs/docs/summary_template.txt b/docs/docs/summary_template.txt index 7ff5687846..7927242b03 100644 --- a/docs/docs/summary_template.txt +++ b/docs/docs/summary_template.txt @@ -65,7 +65,7 @@ - [Direct](nats/examples/direct.md) - [Pattern](nats/examples/pattern.md) - [JetStream](nats/jetstream/index.md) - - [Pull Subscriber](nast/jetstream/pull.md) + - [Pull Subscriber](nats/jetstream/pull.md) - [Key-Value Storage](nats/jetstream/key-value.md) - [Object Storage](nats/jetstream/object.md) - [Acknowledgement](nats/jetstream/ack.md) From 269290e9f3bba85aea67710ff0ec152b94c0e63b Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Tue, 7 Nov 2023 22:01:55 +0300 Subject: [PATCH 14/18] docs: fill NATS PULL consumer page --- docs/docs/en/nats/jetstream/pull.md | 24 ++++++++++++++++++++++++ docs/docs_src/nats/js/pull_sub.py | 14 ++++++++++++++ faststream/nats/handler.py | 2 +- tests/docs/nats/js/test_pull_sub.py | 13 +++++++++++++ 4 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 docs/docs_src/nats/js/pull_sub.py create mode 100644 tests/docs/nats/js/test_pull_sub.py diff --git a/docs/docs/en/nats/jetstream/pull.md b/docs/docs/en/nats/jetstream/pull.md index 2d8283e51e..76fdb99f8f 100644 --- a/docs/docs/en/nats/jetstream/pull.md +++ b/docs/docs/en/nats/jetstream/pull.md @@ -1,3 +1,27 @@ # Pull Subscriber +## Overview + **NATS JetStream** supports two various way to consume messages: **Push** and [**Pull**](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#push-and-pull-consumers){.external-link targer="_blank} consumers. + +**Push** consumer is using by default to consume messages with the **FastStream**. It means, that **NATS** server delivers messages to your consumer as far as possible by itself. But also it means, that **NATS** should control all current consumer connections and increase server load. + +Thus, **Pull** consumer is a recommended by *NATS TEAM* way to consume JetStream messages. Using it, you just asking **NATS** for a new messages with a some interval. It sounds a little bit worse than the automatic message delivery, but it provides you with a several advantages: + +* consumer scaling without *queue group* +* handle messages in batches +* reduce **NATS** server load + +So, if you want to consume a huge messages flow without strict time limitations, **Pull** consumer is the right choice for you. + +## FastStream details + +**Pull** consumer is just a regular *Stream* consumer, but with the `pull_sub` argument, controls consuming messages batch size and block interval. + +```python linenums="1" hl_lines="10-11" +{!> docs_src/nats/js/pull_sub.py !} +``` + +Batch size doesn't mean that your `msg` argument is a list of messages, but it means, that you consume up to `10` messages for one request to **NATS** and call your handler per each message in an `asyncio.gather` pool. + +So, your subject will be processed much faster, without blocking per message processing. But, if your subject has less than `10` messages, your reguest to **NATS** will be blocked for `timeout` (5 seconds by default) trying to collect required messages amount. So, you should choise `batch_size` and `timeout` accurately to optimize your consumer efficiency. diff --git a/docs/docs_src/nats/js/pull_sub.py b/docs/docs_src/nats/js/pull_sub.py new file mode 100644 index 0000000000..887d7dab91 --- /dev/null +++ b/docs/docs_src/nats/js/pull_sub.py @@ -0,0 +1,14 @@ +from faststream import FastStream, Logger +from faststream.nats import NatsBroker, PullSub + +broker = NatsBroker() +app = FastStream(broker) + + +@broker.subscriber( + subject="test", + stream="stream", + pull_sub=PullSub(batch_size=10), +) +async def handle(msg, logger: Logger): + logger.info(msg) diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index 89903528cc..351298038b 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -133,7 +133,7 @@ async def _consume(self) -> None: batch=self.pull_sub.batch_size, timeout=self.pull_sub.timeout, ) - await asyncio.gather(*(self.consume(m) for m in messages)) + await asyncio.gather(*map(self.consume, messages)) @staticmethod def get_routing_hash(subject: str) -> str: diff --git a/tests/docs/nats/js/test_pull_sub.py b/tests/docs/nats/js/test_pull_sub.py new file mode 100644 index 0000000000..635a805802 --- /dev/null +++ b/tests/docs/nats/js/test_pull_sub.py @@ -0,0 +1,13 @@ +import pytest + +from faststream.nats import TestApp, TestNatsBroker + + +@pytest.mark.asyncio +async def test_basic(): + from docs.docs_src.nats.js.pull_sub import app, broker, handle + + async with TestNatsBroker(broker): + async with TestApp(app): + await broker.publish("Hi!", "test") + handle.mock.assert_called_once_with("Hi!") From 3f1028ab952fba8cb79ebfe91a68da045f8e0ee9 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Tue, 7 Nov 2023 22:04:38 +0300 Subject: [PATCH 15/18] lint: useless Optional --- faststream/nats/handler.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index 351298038b..af05ecb332 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -31,12 +31,11 @@ class LogicNatsHandler(AsyncHandler[Msg]): - subscription: Optional[ - Union[ - Subscription, - JetStreamContext.PushSubscription, - JetStreamContext.PullSubscription, - ] + subscription: Union[ + None, + Subscription, + JetStreamContext.PushSubscription, + JetStreamContext.PullSubscription, ] task: Optional["asyncio.Task[Any]"] = None From eacc65ddedef76eb63df59846f6072dfda29866f Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Tue, 7 Nov 2023 22:10:08 +0300 Subject: [PATCH 16/18] lint: ignore bandit --- faststream/nats/handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index af05ecb332..5302b1bb18 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -45,7 +45,7 @@ def __init__( log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]], queue: str = "", stream: Optional[JStream] = None, - pull_sub: Optional[PullSub] = None, + pull_sub: [PullSub] = None, extra_options: Optional[AnyDict] = None, # AsyncAPI information description: Optional[str] = None, @@ -122,7 +122,7 @@ async def close(self) -> None: self.task = None async def _consume(self) -> None: - assert self.pull_sub + assert self.pull_sub # nosec B101 sub = cast(JetStreamContext.PullSubscription, self.subscription) From 48b04999e5784a5d034283a6cfe174d584ff0f20 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Tue, 7 Nov 2023 22:12:08 +0300 Subject: [PATCH 17/18] lint: fix typo --- faststream/nats/handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/nats/handler.py b/faststream/nats/handler.py index 5302b1bb18..524272430c 100644 --- a/faststream/nats/handler.py +++ b/faststream/nats/handler.py @@ -45,7 +45,7 @@ def __init__( log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]], queue: str = "", stream: Optional[JStream] = None, - pull_sub: [PullSub] = None, + pull_sub: Optional[PullSub] = None, extra_options: Optional[AnyDict] = None, # AsyncAPI information description: Optional[str] = None, From 00dd957639822187fb2210326043ee7edde7df2a Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Thu, 9 Nov 2023 18:24:49 +0530 Subject: [PATCH 18/18] Update docs --- docs/docs/en/nats/jetstream/pull.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/docs/en/nats/jetstream/pull.md b/docs/docs/en/nats/jetstream/pull.md index 76fdb99f8f..df8fb04cb8 100644 --- a/docs/docs/en/nats/jetstream/pull.md +++ b/docs/docs/en/nats/jetstream/pull.md @@ -2,26 +2,26 @@ ## Overview -**NATS JetStream** supports two various way to consume messages: **Push** and [**Pull**](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#push-and-pull-consumers){.external-link targer="_blank} consumers. +**NATS JetStream** supports two various way to consume messages: [**Push** and **Pull**](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#push-and-pull-consumers){.external-link targer="_blank} consumers. -**Push** consumer is using by default to consume messages with the **FastStream**. It means, that **NATS** server delivers messages to your consumer as far as possible by itself. But also it means, that **NATS** should control all current consumer connections and increase server load. +The **Push** consumer is used by default to consume messages with the **FastStream**. It means that the **NATS** server delivers messages to your consumer as far as possible by itself. However, it also means that **NATS** should control all current consumer connections and increase server load. -Thus, **Pull** consumer is a recommended by *NATS TEAM* way to consume JetStream messages. Using it, you just asking **NATS** for a new messages with a some interval. It sounds a little bit worse than the automatic message delivery, but it provides you with a several advantages: +Thus, the **Pull** consumer is the recommended way to consume JetStream messages by the *NATS TEAM*. Using it, you simply ask **NATS** for new messages at some interval. It may sound a little less convenient than automatic message delivery, but it provides several advantages, such as: -* consumer scaling without *queue group* -* handle messages in batches -* reduce **NATS** server load +* Consumer scaling without a *queue group* +* Handling messages in batches +* Reducing **NATS** server load -So, if you want to consume a huge messages flow without strict time limitations, **Pull** consumer is the right choice for you. +So, if you want to consume a large flow of messages without strict time limitations, the **Pull** consumer is the right choice for you. -## FastStream details +## FastStream Details -**Pull** consumer is just a regular *Stream* consumer, but with the `pull_sub` argument, controls consuming messages batch size and block interval. +The **Pull** consumer is just a regular *Stream* consumer, but with the `pull_sub` argument, which controls consuming messages with batch size and block interval. ```python linenums="1" hl_lines="10-11" {!> docs_src/nats/js/pull_sub.py !} ``` -Batch size doesn't mean that your `msg` argument is a list of messages, but it means, that you consume up to `10` messages for one request to **NATS** and call your handler per each message in an `asyncio.gather` pool. +The batch size doesn't mean that your `msg` argument is a list of messages, but it means that you consume up to `10` messages for one request to **NATS** and call your handler for each message in an `asyncio.gather` pool. -So, your subject will be processed much faster, without blocking per message processing. But, if your subject has less than `10` messages, your reguest to **NATS** will be blocked for `timeout` (5 seconds by default) trying to collect required messages amount. So, you should choise `batch_size` and `timeout` accurately to optimize your consumer efficiency. +So, your subject will be processed much faster, without blocking for each message processing. However, if your subject has fewer than `10` messages, your request to **NATS** will be blocked for `timeout` (5 seconds by default) while trying to collect the required number of messages. Therefor, you should choose `batch_size` and `timeout` accurately to optimize your consumer efficiency.