From 058e8486eb157d115485b4daf4b28479dfd5c2b0 Mon Sep 17 00:00:00 2001 From: pepellsd Date: Sat, 21 Dec 2024 19:59:25 +0300 Subject: [PATCH] add type annotations for RabbitQueue and enum for queue type --- faststream/rabbit/schemas/queue.py | 215 ++++++++++++++++++++++------- 1 file changed, 165 insertions(+), 50 deletions(-) diff --git a/faststream/rabbit/schemas/queue.py b/faststream/rabbit/schemas/queue.py index 1eadaf1e24..86aca7d35e 100644 --- a/faststream/rabbit/schemas/queue.py +++ b/faststream/rabbit/schemas/queue.py @@ -1,7 +1,6 @@ from copy import deepcopy -from typing import TYPE_CHECKING, Optional - -from typing_extensions import Annotated, Doc +from enum import Enum +from typing import TYPE_CHECKING, Literal, Optional, TypedDict, Union, overload from faststream.broker.schemas import NameRequired from faststream.utils.path import compile_path @@ -12,6 +11,85 @@ from faststream.types import AnyDict +class RabbitQueueType(Enum): + Classic = "classic" + Quorum = "quorum" + Stream = "stream" + + +CommonQueueArgs = TypedDict( + "CommonQueueArgs", + { + "x-queue-leader-locator": Literal["client-local", "balanced"], + "x-max-length-bytes": int, + }, + total=True, +) + +SharedQueueClassicAndQuorumArgs = TypedDict( + "SharedQueueClassicAndQuorumArgs", + { + "x-expires": int, + "x-message-ttl": int, + "x-single-active-consumer": bool, + "x-dead-letter-exchange": str, + "x-dead-letter-routing-key": str, + "x-max-length": int, + }, + total=True, +) + + +QueueClassicTypeSpecificArgs = TypedDict( + "QueueClassicTypeSpecificArgs", + { + "x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"], + "x-max-priority": int, + }, + total=True, +) + +QueueQuorumTypeSpecificArgs = TypedDict( + "QueueQuorumTypeSpecificArgs", + { + "x-overflow": Literal["drop-head", "reject-publish"], + "x-delivery-limit": int, + "x-quorum-initial-group-size": int, + "x-quorum-target-group-size": int, + "x-dead-letter-strategy": Literal["at-most-once", "at-least-once"], + }, + total=True, +) + + +QueueStreamTypeSpecificArgs = TypedDict( + "QueueStreamTypeSpecificArgs", + { + "x-max-age": str, + "x-stream-max-segment-size-bytes": int, + "x-stream-filter-size-bytes": int, + "x-initial-cluster-size": int, + }, + total=True, +) + + +class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs): + pass + + +class ClassicQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs +): + pass + + +class QuorumQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs +): + pass + + class RabbitQueue(NameRequired): """A class to represent a RabbitMQ queue. @@ -49,61 +127,98 @@ def routing(self) -> str: """Return real routing_key of object.""" return self.routing_key or self.name + @overload def __init__( self, - name: Annotated[ - str, - Doc("RabbitMQ queue name."), - ], - durable: Annotated[ - bool, - Doc("Whether the object is durable."), - ] = False, - exclusive: Annotated[ - bool, - Doc( - "The queue can be used only in the current connection " - "and will be deleted after connection closed." - ), - ] = False, - passive: Annotated[ - bool, - Doc("Do not create queue automatically."), - ] = False, - auto_delete: Annotated[ - bool, - Doc("The queue will be deleted after connection closed."), - ] = False, - arguments: Annotated[ - Optional["AnyDict"], - Doc( - "Queue declarationg arguments. " - "You can find information about them in the official RabbitMQ documentation: https://www.rabbitmq.com/docs/queues#optional-arguments" - ), - ] = None, - timeout: Annotated[ - "TimeoutType", - Doc("Send confirmation time from RabbitMQ."), - ] = None, - robust: Annotated[ - bool, - Doc("Whether to declare queue object as restorable."), - ] = True, - bind_arguments: Annotated[ - Optional["AnyDict"], - Doc("Queue-exchange binding options."), + name: str, + queue_type: Literal[RabbitQueueType.Classic], + durable: Literal[True, False] = False, + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[ClassicQueueArgs] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + @overload + def __init__( + self, + name: str, + queue_type: Literal[RabbitQueueType.Quorum], + durable: Literal[True], + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[QuorumQueueArgs] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + @overload + def __init__( + self, + name: str, + queue_type: Literal[RabbitQueueType.Stream], + durable: Literal[True], + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[StreamQueueArgs] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + def __init__( + self, + name: str, + queue_type: Literal[ + RabbitQueueType.Quorum, RabbitQueueType.Classic, RabbitQueueType.Stream + ] = RabbitQueueType.Classic, + durable: Literal[True, False] = False, + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional[ + Union[QuorumQueueArgs, ClassicQueueArgs, StreamQueueArgs] ] = None, - routing_key: Annotated[ - str, - Doc("Explicit binding routing key. Uses `name` if not presented."), - ] = "", + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", ) -> None: + """Initialize the RabbitMQ queue. + + :param name: RabbitMQ queue name. + :param durable: Whether the object is durable. + :param exclusive: The queue can be used only in the current connection and will be deleted after connection closed. + :param passive: Do not create queue automatically. + :param auto_delete: The queue will be deleted after connection closed. + :param arguments: Queue declaration arguments. + You can find information about them in the official RabbitMQ documentation: + https://www.rabbitmq.com/docs/queues#optional-arguments + :param timeout: Send confirmation time from RabbitMQ. + :param robust: Whether to declare queue object as restorable. + :param bind_arguments: Queue-exchange binding options. + :param routing_key: Explicit binding routing key. Uses name if not presented. + """ re, routing_key = compile_path( routing_key, replace_symbol="*", - patch_regex=lambda x: x.replace(r"\#", ".+"), + patch_regex=lambda x: x.replace(r"#", ".+"), ) + _arguments = {"x-queue-type": queue_type.value} + + if arguments: + _arguments.update(**arguments) + super().__init__(name) self.path_regex = re @@ -114,7 +229,7 @@ def __init__( self.robust = robust self.passive = passive self.auto_delete = auto_delete - self.arguments = arguments + self.arguments = _arguments self.timeout = timeout def add_prefix(self, prefix: str) -> "RabbitQueue":