From 82f5afad5b97659729f8d948b4ac8b1967b93aad Mon Sep 17 00:00:00 2001 From: Serafim Dyachenko <75319000+pepellsd@users.noreply.github.com> Date: Tue, 24 Dec 2024 00:09:55 +0300 Subject: [PATCH] refactor: add type annotations for RabbitQueue and enum for queue type (#2002) * add type annotations for RabbitQueue and enum for queue type * add default for queue type * minor fixes and docs * chore: polish types * feat: add SetupError in incorrect RabbitQueue params case * fix: correct not-empty durable processing --------- Co-authored-by: Pastukhov Nikita Co-authored-by: Nikita Pastukhov --- docs/docs/SUMMARY.md | 12 + .../en/api/faststream/rabbit/QueueType.md | 11 + .../faststream/rabbit/schemas/QueueType.md | 11 + .../rabbit/schemas/queue/ClassicQueueArgs.md | 11 + .../rabbit/schemas/queue/CommonQueueArgs.md | 11 + .../queue/QueueClassicTypeSpecificArgs.md | 11 + .../queue/QueueQuorumTypeSpecificArgs.md | 11 + .../queue/QueueStreamTypeSpecificArgs.md | 11 + .../rabbit/schemas/queue/QueueType.md | 11 + .../rabbit/schemas/queue/QuorumQueueArgs.md | 11 + .../queue/SharedQueueClassicAndQuorumArgs.md | 11 + .../rabbit/schemas/queue/StreamQueueArgs.md | 11 + docs/docs_src/rabbit/subscription/stream.py | 6 +- faststream/rabbit/__init__.py | 2 + faststream/rabbit/schemas/__init__.py | 3 +- faststream/rabbit/schemas/queue.py | 223 ++++++++++++++---- 16 files changed, 313 insertions(+), 54 deletions(-) create mode 100644 docs/docs/en/api/faststream/rabbit/QueueType.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/QueueType.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md create mode 100644 docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index a8a2947235..b41d3fa972 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -186,6 +186,7 @@ search: - [TelemetrySettingsProvider](public_api/faststream/opentelemetry/TelemetrySettingsProvider.md) - rabbit - [ExchangeType](public_api/faststream/rabbit/ExchangeType.md) + - [QueueType](public_api/faststream/rabbit/QueueType.md) - [RabbitBroker](public_api/faststream/rabbit/RabbitBroker.md) - [RabbitExchange](public_api/faststream/rabbit/RabbitExchange.md) - [RabbitPublisher](public_api/faststream/rabbit/RabbitPublisher.md) @@ -872,6 +873,7 @@ search: - [PublishingStatus](api/faststream/prometheus/types/PublishingStatus.md) - rabbit - [ExchangeType](api/faststream/rabbit/ExchangeType.md) + - [QueueType](api/faststream/rabbit/QueueType.md) - [RabbitBroker](api/faststream/rabbit/RabbitBroker.md) - [RabbitExchange](api/faststream/rabbit/RabbitExchange.md) - [RabbitPublisher](api/faststream/rabbit/RabbitPublisher.md) @@ -932,6 +934,7 @@ search: - schemas - [BaseRMQInformation](api/faststream/rabbit/schemas/BaseRMQInformation.md) - [ExchangeType](api/faststream/rabbit/schemas/ExchangeType.md) + - [QueueType](api/faststream/rabbit/schemas/QueueType.md) - [RabbitExchange](api/faststream/rabbit/schemas/RabbitExchange.md) - [RabbitQueue](api/faststream/rabbit/schemas/RabbitQueue.md) - [ReplyConfig](api/faststream/rabbit/schemas/ReplyConfig.md) @@ -942,7 +945,16 @@ search: - proto - [BaseRMQInformation](api/faststream/rabbit/schemas/proto/BaseRMQInformation.md) - queue + - [ClassicQueueArgs](api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md) + - [CommonQueueArgs](api/faststream/rabbit/schemas/queue/CommonQueueArgs.md) + - [QueueClassicTypeSpecificArgs](api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md) + - [QueueQuorumTypeSpecificArgs](api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md) + - [QueueStreamTypeSpecificArgs](api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md) + - [QueueType](api/faststream/rabbit/schemas/queue/QueueType.md) + - [QuorumQueueArgs](api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md) - [RabbitQueue](api/faststream/rabbit/schemas/queue/RabbitQueue.md) + - [SharedQueueClassicAndQuorumArgs](api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md) + - [StreamQueueArgs](api/faststream/rabbit/schemas/queue/StreamQueueArgs.md) - reply - [ReplyConfig](api/faststream/rabbit/schemas/reply/ReplyConfig.md) - security diff --git a/docs/docs/en/api/faststream/rabbit/QueueType.md b/docs/docs/en/api/faststream/rabbit/QueueType.md new file mode 100644 index 0000000000..32fded7b16 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/QueueType.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.QueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/QueueType.md b/docs/docs/en/api/faststream/rabbit/schemas/QueueType.md new file mode 100644 index 0000000000..49ab8b9abe --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/QueueType.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.QueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md new file mode 100644 index 0000000000..48ddb0825a --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/ClassicQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.ClassicQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md new file mode 100644 index 0000000000..fe6c0f6768 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/CommonQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.CommonQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md new file mode 100644 index 0000000000..6c8a62838a --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueClassicTypeSpecificArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QueueClassicTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md new file mode 100644 index 0000000000..3b4bd645f5 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueQuorumTypeSpecificArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QueueQuorumTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md new file mode 100644 index 0000000000..1e3694caa3 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueStreamTypeSpecificArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QueueStreamTypeSpecificArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md new file mode 100644 index 0000000000..099131fea0 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QueueType diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md new file mode 100644 index 0000000000..4db347081a --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/QuorumQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.QuorumQueueArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md new file mode 100644 index 0000000000..438ba0143f --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/SharedQueueClassicAndQuorumArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.SharedQueueClassicAndQuorumArgs diff --git a/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md b/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md new file mode 100644 index 0000000000..12c12bac47 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/schemas/queue/StreamQueueArgs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.schemas.queue.StreamQueueArgs diff --git a/docs/docs_src/rabbit/subscription/stream.py b/docs/docs_src/rabbit/subscription/stream.py index 1510622402..17c99753cf 100644 --- a/docs/docs_src/rabbit/subscription/stream.py +++ b/docs/docs_src/rabbit/subscription/stream.py @@ -1,5 +1,5 @@ from faststream import FastStream, Logger -from faststream.rabbit import RabbitBroker, RabbitQueue +from faststream.rabbit import RabbitBroker, RabbitQueue, QueueType broker = RabbitBroker(max_consumers=10) app = FastStream(broker) @@ -7,9 +7,7 @@ queue = RabbitQueue( name="test-stream", durable=True, - arguments={ - "x-queue-type": "stream", - }, + queue_type=QueueType.STREAM ) diff --git a/faststream/rabbit/__init__.py b/faststream/rabbit/__init__.py index 15b323772c..98f364656d 100644 --- a/faststream/rabbit/__init__.py +++ b/faststream/rabbit/__init__.py @@ -4,6 +4,7 @@ from faststream.rabbit.router import RabbitPublisher, RabbitRoute, RabbitRouter from faststream.rabbit.schemas import ( ExchangeType, + QueueType, RabbitExchange, RabbitQueue, ReplyConfig, @@ -13,6 +14,7 @@ __all__ = ( "ExchangeType", + "QueueType", "RabbitBroker", "RabbitExchange", # Annotations diff --git a/faststream/rabbit/schemas/__init__.py b/faststream/rabbit/schemas/__init__.py index 2742cf2a68..08e72e7700 100644 --- a/faststream/rabbit/schemas/__init__.py +++ b/faststream/rabbit/schemas/__init__.py @@ -1,13 +1,14 @@ from faststream.rabbit.schemas.constants import ExchangeType from faststream.rabbit.schemas.exchange import RabbitExchange from faststream.rabbit.schemas.proto import BaseRMQInformation -from faststream.rabbit.schemas.queue import RabbitQueue +from faststream.rabbit.schemas.queue import QueueType, RabbitQueue from faststream.rabbit.schemas.reply import ReplyConfig __all__ = ( "RABBIT_REPLY", "BaseRMQInformation", "ExchangeType", + "QueueType", "RabbitExchange", "RabbitQueue", "ReplyConfig", diff --git a/faststream/rabbit/schemas/queue.py b/faststream/rabbit/schemas/queue.py index 1eadaf1e24..9ae3c322d3 100644 --- a/faststream/rabbit/schemas/queue.py +++ b/faststream/rabbit/schemas/queue.py @@ -1,9 +1,10 @@ from copy import deepcopy -from typing import TYPE_CHECKING, Optional - -from typing_extensions import Annotated, Doc +from enum import Enum +from typing import TYPE_CHECKING, Literal, Optional, TypedDict, Union, overload from faststream.broker.schemas import NameRequired +from faststream.exceptions import SetupError +from faststream.types import EMPTY from faststream.utils.path import compile_path if TYPE_CHECKING: @@ -12,6 +13,17 @@ from faststream.types import AnyDict +class QueueType(str, Enum): + """Queue types for RabbitMQ. + + Enum should be lowercase to match RabbitMQ API. + """ + + CLASSIC = "classic" + QUORUM = "quorum" + STREAM = "stream" + + class RabbitQueue(NameRequired): """A class to represent a RabbitMQ queue. @@ -49,61 +61,103 @@ def routing(self) -> str: """Return real routing_key of object.""" return self.routing_key or self.name + @overload def __init__( self, - name: Annotated[ - str, - Doc("RabbitMQ queue name."), - ], - durable: Annotated[ - bool, - Doc("Whether the object is durable."), - ] = False, - exclusive: Annotated[ - bool, - Doc( - "The queue can be used only in the current connection " - "and will be deleted after connection closed." - ), - ] = False, - passive: Annotated[ - bool, - Doc("Do not create queue automatically."), - ] = False, - auto_delete: Annotated[ - bool, - Doc("The queue will be deleted after connection closed."), - ] = False, - arguments: Annotated[ - Optional["AnyDict"], - Doc( - "Queue declarationg arguments. " - "You can find information about them in the official RabbitMQ documentation: https://www.rabbitmq.com/docs/queues#optional-arguments" - ), - ] = None, - timeout: Annotated[ - "TimeoutType", - Doc("Send confirmation time from RabbitMQ."), - ] = None, - robust: Annotated[ - bool, - Doc("Whether to declare queue object as restorable."), - ] = True, - bind_arguments: Annotated[ - Optional["AnyDict"], - Doc("Queue-exchange binding options."), + name: str, + queue_type: Literal[QueueType.CLASSIC] = QueueType.CLASSIC, + durable: bool = EMPTY, + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional["ClassicQueueArgs"] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + @overload + def __init__( + self, + name: str, + queue_type: Literal[QueueType.QUORUM], + durable: Literal[True], + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional["QuorumQueueArgs"] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + @overload + def __init__( + self, + name: str, + queue_type: Literal[QueueType.STREAM], + durable: Literal[True], + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Optional["StreamQueueArgs"] = None, + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", + ) -> None: ... + + def __init__( + self, + name: str, + queue_type: QueueType = QueueType.CLASSIC, + durable: bool = EMPTY, + exclusive: bool = False, + passive: bool = False, + auto_delete: bool = False, + arguments: Union[ + "QuorumQueueArgs", + "ClassicQueueArgs", + "StreamQueueArgs", + "AnyDict", + None, ] = None, - routing_key: Annotated[ - str, - Doc("Explicit binding routing key. Uses `name` if not presented."), - ] = "", + timeout: "TimeoutType" = None, + robust: bool = True, + bind_arguments: Optional["AnyDict"] = None, + routing_key: str = "", ) -> None: + """Initialize the RabbitMQ queue. + + :param name: RabbitMQ queue name. + :param durable: Whether the object is durable. + :param exclusive: The queue can be used only in the current connection and will be deleted after connection closed. + :param passive: Do not create queue automatically. + :param auto_delete: The queue will be deleted after connection closed. + :param arguments: Queue declaration arguments. + You can find information about them in the official RabbitMQ documentation: + https://www.rabbitmq.com/docs/queues#optional-arguments + :param timeout: Send confirmation time from RabbitMQ. + :param robust: Whether to declare queue object as restorable. + :param bind_arguments: Queue-exchange binding options. + :param routing_key: Explicit binding routing key. Uses name if not presented. + """ re, routing_key = compile_path( routing_key, replace_symbol="*", patch_regex=lambda x: x.replace(r"\#", ".+"), ) + if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM: + if durable is EMPTY: + durable = True + elif not durable: + raise SetupError("Quorum and Stream queues must be durable") + elif durable is EMPTY: + durable = False + super().__init__(name) self.path_regex = re @@ -114,7 +168,7 @@ def __init__( self.robust = robust self.passive = passive self.auto_delete = auto_delete - self.arguments = arguments + self.arguments = {"x-queue-type": queue_type.value, **(arguments or {})} self.timeout = timeout def add_prefix(self, prefix: str) -> "RabbitQueue": @@ -126,3 +180,74 @@ def add_prefix(self, prefix: str) -> "RabbitQueue": new_q.routing_key = "".join((prefix, new_q.routing_key)) return new_q + + +CommonQueueArgs = TypedDict( + "CommonQueueArgs", + { + "x-queue-leader-locator": Literal["client-local", "balanced"], + "x-max-length-bytes": int, + }, + total=False, +) + +SharedQueueClassicAndQuorumArgs = TypedDict( + "SharedQueueClassicAndQuorumArgs", + { + "x-expires": int, + "x-message-ttl": int, + "x-single-active-consumer": bool, + "x-dead-letter-exchange": str, + "x-dead-letter-routing-key": str, + "x-max-length": int, + "x-max-priority": int, + }, + total=False, +) + + +QueueClassicTypeSpecificArgs = TypedDict( + "QueueClassicTypeSpecificArgs", + {"x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"]}, + total=False, +) + +QueueQuorumTypeSpecificArgs = TypedDict( + "QueueQuorumTypeSpecificArgs", + { + "x-overflow": Literal["drop-head", "reject-publish"], + "x-delivery-limit": int, + "x-quorum-initial-group-size": int, + "x-quorum-target-group-size": int, + "x-dead-letter-strategy": Literal["at-most-once", "at-least-once"], + }, + total=False, +) + + +QueueStreamTypeSpecificArgs = TypedDict( + "QueueStreamTypeSpecificArgs", + { + "x-max-age": str, + "x-stream-max-segment-size-bytes": int, + "x-stream-filter-size-bytes": int, + "x-initial-cluster-size": int, + }, + total=False, +) + + +class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs): + pass + + +class ClassicQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs +): + pass + + +class QuorumQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs +): + pass