Skip to content

Commit

Permalink
feat: NATS polling subscriber (#912)
Browse files Browse the repository at this point in the history
* draft for pull_subscribe

* task for fetch messages

* handle messages with `asyncio.gather`

* object and kv storage watch

* fix: correct NATS pull subscriber

* fix: correct `PullSub` stub file

* fix: missed Optional in NATS `PullSub` stub file

* fix: process NATS pull timeout

* test: add PullSub tests

* refactore: remove watch subscriber

* chore: bump version

* lint: fix mypy

* docs: add NATS Pull page

* docs: fill NATS PULL consumer page

* lint: useless Optional

* lint: ignore bandit

* lint: fix typo

* Update docs

---------

Co-authored-by: Nikita Pastukhov <[email protected]>
Co-authored-by: Davor Runje <[email protected]>
Co-authored-by: Kumaran Rajendhiran <[email protected]>
  • Loading branch information
4 people authored Nov 9, 2023
1 parent 3bcf2d6 commit 427abf1
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
- [Direct](nats/examples/direct.md)
- [Pattern](nats/examples/pattern.md)
- [JetStream](nats/jetstream/index.md)
- [Pull Subscriber](nats/jetstream/pull.md)
- [Key-Value Storage](nats/jetstream/key-value.md)
- [Object Storage](nats/jetstream/object.md)
- [Acknowledgement](nats/jetstream/ack.md)
Expand Down
27 changes: 27 additions & 0 deletions docs/docs/en/nats/jetstream/pull.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Pull Subscriber

## Overview

**NATS JetStream** supports two various way to consume messages: [**Push** and **Pull**](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#push-and-pull-consumers){.external-link targer="_blank} consumers.

The **Push** consumer is used by default to consume messages with the **FastStream**. It means that the **NATS** server delivers messages to your consumer as far as possible by itself. However, it also means that **NATS** should control all current consumer connections and increase server load.

Thus, the **Pull** consumer is the recommended way to consume JetStream messages by the *NATS TEAM*. Using it, you simply ask **NATS** for new messages at some interval. It may sound a little less convenient than automatic message delivery, but it provides several advantages, such as:

* Consumer scaling without a *queue group*
* Handling messages in batches
* Reducing **NATS** server load

So, if you want to consume a large flow of messages without strict time limitations, the **Pull** consumer is the right choice for you.

## FastStream Details

The **Pull** consumer is just a regular *Stream* consumer, but with the `pull_sub` argument, which controls consuming messages with batch size and block interval.

```python linenums="1" hl_lines="10-11"
{!> docs_src/nats/js/pull_sub.py !}
```

The batch size doesn't mean that your `msg` argument is a list of messages, but it means that you consume up to `10` messages for one request to **NATS** and call your handler for each message in an `asyncio.gather` pool.

So, your subject will be processed much faster, without blocking for each message processing. However, if your subject has fewer than `10` messages, your request to **NATS** will be blocked for `timeout` (5 seconds by default) while trying to collect the required number of messages. Therefor, you should choose `batch_size` and `timeout` accurately to optimize your consumer efficiency.
1 change: 1 addition & 0 deletions docs/docs/summary_template.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
- [Direct](nats/examples/direct.md)
- [Pattern](nats/examples/pattern.md)
- [JetStream](nats/jetstream/index.md)
- [Pull Subscriber](nats/jetstream/pull.md)
- [Key-Value Storage](nats/jetstream/key-value.md)
- [Object Storage](nats/jetstream/object.md)
- [Acknowledgement](nats/jetstream/ack.md)
Expand Down
14 changes: 14 additions & 0 deletions docs/docs_src/nats/js/pull_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from faststream import FastStream, Logger
from faststream.nats import NatsBroker, PullSub

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber(
subject="test",
stream="stream",
pull_sub=PullSub(batch_size=10),
)
async def handle(msg, logger: Logger):
logger.info(msg)
14 changes: 14 additions & 0 deletions examples/nats/e09_pull_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from faststream import FastStream, Logger
from faststream.nats import NatsBroker, PullSub

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber(
subject="test",
stream="stream",
pull_sub=PullSub(batch_size=10),
)
async def handle(msg, logger: Logger):
logger.info(msg)
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices"""
__version__ = "0.2.11"
__version__ = "0.2.12"


INSTALL_YAML = """
Expand Down
2 changes: 2 additions & 0 deletions faststream/nats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from faststream.nats.annotations import NatsMessage
from faststream.nats.broker import NatsBroker
from faststream.nats.js_stream import JStream
from faststream.nats.pull_sub import PullSub
from faststream.nats.router import NatsRouter
from faststream.nats.shared.router import NatsRoute
from faststream.nats.test import TestNatsBroker
Expand All @@ -29,6 +30,7 @@
"NatsRouter",
"NatsRoute",
"JStream",
"PullSub",
# Nats imports
"ConsumerConfig",
"DeliverPolicy",
Expand Down
30 changes: 24 additions & 6 deletions faststream/nats/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from faststream.nats.js_stream import JStream
from faststream.nats.message import NatsMessage
from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer
from faststream.nats.pull_sub import PullSub
from faststream.nats.shared.logging import NatsLoggingMixin
from faststream.types import AnyDict, DecodedMessage
from faststream.utils.context.main import context
Expand Down Expand Up @@ -268,6 +269,9 @@ def subscriber( # type: ignore[override]
flow_control: bool = False,
deliver_policy: Optional[api.DeliverPolicy] = None,
headers_only: Optional[bool] = None,
# pull arguments
pull_sub: Optional[PullSub] = None,
inbox_prefix: bytes = api.INBOX_PREFIX,
# custom
ack_first: bool = False,
stream: Union[str, JStream, None] = None,
Expand All @@ -287,6 +291,9 @@ def subscriber( # type: ignore[override]
]:
stream = stream_builder.stream(stream)

if pull_sub is not None and stream is None:
raise ValueError("Pull subscriber can be used only with a stream")

self._setup_log_context(
queue=queue,
subject=subject,
Expand Down Expand Up @@ -315,14 +322,24 @@ def subscriber( # type: ignore[override]
"durable": durable,
"stream": stream.name,
"config": config,
"ordered_consumer": ordered_consumer,
"idle_heartbeat": idle_heartbeat,
"flow_control": flow_control,
"deliver_policy": deliver_policy,
"headers_only": headers_only,
"manual_ack": not ack_first,
}
)

if pull_sub is not None:
extra_options.update({"inbox_prefix": inbox_prefix})

else:
extra_options.update(
{
"ordered_consumer": ordered_consumer,
"idle_heartbeat": idle_heartbeat,
"flow_control": flow_control,
"deliver_policy": deliver_policy,
"headers_only": headers_only,
"manual_ack": not ack_first,
}
)

else:
extra_options.update(
{
Expand All @@ -337,6 +354,7 @@ def subscriber( # type: ignore[override]
subject=subject,
queue=queue,
stream=stream,
pull_sub=pull_sub,
extra_options=extra_options,
title=title,
description=description,
Expand Down
4 changes: 4 additions & 0 deletions faststream/nats/broker.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ from faststream.nats.asyncapi import Handler, Publisher
from faststream.nats.js_stream import JStream
from faststream.nats.message import NatsMessage
from faststream.nats.producer import NatsFastProducer, NatsJSFastProducer
from faststream.nats.pull_sub import PullSub
from faststream.nats.shared.logging import NatsLoggingMixin
from faststream.types import DecodedMessage, SendableMessage

Expand Down Expand Up @@ -234,6 +235,9 @@ class NatsBroker(
flow_control: bool = False,
deliver_policy: Optional[api.DeliverPolicy] = None,
headers_only: Optional[bool] = None,
# pull arguments
pull_sub: Optional[PullSub] = None,
inbox_prefix: bytes = api.INBOX_PREFIX,
# broker arguments
dependencies: Sequence[Depends] = (),
parser: Optional[CustomParser[Msg, NatsMessage]] = None,
Expand Down
4 changes: 4 additions & 0 deletions faststream/nats/fastapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ from faststream.nats.asyncapi import Publisher
from faststream.nats.broker import NatsBroker
from faststream.nats.js_stream import JStream
from faststream.nats.message import NatsMessage
from faststream.nats.pull_sub import PullSub

class NatsRouter(StreamRouter[Msg]):
broker_class = NatsBroker
Expand Down Expand Up @@ -197,6 +198,9 @@ class NatsRouter(StreamRouter[Msg]):
flow_control: bool = False,
deliver_policy: Optional[api.DeliverPolicy] = None,
headers_only: Optional[bool] = None,
# pull arguments
pull_sub: Optional[PullSub] = None,
inbox_prefix: bytes = api.INBOX_PREFIX,
# broker arguments
dependencies: Sequence[Depends] = (),
parser: Optional[CustomParser[Msg, NatsMessage]] = None,
Expand Down
59 changes: 51 additions & 8 deletions faststream/nats/handler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import Any, Callable, Dict, Optional, Sequence, Union
import asyncio
from contextlib import suppress
from typing import Any, Callable, Dict, Optional, Sequence, Union, cast

from fast_depends.core import CallModel
from nats.aio.client import Client
from nats.aio.msg import Msg
from nats.aio.subscription import Subscription
from nats.errors import TimeoutError
from nats.js import JetStreamContext

from faststream._compat import override
Expand All @@ -22,19 +25,27 @@
from faststream.nats.js_stream import JStream
from faststream.nats.message import NatsMessage
from faststream.nats.parser import JsParser, Parser
from faststream.nats.pull_sub import PullSub
from faststream.types import AnyDict
from faststream.utils.context.path import compile_path


class LogicNatsHandler(AsyncHandler[Msg]):
subscription: Optional[Union[Subscription, JetStreamContext.PushSubscription]]
subscription: Union[
None,
Subscription,
JetStreamContext.PushSubscription,
JetStreamContext.PullSubscription,
]
task: Optional["asyncio.Task[Any]"] = None

def __init__(
self,
subject: str,
log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
queue: str = "",
stream: Optional[JStream] = None,
pull_sub: Optional[PullSub] = None,
extra_options: Optional[AnyDict] = None,
# AsyncAPI information
description: Optional[str] = None,
Expand All @@ -47,6 +58,7 @@ def __init__(
self.queue = queue

self.stream = stream
self.pull_sub = pull_sub
self.extra_options = extra_options or {}

super().__init__(
Expand All @@ -55,6 +67,7 @@ def __init__(
title=title,
)

self.task = None
self.subscription = None

def add_call(
Expand All @@ -79,18 +92,48 @@ def add_call(

@override
async def start(self, connection: Union[Client, JetStreamContext]) -> None: # type: ignore[override]
self.subscription = await connection.subscribe(
subject=self.subject,
queue=self.queue,
cb=self.consume, # type: ignore[arg-type]
**self.extra_options,
)
if self.pull_sub is not None:
connection = cast(JetStreamContext, connection)

if self.stream is None:
raise ValueError("Pull subscriber can be used only with a stream")

self.subscription = await connection.pull_subscribe(
subject=self.subject,
**self.extra_options,
)
self.task = asyncio.create_task(self._consume())

else:
self.subscription = await connection.subscribe(
subject=self.subject,
queue=self.queue,
cb=self.consume, # type: ignore[arg-type]
**self.extra_options,
)

async def close(self) -> None:
if self.subscription is not None:
await self.subscription.unsubscribe()
self.subscription = None

if self.task is not None:
self.task.cancel()
self.task = None

async def _consume(self) -> None:
assert self.pull_sub # nosec B101

sub = cast(JetStreamContext.PullSubscription, self.subscription)

while self.subscription is not None:
with suppress(TimeoutError):
messages = await sub.fetch(
batch=self.pull_sub.batch_size,
timeout=self.pull_sub.timeout,
)
await asyncio.gather(*map(self.consume, messages))

@staticmethod
def get_routing_hash(subject: str) -> str:
return subject
18 changes: 18 additions & 0 deletions faststream/nats/pull_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Optional

from pydantic import BaseModel, Field


class PullSub(BaseModel):
batch_size: int = Field(default=1)
timeout: Optional[float] = Field(default=5.0)

def __init__(
self,
batch_size: int = 1,
timeout: Optional[float] = 5.0,
) -> None:
super().__init__(
batch_size=batch_size,
timeout=timeout,
)
4 changes: 4 additions & 0 deletions faststream/nats/router.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ from faststream.broker.wrapper import HandlerCallWrapper
from faststream.nats.asyncapi import Publisher
from faststream.nats.js_stream import JStream
from faststream.nats.message import NatsMessage
from faststream.nats.pull_sub import PullSub
from faststream.nats.shared.router import NatsRoute
from faststream.nats.shared.router import NatsRouter as BaseRouter

Expand Down Expand Up @@ -73,6 +74,9 @@ class NatsRouter(BaseRouter):
flow_control: bool = False,
deliver_policy: Optional[api.DeliverPolicy] = None,
headers_only: Optional[bool] = None,
# pull arguments
pull_sub: Optional[PullSub] = None,
inbox_prefix: bytes = api.INBOX_PREFIX,
# broker arguments
dependencies: Sequence[Depends] = (),
parser: Optional[CustomParser[Msg, NatsMessage]] = None,
Expand Down
29 changes: 28 additions & 1 deletion tests/brokers/nats/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from nats.aio.msg import Msg

from faststream.exceptions import AckMessage
from faststream.nats import JStream, NatsBroker
from faststream.nats import JStream, NatsBroker, PullSub
from faststream.nats.annotations import NatsMessage
from tests.brokers.base.consume import BrokerRealConsumeTestcase
from tests.tools import spy_decorator
Expand Down Expand Up @@ -37,6 +37,33 @@ def subscriber(m):

assert event.is_set()

async def test_consume_pull(
self,
queue: str,
consume_broker: NatsBroker,
stream: JStream,
event: asyncio.Event,
mock,
):
@consume_broker.subscriber(queue, stream=stream, pull_sub=PullSub(1))
def subscriber(m):
mock(m)
event.set()

await consume_broker.start()
await asyncio.wait(
(
asyncio.create_task(
consume_broker.publish("hello", queue, stream=stream.name)
),
asyncio.create_task(event.wait()),
),
timeout=3,
)

assert event.is_set()
mock.assert_called_once_with("hello")

@pytest.mark.asyncio
async def test_consume_ack(
self,
Expand Down
Loading

0 comments on commit 427abf1

Please sign in to comment.