Skip to content

Commit

Permalink
change annotations on broker publish and publish_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
spataphore1337 committed Dec 13, 2024
1 parent 9ad6263 commit f6bbe0b
Showing 1 changed file with 140 additions and 109 deletions.
249 changes: 140 additions & 109 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Optional,
TypeVar,
Union,
overload
)

import aiokafka
Expand Down Expand Up @@ -41,6 +42,7 @@

from aiokafka import ConsumerRecord
from aiokafka.abc import AbstractTokenProvider
from aiokafka.structs import RecordMetadata
from fast_depends.dependencies import Dependant
from fast_depends.library.serializer import SerializerProto
from typing_extensions import TypedDict, Unpack
Expand Down Expand Up @@ -663,76 +665,91 @@ def _subscriber_setup_extra(self) -> "AnyDict":
"builder": self._connection,
}

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[True] = False,
) -> "asyncio.Future[RecordMetadata]":
...

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[False] = False,
) -> "RecordMetadata":
...

@override
async def publish( # type: ignore[override]
self,
message: Annotated[
"SendableMessage",
Doc("Message body to send."),
],
topic: Annotated[
str,
Doc("Topic where the message will be published."),
],
*,
key: Annotated[
Union[bytes, Any, None],
Doc(
"""
A key to associate with the message. Can be used to
determine which partition to send the message to. If partition
is `None` (and producer's partitioner config is left as default),
then messages with the same key will be delivered to the same
partition (but if key is `None`, partition is chosen randomly).
Must be type `bytes`, or be serializable to bytes via configured
`key_serializer`.
""",
),
] = None,
partition: Annotated[
Optional[int],
Doc(
"""
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`.
""",
),
] = None,
timestamp_ms: Annotated[
Optional[int],
Doc(
"""
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
""",
),
] = None,
headers: Annotated[
Optional[dict[str, str]],
Doc("Message headers to store metainformation."),
] = None,
correlation_id: Annotated[
Optional[str],
Doc(
"Manual message **correlation_id** setter. "
"**correlation_id** is a useful option to trace messages.",
),
] = None,
reply_to: Annotated[
str,
Doc("Reply message topic name to send response."),
] = "",
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
) -> "asyncio.Future":
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Publish message directly.
This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
applications or to publish messages from time to time.
Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way.
Args:
message:
Message body to send.
topic:
Topic where the message will be published.
key:
A key to associate with the message. Can be used to
determine which partition to send the message to. If partition
is `None` (and producer's partitioner config is left as default),
then messages with the same key will be delivered to the same
partition (but if key is `None`, partition is chosen randomly).
Must be type `bytes`, or be serializable to bytes via configured
`key_serializer`
partition:
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`
timestamp_ms:
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
headers:
Message headers to store metainformation.
correlation_id:
Manual message **correlation_id** setter.
**correlation_id** is a useful option to trace messages.
reply_to:
Reply message topic name to send response.
no_confirm:
Do not wait for Kafka publish confirmation.
Returns:
`asyncio.Future[RecordMetadata]` if no_confirm = True.
`RecordMetadata` if no_confirm = False.
"""
cmd = KafkaPublishCommand(
message,
Expand Down Expand Up @@ -823,54 +840,68 @@ async def request( # type: ignore[override]
msg: KafkaMessage = await super()._basic_request(cmd, producer=self._producer)
return msg

@overload
async def publish_batch(
self,
*messages: Annotated[
"SendableMessage",
Doc("Messages bodies to send."),
],
topic: Annotated[
str,
Doc("Topic where the message will be published."),
] = "",
partition: Annotated[
Optional[int],
Doc(
"""
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`.
""",
),
] = None,
timestamp_ms: Annotated[
Optional[int],
Doc(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[True] = False,
) -> "asyncio.Future[RecordMetadata]": ...

@overload
async def publish_batch(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[False] = False,
) -> "RecordMetadata": ...

async def publish_batch(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Args:
*messages:
Messages bodies to send.
topic:
Topic where the message will be published.
partition:
Specify a partition. If not set, the partition will be
selected using the configured `partitioner`
timestamp_ms:
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
headers:
Message headers to store metainformation.
reply_to:
Reply message topic name to send response.
correlation_id:
Manual message **correlation_id** setter.
**correlation_id** is a useful option to trace messages.
no_confirm:
Do not wait for Kafka publish confirmation.
Returns:
`asyncio.Future[RecordMetadata]` if no_confirm = True.
`RecordMetadata` if no_confirm = False.
"""
Epoch milliseconds (from Jan 1 1970 UTC) to use as
the message timestamp. Defaults to current time.
""",
),
] = None,
headers: Annotated[
Optional[dict[str, str]],
Doc("Messages headers to store metainformation."),
] = None,
reply_to: Annotated[
str,
Doc("Reply message topic name to send response."),
] = "",
correlation_id: Annotated[
Optional[str],
Doc(
"Manual message **correlation_id** setter. "
"**correlation_id** is a useful option to trace messages.",
),
] = None,
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
) -> "asyncio.Future":
assert self._producer, NOT_CONNECTED_YET # nosec B101

cmd = KafkaPublishCommand(
Expand Down

0 comments on commit f6bbe0b

Please sign in to comment.