diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index a8a2947235..af9d0f7404 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -934,6 +934,7 @@ search: - [ExchangeType](api/faststream/rabbit/schemas/ExchangeType.md) - [RabbitExchange](api/faststream/rabbit/schemas/RabbitExchange.md) - [RabbitQueue](api/faststream/rabbit/schemas/RabbitQueue.md) + - [RabbitQueueType](api/faststream/rabbit/schemas/RabbitQueueType.md) - [ReplyConfig](api/faststream/rabbit/schemas/ReplyConfig.md) - constants - [ExchangeType](api/faststream/rabbit/schemas/constants/ExchangeType.md) @@ -942,7 +943,16 @@ search: - proto - [BaseRMQInformation](api/faststream/rabbit/schemas/proto/BaseRMQInformation.md) - queue + - [ClassicQueueArgs](api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md) + - [CommonQueueArgs](api/faststream/rabbit/schemas/queue/CommonQueueArgs.md) + - [QueueClassicTypeSpecificArgs](api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md) + - [QueueQuorumTypeSpecificArgs](api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md) + - [QueueStreamTypeSpecificArgs](api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md) + - [QuorumQueueArgs](api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md) - [RabbitQueue](api/faststream/rabbit/schemas/queue/RabbitQueue.md) + - [RabbitQueueType](api/faststream/rabbit/schemas/queue/RabbitQueueType.md) + - [SharedQueueClassicAndQuorumArgs](api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md) + - [StreamQueueArgs](api/faststream/rabbit/schemas/queue/StreamQueueArgs.md) - reply - [ReplyConfig](api/faststream/rabbit/schemas/reply/ReplyConfig.md) - security diff --git a/docs/docs/en/api/faststream/rabbit/schemas/RabbitQueueType.md b/docs/docs/en/api/faststream/rabbit/schemas/RabbitQueueType.md new file mode 100644 index 0000000000..a5e356b5d5 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/RabbitQueueType.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.RabbitQueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md new file mode 100644 index 0000000000..48ddb0825a --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.ClassicQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md new file mode 100644 index 0000000000..fe6c0f6768 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.CommonQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md new file mode 100644 index 0000000000..6c8a62838a --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QueueClassicTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md new file mode 100644 index 0000000000..3b4bd645f5 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QueueQuorumTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md new file mode 100644 index 0000000000..1e3694caa3 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QueueStreamTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md new file mode 100644 index 0000000000..4db347081a --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QuorumQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/RabbitQueueType.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/RabbitQueueType.md new file mode 100644 index 0000000000..5b2476199c --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/RabbitQueueType.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.RabbitQueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md new file mode 100644 index 0000000000..438ba0143f --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.SharedQueueClassicAndQuorumArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md new file mode 100644 index 0000000000..12c12bac47 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.StreamQueueArgs diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 2e650372d2..393dbfd1e7 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -27,6 +27,7 @@ RABBIT_REPLY, RabbitExchange, RabbitQueue, + RabbitQueueType, ) from faststream.rabbit.security import parse_security from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber @@ -475,7 +476,7 @@ async def _connect( # type: ignore[override] if max_consumers: c = AsyncAPISubscriber.build_log_context( None, - RabbitQueue(""), + RabbitQueue("", queue_type=RabbitQueueType.Classic), RabbitExchange(""), ) self._log(f"Set max consumers to {max_consumers}", extra=c) diff --git a/faststream/rabbit/schemas/__init__.py b/faststream/rabbit/schemas/__init__.py index 2742cf2a68..57fdb923ab 100644 --- a/faststream/rabbit/schemas/__init__.py +++ b/faststream/rabbit/schemas/__init__.py @@ -1,7 +1,7 @@ from faststream.rabbit.schemas.constants import ExchangeType from faststream.rabbit.schemas.exchange import RabbitExchange from faststream.rabbit.schemas.proto import BaseRMQInformation -from faststream.rabbit.schemas.queue import RabbitQueue +from faststream.rabbit.schemas.queue import RabbitQueue, RabbitQueueType from faststream.rabbit.schemas.reply import ReplyConfig __all__ = ( @@ -10,7 +10,10 @@ "ExchangeType", "RabbitExchange", "RabbitQueue", + "RabbitQueueType", "ReplyConfig", ) -RABBIT_REPLY = RabbitQueue("amq.rabbitmq.reply-to", passive=True) +RABBIT_REPLY = RabbitQueue( + "amq.rabbitmq.reply-to", passive=True, queue_type=RabbitQueueType.Classic +) diff --git a/faststream/rabbit/schemas/queue.py b/faststream/rabbit/schemas/queue.py index 1eadaf1e24..86aca7d35e 100644 --- a/faststream/rabbit/schemas/queue.py +++ b/faststream/rabbit/schemas/queue.py @@ -1,7 +1,6 @@ from copy import deepcopy -from typing import TYPE_CHECKING, Optional - -from typing_extensions import Annotated, Doc +from enum import Enum +from typing import TYPE_CHECKING, Literal, Optional, TypedDict, Union, overload from faststream.broker.schemas import NameRequired from faststream.utils.path import compile_path @@ -12,6 +11,85 @@ from faststream.types import AnyDict +class RabbitQueueType(Enum): + Classic = "classic" + Quorum = "quorum" + Stream = "stream" + + +CommonQueueArgs = TypedDict( + "CommonQueueArgs", + { + "x-queue-leader-locator": Literal["client-local", "balanced"], + "x-max-length-bytes": int, + }, + total=True, +) + +SharedQueueClassicAndQuorumArgs = TypedDict( + "SharedQueueClassicAndQuorumArgs", + { + "x-expires": int, + "x-message-ttl": int, + "x-single-active-consumer": bool, + "x-dead-letter-exchange": str, + "x-dead-letter-routing-key": str, + "x-max-length": int, + }, + total=True, +) + + +QueueClassicTypeSpecificArgs = TypedDict( + "QueueClassicTypeSpecificArgs", + { + "x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"], + "x-max-priority": int, + }, + total=True, +) + +QueueQuorumTypeSpecificArgs = TypedDict( + "QueueQuorumTypeSpecificArgs", + { + "x-overflow": Literal["drop-head", "reject-publish"], + "x-delivery-limit": int, + "x-quorum-initial-group-size": int, + "x-quorum-target-group-size": int, + "x-dead-letter-strategy": Literal["at-most-once", "at-least-once"], + }, + total=True, +) + + +QueueStreamTypeSpecificArgs = TypedDict( + "QueueStreamTypeSpecificArgs", + { + "x-max-age": str, + "x-stream-max-segment-size-bytes": int, + "x-stream-filter-size-bytes": int, + "x-initial-cluster-size": int, + }, + total=True, +) + + +class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs): + pass + + +class ClassicQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs +): + pass + + +class QuorumQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs +): + pass + + class RabbitQueue(NameRequired): """A class to represent a RabbitMQ queue. @@ -49,61 +127,98 @@ def routing(self) -> str: """Return real routing_key of object.""" return self.routing_key or self.name + @overload def __init__( self, - name: Annotated[ - str, - Doc("RabbitMQ queue name."), - ], - durable: Annotated[ - bool, - Doc("Whether the object is durable."), - ] = False, - exclusive: Annotated[ - bool, - Doc( - "The queue can be used only in the current connection " - "and will be deleted after connection closed." - ), - ] = False, - passive: Annotated[ - bool, - Doc("Do not create queue automatically."), - ] = False, - auto_delete: Annotated[ - bool, - Doc("The queue will be deleted after connection closed."), - ] = False, - arguments: Annotated[ - Optional["AnyDict"], - Doc( - "Queue declarationg arguments. " - "You can find information about them in the official RabbitMQ documentation: https://www.rabbitmq.com/docs/queues#optional-arguments" - ), - ] = None, - timeout: Annotated[ - "TimeoutType", - Doc("Send confirmation time from RabbitMQ."), - ] = None, - robust: Annotated[ - bool, - Doc("Whether to declare queue object as restorable."), - ] = True, - bind_arguments: Annotated[ - Optional["AnyDict"], - Doc("Queue-exchange binding options."), + name: str, + queue_type: Literal[RabbitQueueType.Classic], + durable: Literal[True, False] = False, + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[ClassicQueueArgs] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + @overload + def __init__( + self, + name: str, + queue_type: Literal[RabbitQueueType.Quorum], + durable: Literal[True], + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[QuorumQueueArgs] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + @overload + def __init__( + self, + name: str, + queue_type: Literal[RabbitQueueType.Stream], + durable: Literal[True], + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[StreamQueueArgs] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + def __init__( + self, + name: str, + queue_type: Literal[ + RabbitQueueType.Quorum, RabbitQueueType.Classic, RabbitQueueType.Stream + ] = RabbitQueueType.Classic, + durable: Literal[True, False] = False, + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[ + Union[QuorumQueueArgs, ClassicQueueArgs, StreamQueueArgs] ] = None, - routing_key: Annotated[ - str, - Doc("Explicit binding routing key. Uses `name` if not presented."), - ] = "", + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", ) -> None: + """Initialize the RabbitMQ queue. + + :param name: RabbitMQ queue name. + :param durable: Whether the object is durable. + :param exclusive: The queue can be used only in the current connection and will be deleted after connection closed. + :param passive: Do not create queue automatically. + :param auto_delete: The queue will be deleted after connection closed. + :param arguments: Queue declaration arguments. + You can find information about them in the official RabbitMQ documentation: + https://www.rabbitmq.com/docs/queues#optional-arguments + :param timeout: Send confirmation time from RabbitMQ. + :param robust: Whether to declare queue object as restorable. + :param bind_arguments: Queue-exchange binding options. + :param routing_key: Explicit binding routing key. Uses name if not presented. + """ re, routing_key = compile_path( routing_key, replace_symbol="*", - patch_regex=lambda x: x.replace(r"\#", ".+"), + patch_regex=lambda x: x.replace(r"#", ".+"), ) + _arguments = {"x-queue-type": queue_type.value} + + if arguments: + _arguments.update(**arguments) + super().__init__(name) self.path_regex = re @@ -114,7 +229,7 @@ def __init__( self.robust = robust self.passive = passive self.auto_delete = auto_delete - self.arguments = arguments + self.arguments = _arguments self.timeout = timeout def add_prefix(self, prefix: str) -> "RabbitQueue":