diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index e71b34d952..92a04da868 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -1007,6 +1007,8 @@ search: - [to_async](api/faststream/utils/functions/to_async.md) - no_cast - [NoCast](api/faststream/utils/no_cast/NoCast.md) + - nuid + - [NUID](api/faststream/utils/nuid/NUID.md) - path - [compile_path](api/faststream/utils/path/compile_path.md) - Contributing diff --git a/docs/docs/en/api/faststream/utils/nuid/NUID.md b/docs/docs/en/api/faststream/utils/nuid/NUID.md new file mode 100644 index 0000000000..4e43844efe --- /dev/null +++ b/docs/docs/en/api/faststream/utils/nuid/NUID.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.utils.nuid.NUID diff --git a/faststream/nats/publisher/producer.py b/faststream/nats/publisher/producer.py index 5f08774651..a7b2a82702 100644 --- a/faststream/nats/publisher/producer.py +++ b/faststream/nats/publisher/producer.py @@ -1,7 +1,5 @@ import asyncio -from secrets import token_hex from typing import TYPE_CHECKING, Any, Dict, Optional -from uuid import uuid4 import nats from typing_extensions import override @@ -71,9 +69,7 @@ async def publish( # type: ignore[override] if reply_to: raise WRONG_PUBLISH_ARGS - token = client._nuid.next() - token.extend(token_hex(2).encode()) - reply_to = token.decode() + reply_to = client.new_inbox() future: asyncio.Future[Msg] = asyncio.Future() sub = await client.subscribe(reply_to, future=future, max_msgs=1) @@ -148,8 +144,7 @@ async def publish( # type: ignore[override] if rpc: if reply_to: raise WRONG_PUBLISH_ARGS - - reply_to = str(uuid4()) + reply_to = self._connection._nc.new_inbox() future: asyncio.Future[Msg] = asyncio.Future() sub = await self._connection._nc.subscribe( reply_to, future=future, max_msgs=1 diff --git a/faststream/redis/publisher/producer.py b/faststream/redis/publisher/producer.py index ce807aeab8..1fb31d07a1 100644 --- a/faststream/redis/publisher/producer.py +++ b/faststream/redis/publisher/producer.py @@ -1,5 +1,4 @@ from typing import TYPE_CHECKING, Any, Optional -from uuid import uuid4 from typing_extensions import override @@ -10,6 +9,7 @@ from faststream.redis.parser import RawMessage, RedisPubSubParser from faststream.redis.schemas import INCORRECT_SETUP_MSG from faststream.utils.functions import timeout_scope +from faststream.utils.nuid import NUID if TYPE_CHECKING: from redis.asyncio.client import PubSub, Redis @@ -67,8 +67,9 @@ async def publish( # type: ignore[override] if rpc: if reply_to: raise WRONG_PUBLISH_ARGS - - reply_to = str(uuid4()) + nuid = NUID() + rpc_nuid = str(nuid.next(), "utf-8") + reply_to = rpc_nuid psub = self._connection.pubsub() await psub.subscribe(reply_to) diff --git a/faststream/utils/nuid.py b/faststream/utils/nuid.py new file mode 100644 index 0000000000..d804d1a19c --- /dev/null +++ b/faststream/utils/nuid.py @@ -0,0 +1,68 @@ +# Copyright 2016-2018 The NATS Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from random import Random +from secrets import randbelow, token_bytes +from sys import maxsize as max_int + +DIGITS = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" +BASE = 62 +PREFIX_LENGTH = 12 +SEQ_LENGTH = 10 +MAX_SEQ = 839299365868340224 # BASE**10 +MIN_INC = 33 +MAX_INC = 333 +INC = MAX_INC - MIN_INC +TOTAL_LENGTH = PREFIX_LENGTH + SEQ_LENGTH + + +class NUID: + """NUID created is a utility to create a new id. + + NUID is an implementation of the approach for fast generation + of unique identifiers used for inboxes in NATS. + """ + + def __init__(self) -> None: + self._prand = Random(randbelow(max_int)) # nosec B311 + self._seq = self._prand.randint(0, MAX_SEQ) + self._inc = MIN_INC + self._prand.randint(BASE + 1, INC) + self._prefix = bytearray() + self.randomize_prefix() + + def next(self) -> bytearray: + """Next returns the next unique identifier.""" + self._seq += self._inc + if self._seq >= MAX_SEQ: + self.randomize_prefix() + self.reset_sequential() + + l_seq = self._seq + prefix = self._prefix[:] + suffix = bytearray(SEQ_LENGTH) + for i in reversed(range(SEQ_LENGTH)): + suffix[i] = DIGITS[int(l_seq) % BASE] + l_seq //= BASE + + prefix.extend(suffix) + return prefix + + def randomize_prefix(self) -> None: + random_bytes = token_bytes(PREFIX_LENGTH) + self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes) + + def reset_sequential(self) -> None: + self._seq = self._prand.randint(0, MAX_SEQ) + self._inc = MIN_INC + self._prand.randint(0, INC) diff --git a/tests/brokers/nats/test_test_client.py b/tests/brokers/nats/test_test_client.py index 9718b558b6..94a4be5dac 100644 --- a/tests/brokers/nats/test_test_client.py +++ b/tests/brokers/nats/test_test_client.py @@ -80,6 +80,18 @@ def subscriber(m): assert event.is_set() + @pytest.mark.nats() + async def test_inbox_prefix_with_real( + self, + queue: str, + ): + broker = NatsBroker(inbox_prefix="test") + + async with TestNatsBroker(broker, with_real=True) as br: + assert br._connection._inbox_prefix == b"test" + assert "test" in str(br._connection.new_inbox()) + + async def test_respect_middleware(self, queue): routes = []