Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Issue 1386, Add rpc_prefix #1484

Merged
merged 13 commits into from
Jun 17, 2024
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,8 @@ search:
- [to_async](api/faststream/utils/functions/to_async.md)
- no_cast
- [NoCast](api/faststream/utils/no_cast/NoCast.md)
- nuid
- [NUID](api/faststream/utils/nuid/NUID.md)
- path
- [compile_path](api/faststream/utils/path/compile_path.md)
- Contributing
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/utils/nuid/NUID.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.utils.nuid.NUID
9 changes: 2 additions & 7 deletions faststream/nats/publisher/producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import asyncio
from secrets import token_hex
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import uuid4

import nats
from typing_extensions import override
Expand Down Expand Up @@ -71,9 +69,7 @@ async def publish( # type: ignore[override]
if reply_to:
raise WRONG_PUBLISH_ARGS

token = client._nuid.next()
token.extend(token_hex(2).encode())
reply_to = token.decode()
reply_to = client.new_inbox()

future: asyncio.Future[Msg] = asyncio.Future()
sub = await client.subscribe(reply_to, future=future, max_msgs=1)
Expand Down Expand Up @@ -148,8 +144,7 @@ async def publish( # type: ignore[override]
if rpc:
if reply_to:
raise WRONG_PUBLISH_ARGS

reply_to = str(uuid4())
reply_to = self._connection._nc.new_inbox()
future: asyncio.Future[Msg] = asyncio.Future()
sub = await self._connection._nc.subscribe(
reply_to, future=future, max_msgs=1
Expand Down
7 changes: 4 additions & 3 deletions faststream/redis/publisher/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import TYPE_CHECKING, Any, Optional
from uuid import uuid4

from typing_extensions import override

Expand All @@ -10,6 +9,7 @@
from faststream.redis.parser import RawMessage, RedisPubSubParser
from faststream.redis.schemas import INCORRECT_SETUP_MSG
from faststream.utils.functions import timeout_scope
from faststream.utils.nuid import NUID

if TYPE_CHECKING:
from redis.asyncio.client import PubSub, Redis
Expand Down Expand Up @@ -67,8 +67,9 @@ async def publish( # type: ignore[override]
if rpc:
if reply_to:
raise WRONG_PUBLISH_ARGS

reply_to = str(uuid4())
nuid = NUID()
rpc_nuid = str(nuid.next(), "utf-8")
reply_to = rpc_nuid
psub = self._connection.pubsub()
await psub.subscribe(reply_to)

Expand Down
68 changes: 68 additions & 0 deletions faststream/utils/nuid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2016-2018 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from random import Random
from secrets import randbelow, token_bytes
from sys import maxsize as max_int

DIGITS = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
BASE = 62
PREFIX_LENGTH = 12
SEQ_LENGTH = 10
MAX_SEQ = 839299365868340224 # BASE**10
MIN_INC = 33
MAX_INC = 333
INC = MAX_INC - MIN_INC
TOTAL_LENGTH = PREFIX_LENGTH + SEQ_LENGTH


class NUID:
"""NUID created is a utility to create a new id.

NUID is an implementation of the approach for fast generation
of unique identifiers used for inboxes in NATS.
"""

def __init__(self) -> None:
self._prand = Random(randbelow(max_int)) # nosec B311
self._seq = self._prand.randint(0, MAX_SEQ)
self._inc = MIN_INC + self._prand.randint(BASE + 1, INC)
self._prefix = bytearray()
self.randomize_prefix()

def next(self) -> bytearray:
"""Next returns the next unique identifier."""
self._seq += self._inc
if self._seq >= MAX_SEQ:
self.randomize_prefix()
self.reset_sequential()

l_seq = self._seq
prefix = self._prefix[:]
suffix = bytearray(SEQ_LENGTH)
for i in reversed(range(SEQ_LENGTH)):
suffix[i] = DIGITS[int(l_seq) % BASE]
l_seq //= BASE

prefix.extend(suffix)
return prefix

def randomize_prefix(self) -> None:
random_bytes = token_bytes(PREFIX_LENGTH)
self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes)

def reset_sequential(self) -> None:
self._seq = self._prand.randint(0, MAX_SEQ)
self._inc = MIN_INC + self._prand.randint(0, INC)
12 changes: 12 additions & 0 deletions tests/brokers/nats/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ def subscriber(m):

assert event.is_set()

@pytest.mark.nats()
async def test_inbox_prefix_with_real(
self,
queue: str,
):
broker = NatsBroker(inbox_prefix="test")

async with TestNatsBroker(broker, with_real=True) as br:
assert br._connection._inbox_prefix == b"test"
assert "test" in str(br._connection.new_inbox())


async def test_respect_middleware(self, queue):
routes = []

Expand Down
Loading