Skip to content

Commit

Permalink
feat: change returns kafka publish, change annotation and convert Doc…
Browse files Browse the repository at this point in the history
… to doc string
  • Loading branch information
spataphore1337 committed Dec 11, 2024
1 parent 921ead8 commit e5b65ed
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 114 deletions.
13 changes: 6 additions & 7 deletions faststream/kafka/publisher/producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, Union

from typing_extensions import override

Expand All @@ -13,9 +13,8 @@

if TYPE_CHECKING:
import asyncio

from aiokafka import AIOKafkaProducer

from aiokafka.structs import RecordMetadata
from faststream._internal.types import CustomCallable
from faststream.kafka.response import KafkaPublishCommand

Expand Down Expand Up @@ -58,7 +57,7 @@ def closed(self) -> bool:
async def publish( # type: ignore[override]
self,
cmd: "KafkaPublishCommand",
) -> "asyncio.Future":
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Publish a message to a topic."""
message, content_type = encode_message(cmd.body)

Expand All @@ -77,13 +76,13 @@ async def publish( # type: ignore[override]
)

if not cmd.no_confirm:
await send_future
return await send_future
return send_future

async def publish_batch(
self,
cmd: "KafkaPublishCommand",
) -> "asyncio.Future":
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Publish a batch of messages to a topic."""
batch = self._producer.producer.create_batch()

Expand Down Expand Up @@ -113,7 +112,7 @@ async def publish_batch(
partition=cmd.partition,
)
if not cmd.no_confirm:
await send_future
return await send_future
return send_future

@override
Expand Down
244 changes: 137 additions & 107 deletions faststream/kafka/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Any,
Optional,
Union,
overload, Literal,
)

from aiokafka import ConsumerRecord
Expand All @@ -19,14 +20,15 @@

if TYPE_CHECKING:
import asyncio

from aiokafka.structs import RecordMetadata
from faststream._internal.basic_types import SendableMessage
from faststream._internal.types import BrokerMiddleware, PublisherMiddleware
from faststream.kafka.message import KafkaMessage
from faststream.kafka.publisher.producer import AioKafkaFastProducer
from faststream.response.response import PublishCommand



class LogicPublisher(PublisherUsecase[MsgType]):
"""A class to publish messages to a Kafka topic."""

Expand Down Expand Up @@ -157,70 +159,83 @@ def __init__(

self.key = key

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[True] = False,
) -> "asyncio.Future[RecordMetadata]": ...

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[False] = False,
) -> "RecordMetadata": ...

@override
async def publish(
self,
message: Annotated[
"SendableMessage",
Doc("Message body to send."),
],
topic: Annotated[
str,
Doc("Topic where the message will be published."),
] = "",
message: "SendableMessage",
topic: str = "",
*,
key: Annotated[
Union[bytes, Any, None],
Doc(
"""
A key to associate with the message. Can be used to
determine which partition to send the message to. If partition
is `None` (and producer's partitioner config is left as default),
then messages with the same key will be delivered to the same
partition (but if key is `None`, partition is chosen randomly).
Must be type `bytes`, or be serializable to bytes via configured
`key_serializer`.
""",
),
] = None,
partition: Annotated[
Optional[int],
Doc(
"""
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`.
""",
),
] = None,
timestamp_ms: Annotated[
Optional[int],
Doc(
"""
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
""",
),
] = None,
headers: Annotated[
Optional[dict[str, str]],
Doc("Message headers to store metainformation."),
] = None,
correlation_id: Annotated[
Optional[str],
Doc(
"Manual message **correlation_id** setter. "
"**correlation_id** is a useful option to trace messages.",
),
] = None,
reply_to: Annotated[
str,
Doc("Reply message topic name to send response."),
] = "",
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
) -> "asyncio.Future":
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""
Args:
message:
Message body to send.
topic:
Topic where the message will be published.
key:
A key to associate with the message. Can be used to
determine which partition to send the message to. If partition
is `None` (and producer's partitioner config is left as default),
then messages with the same key will be delivered to the same
partition (but if key is `None`, partition is chosen randomly).
Must be type `bytes`, or be serializable to bytes via configured
`key_serializer`
partition:
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`
timestamp_ms:
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
headers:
Message headers to store metainformation.
correlation_id:
Manual message **correlation_id** setter.
**correlation_id** is a useful option to trace messages.
reply_to:
Reply message topic name to send response.
no_confirm:
Do not wait for Kafka publish confirmation.
Returns:
`asyncio.Future[RecordMetadata]` if no_confirm = True.
`RecordMetadata` if no_confirm = False.
"""
cmd = KafkaPublishCommand(
message,
topic=topic or self.topic,
Expand Down Expand Up @@ -327,55 +342,70 @@ async def request(


class BatchPublisher(LogicPublisher[tuple["ConsumerRecord", ...]]):

@overload
async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[True] = False,
) -> "asyncio.Future[RecordMetadata]": ...

@overload
async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[False] = False,
) -> "RecordMetadata": ...

@override
async def publish(
self,
*messages: Annotated[
"SendableMessage",
Doc("Messages bodies to send."),
],
topic: Annotated[
str,
Doc("Topic where the message will be published."),
] = "",
partition: Annotated[
Optional[int],
Doc(
"""
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`.
""",
),
] = None,
timestamp_ms: Annotated[
Optional[int],
Doc(
"""
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
""",
),
] = None,
headers: Annotated[
Optional[dict[str, str]],
Doc("Messages headers to store metainformation."),
] = None,
reply_to: Annotated[
str,
Doc("Reply message topic name to send response."),
] = "",
correlation_id: Annotated[
Optional[str],
Doc(
"Manual message **correlation_id** setter. "
"**correlation_id** is a useful option to trace messages.",
),
] = None,
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
) -> "asyncio.Future":
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""
Args:
*messages:
Messages bodies to send.
topic:
Topic where the message will be published.
partition:
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`
timestamp_ms:
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
headers:
Message headers to store metainformation.
reply_to:
Reply message topic name to send response.
correlation_id:
Manual message **correlation_id** setter.
**correlation_id** is a useful option to trace messages.
no_confirm:
Do not wait for Kafka publish confirmation.
Returns:
`asyncio.Future[RecordMetadata]` if no_confirm = True.
`RecordMetadata` if no_confirm = False.
"""
cmd = KafkaPublishCommand(
*messages,
key=None,
Expand Down

0 comments on commit e5b65ed

Please sign in to comment.