From e5b65ed7073dff3e96af5653da0a61cca0601b0c Mon Sep 17 00:00:00 2001 From: spataphore1337 Date: Wed, 11 Dec 2024 21:19:05 +0300 Subject: [PATCH] feat: change returns kafka publish, change annotation and convert Doc to doc string --- faststream/kafka/publisher/producer.py | 13 +- faststream/kafka/publisher/usecase.py | 244 ++++++++++++++----------- 2 files changed, 143 insertions(+), 114 deletions(-) diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index a6574e104d..fca1a01ea0 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Optional, Union from typing_extensions import override @@ -13,9 +13,8 @@ if TYPE_CHECKING: import asyncio - from aiokafka import AIOKafkaProducer - + from aiokafka.structs import RecordMetadata from faststream._internal.types import CustomCallable from faststream.kafka.response import KafkaPublishCommand @@ -58,7 +57,7 @@ def closed(self) -> bool: async def publish( # type: ignore[override] self, cmd: "KafkaPublishCommand", - ) -> "asyncio.Future": + ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: """Publish a message to a topic.""" message, content_type = encode_message(cmd.body) @@ -77,13 +76,13 @@ async def publish( # type: ignore[override] ) if not cmd.no_confirm: - await send_future + return await send_future return send_future async def publish_batch( self, cmd: "KafkaPublishCommand", - ) -> "asyncio.Future": + ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: """Publish a batch of messages to a topic.""" batch = self._producer.producer.create_batch() @@ -113,7 +112,7 @@ async def publish_batch( partition=cmd.partition, ) if not cmd.no_confirm: - await send_future + return await send_future return send_future @override diff --git a/faststream/kafka/publisher/usecase.py b/faststream/kafka/publisher/usecase.py index 895abe044c..5992898a60 100644 --- a/faststream/kafka/publisher/usecase.py +++ b/faststream/kafka/publisher/usecase.py @@ -5,6 +5,7 @@ Any, Optional, Union, + overload, Literal, ) from aiokafka import ConsumerRecord @@ -19,7 +20,7 @@ if TYPE_CHECKING: import asyncio - + from aiokafka.structs import RecordMetadata from faststream._internal.basic_types import SendableMessage from faststream._internal.types import BrokerMiddleware, PublisherMiddleware from faststream.kafka.message import KafkaMessage @@ -27,6 +28,7 @@ from faststream.response.response import PublishCommand + class LogicPublisher(PublisherUsecase[MsgType]): """A class to publish messages to a Kafka topic.""" @@ -157,70 +159,83 @@ def __init__( self.key = key + @overload + async def publish( + self, + message: "SendableMessage", + topic: str = "", + *, + key: Union[bytes, Any, None] = None, + partition: Optional[int] = None, + timestamp_ms: Optional[int] = None, + headers: Optional[dict[str, str]] = None, + correlation_id: Optional[str] = None, + reply_to: str = "", + no_confirm: Literal[True] = False, + ) -> "asyncio.Future[RecordMetadata]": ... + + @overload + async def publish( + self, + message: "SendableMessage", + topic: str = "", + *, + key: Union[bytes, Any, None] = None, + partition: Optional[int] = None, + timestamp_ms: Optional[int] = None, + headers: Optional[dict[str, str]] = None, + correlation_id: Optional[str] = None, + reply_to: str = "", + no_confirm: Literal[False] = False, + ) -> "RecordMetadata": ... + @override async def publish( self, - message: Annotated[ - "SendableMessage", - Doc("Message body to send."), - ], - topic: Annotated[ - str, - Doc("Topic where the message will be published."), - ] = "", + message: "SendableMessage", + topic: str = "", *, - key: Annotated[ - Union[bytes, Any, None], - Doc( - """ - A key to associate with the message. Can be used to - determine which partition to send the message to. If partition - is `None` (and producer's partitioner config is left as default), - then messages with the same key will be delivered to the same - partition (but if key is `None`, partition is chosen randomly). - Must be type `bytes`, or be serializable to bytes via configured - `key_serializer`. - """, - ), - ] = None, - partition: Annotated[ - Optional[int], - Doc( - """ - Specify a partition. If not set, the partition will be - selected using the configured `partitioner`. - """, - ), - ] = None, - timestamp_ms: Annotated[ - Optional[int], - Doc( - """ - Epoch milliseconds (from Jan 1 1970 UTC) to use as - the message timestamp. Defaults to current time. - """, - ), - ] = None, - headers: Annotated[ - Optional[dict[str, str]], - Doc("Message headers to store metainformation."), - ] = None, - correlation_id: Annotated[ - Optional[str], - Doc( - "Manual message **correlation_id** setter. " - "**correlation_id** is a useful option to trace messages.", - ), - ] = None, - reply_to: Annotated[ - str, - Doc("Reply message topic name to send response."), - ] = "", - no_confirm: Annotated[ - bool, - Doc("Do not wait for Kafka publish confirmation."), - ] = False, - ) -> "asyncio.Future": + key: Union[bytes, Any, None] = None, + partition: Optional[int] = None, + timestamp_ms: Optional[int] = None, + headers: Optional[dict[str, str]] = None, + correlation_id: Optional[str] = None, + reply_to: str = "", + no_confirm: bool = False, + ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: + """ + Args: + message: + Message body to send. + topic: + Topic where the message will be published. + key: + A key to associate with the message. Can be used to + determine which partition to send the message to. If partition + is `None` (and producer's partitioner config is left as default), + then messages with the same key will be delivered to the same + partition (but if key is `None`, partition is chosen randomly). + Must be type `bytes`, or be serializable to bytes via configured + `key_serializer` + partition: + Specify a partition. If not set, the partition will be + selected using the configured `partitioner` + timestamp_ms: + Epoch milliseconds (from Jan 1 1970 UTC) to use as + the message timestamp. Defaults to current time. + headers: + Message headers to store metainformation. + correlation_id: + Manual message **correlation_id** setter. + **correlation_id** is a useful option to trace messages. + reply_to: + Reply message topic name to send response. + no_confirm: + Do not wait for Kafka publish confirmation. + Returns: + `asyncio.Future[RecordMetadata]` if no_confirm = True. + `RecordMetadata` if no_confirm = False. + """ cmd = KafkaPublishCommand( message, topic=topic or self.topic, @@ -327,55 +342,70 @@ async def request( class BatchPublisher(LogicPublisher[tuple["ConsumerRecord", ...]]): + + @overload + async def publish( + self, + *messages: "SendableMessage", + topic: str = "", + partition: Optional[int] = None, + timestamp_ms: Optional[int] = None, + headers: Optional[dict[str, str]] = None, + reply_to: str = "", + correlation_id: Optional[str] = None, + no_confirm: Literal[True] = False, + ) -> "asyncio.Future[RecordMetadata]": ... + + @overload + async def publish( + self, + *messages: "SendableMessage", + topic: str = "", + partition: Optional[int] = None, + timestamp_ms: Optional[int] = None, + headers: Optional[dict[str, str]] = None, + reply_to: str = "", + correlation_id: Optional[str] = None, + no_confirm: Literal[False] = False, + ) -> "RecordMetadata": ... + @override async def publish( self, - *messages: Annotated[ - "SendableMessage", - Doc("Messages bodies to send."), - ], - topic: Annotated[ - str, - Doc("Topic where the message will be published."), - ] = "", - partition: Annotated[ - Optional[int], - Doc( - """ - Specify a partition. If not set, the partition will be - selected using the configured `partitioner`. - """, - ), - ] = None, - timestamp_ms: Annotated[ - Optional[int], - Doc( - """ - Epoch milliseconds (from Jan 1 1970 UTC) to use as - the message timestamp. Defaults to current time. - """, - ), - ] = None, - headers: Annotated[ - Optional[dict[str, str]], - Doc("Messages headers to store metainformation."), - ] = None, - reply_to: Annotated[ - str, - Doc("Reply message topic name to send response."), - ] = "", - correlation_id: Annotated[ - Optional[str], - Doc( - "Manual message **correlation_id** setter. " - "**correlation_id** is a useful option to trace messages.", - ), - ] = None, - no_confirm: Annotated[ - bool, - Doc("Do not wait for Kafka publish confirmation."), - ] = False, - ) -> "asyncio.Future": + *messages: "SendableMessage", + topic: str = "", + partition: Optional[int] = None, + timestamp_ms: Optional[int] = None, + headers: Optional[dict[str, str]] = None, + reply_to: str = "", + correlation_id: Optional[str] = None, + no_confirm: bool = False, + ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: + """ + Args: + *messages: + Messages bodies to send. + topic: + Topic where the message will be published. + partition: + Specify a partition. If not set, the partition will be + selected using the configured `partitioner` + timestamp_ms: + Epoch milliseconds (from Jan 1 1970 UTC) to use as + the message timestamp. Defaults to current time. + headers: + Message headers to store metainformation. + reply_to: + Reply message topic name to send response. + correlation_id: + Manual message **correlation_id** setter. + **correlation_id** is a useful option to trace messages. + no_confirm: + Do not wait for Kafka publish confirmation. + Returns: + `asyncio.Future[RecordMetadata]` if no_confirm = True. + `RecordMetadata` if no_confirm = False. + """ cmd = KafkaPublishCommand( *messages, key=None,