Skip to content

Commit

Permalink
add type annotations for RabbitQueue and enum for queue type
Browse files Browse the repository at this point in the history
  • Loading branch information
pepellsd committed Dec 21, 2024
1 parent 684ef1a commit 058e848
Showing 1 changed file with 165 additions and 50 deletions.
215 changes: 165 additions & 50 deletions faststream/rabbit/schemas/queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from copy import deepcopy
from typing import TYPE_CHECKING, Optional

from typing_extensions import Annotated, Doc
from enum import Enum
from typing import TYPE_CHECKING, Literal, Optional, TypedDict, Union, overload

from faststream.broker.schemas import NameRequired
from faststream.utils.path import compile_path
Expand All @@ -12,6 +11,85 @@
from faststream.types import AnyDict


class RabbitQueueType(Enum):
Classic = "classic"
Quorum = "quorum"
Stream = "stream"


CommonQueueArgs = TypedDict(
"CommonQueueArgs",
{
"x-queue-leader-locator": Literal["client-local", "balanced"],
"x-max-length-bytes": int,
},
total=True,
)

SharedQueueClassicAndQuorumArgs = TypedDict(
"SharedQueueClassicAndQuorumArgs",
{
"x-expires": int,
"x-message-ttl": int,
"x-single-active-consumer": bool,
"x-dead-letter-exchange": str,
"x-dead-letter-routing-key": str,
"x-max-length": int,
},
total=True,
)


QueueClassicTypeSpecificArgs = TypedDict(
"QueueClassicTypeSpecificArgs",
{
"x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"],
"x-max-priority": int,
},
total=True,
)

QueueQuorumTypeSpecificArgs = TypedDict(
"QueueQuorumTypeSpecificArgs",
{
"x-overflow": Literal["drop-head", "reject-publish"],
"x-delivery-limit": int,
"x-quorum-initial-group-size": int,
"x-quorum-target-group-size": int,
"x-dead-letter-strategy": Literal["at-most-once", "at-least-once"],
},
total=True,
)


QueueStreamTypeSpecificArgs = TypedDict(
"QueueStreamTypeSpecificArgs",
{
"x-max-age": str,
"x-stream-max-segment-size-bytes": int,
"x-stream-filter-size-bytes": int,
"x-initial-cluster-size": int,
},
total=True,
)


class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs):
pass


class ClassicQueueArgs(
CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs
):
pass


class QuorumQueueArgs(
CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs
):
pass


class RabbitQueue(NameRequired):
"""A class to represent a RabbitMQ queue.
Expand Down Expand Up @@ -49,61 +127,98 @@ def routing(self) -> str:
"""Return real routing_key of object."""
return self.routing_key or self.name

@overload
def __init__(
self,
name: Annotated[
str,
Doc("RabbitMQ queue name."),
],
durable: Annotated[
bool,
Doc("Whether the object is durable."),
] = False,
exclusive: Annotated[
bool,
Doc(
"The queue can be used only in the current connection "
"and will be deleted after connection closed."
),
] = False,
passive: Annotated[
bool,
Doc("Do not create queue automatically."),
] = False,
auto_delete: Annotated[
bool,
Doc("The queue will be deleted after connection closed."),
] = False,
arguments: Annotated[
Optional["AnyDict"],
Doc(
"Queue declarationg arguments. "
"You can find information about them in the official RabbitMQ documentation: https://www.rabbitmq.com/docs/queues#optional-arguments"
),
] = None,
timeout: Annotated[
"TimeoutType",
Doc("Send confirmation time from RabbitMQ."),
] = None,
robust: Annotated[
bool,
Doc("Whether to declare queue object as restorable."),
] = True,
bind_arguments: Annotated[
Optional["AnyDict"],
Doc("Queue-exchange binding options."),
name: str,
queue_type: Literal[RabbitQueueType.Classic],
durable: Literal[True, False] = False,
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Optional[ClassicQueueArgs] = None,
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
routing_key: str = "",
) -> None: ...

@overload
def __init__(
self,
name: str,
queue_type: Literal[RabbitQueueType.Quorum],
durable: Literal[True],
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Optional[QuorumQueueArgs] = None,
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
routing_key: str = "",
) -> None: ...

@overload
def __init__(
self,
name: str,
queue_type: Literal[RabbitQueueType.Stream],
durable: Literal[True],
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Optional[StreamQueueArgs] = None,
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
routing_key: str = "",
) -> None: ...

def __init__(
self,
name: str,
queue_type: Literal[
RabbitQueueType.Quorum, RabbitQueueType.Classic, RabbitQueueType.Stream
] = RabbitQueueType.Classic,
durable: Literal[True, False] = False,
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Optional[
Union[QuorumQueueArgs, ClassicQueueArgs, StreamQueueArgs]
] = None,
routing_key: Annotated[
str,
Doc("Explicit binding routing key. Uses `name` if not presented."),
] = "",
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
routing_key: str = "",
) -> None:
"""Initialize the RabbitMQ queue.
:param name: RabbitMQ queue name.
:param durable: Whether the object is durable.
:param exclusive: The queue can be used only in the current connection and will be deleted after connection closed.
:param passive: Do not create queue automatically.
:param auto_delete: The queue will be deleted after connection closed.
:param arguments: Queue declaration arguments.
You can find information about them in the official RabbitMQ documentation:
https://www.rabbitmq.com/docs/queues#optional-arguments
:param timeout: Send confirmation time from RabbitMQ.
:param robust: Whether to declare queue object as restorable.
:param bind_arguments: Queue-exchange binding options.
:param routing_key: Explicit binding routing key. Uses name if not presented.
"""
re, routing_key = compile_path(
routing_key,
replace_symbol="*",
patch_regex=lambda x: x.replace(r"\#", ".+"),
patch_regex=lambda x: x.replace(r"#", ".+"),
)

_arguments = {"x-queue-type": queue_type.value}

if arguments:
_arguments.update(**arguments)

super().__init__(name)

self.path_regex = re
Expand All @@ -114,7 +229,7 @@ def __init__(
self.robust = robust
self.passive = passive
self.auto_delete = auto_delete
self.arguments = arguments
self.arguments = _arguments
self.timeout = timeout

def add_prefix(self, prefix: str) -> "RabbitQueue":
Expand Down

0 comments on commit 058e848

Please sign in to comment.