
Commit

add tests and use lint.sh
spataphore1337 committed Dec 13, 2024
1 parent f6bbe0b commit 39b2a90
Showing 2 changed files with 50 additions and 28 deletions.
50 changes: 25 additions & 25 deletions faststream/kafka/broker/broker.py
@@ -10,7 +10,7 @@
     Optional,
     TypeVar,
     Union,
-    overload
+    overload,
 )
 
 import aiokafka
@@ -878,30 +878,30 @@ async def publish_batch(
         no_confirm: bool = False,
     ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
         """Args:
-        *messages:
-            Messages bodies to send.
-        topic:
-            Topic where the message will be published.
-        partition:
-            Specify a partition. If not set, the partition will be
-            selected using the configured `partitioner`
-        timestamp_ms:
-            Epoch milliseconds (from Jan 1 1970 UTC) to use as
-            the message timestamp. Defaults to current time.
-        headers:
-            Message headers to store metainformation.
-        reply_to:
-            Reply message topic name to send response.
-        correlation_id:
-            Manual message **correlation_id** setter.
-            **correlation_id** is a useful option to trace messages.
-        no_confirm:
-            Do not wait for Kafka publish confirmation.
-        Returns:
-            `asyncio.Future[RecordMetadata]` if no_confirm = True.
-            `RecordMetadata` if no_confirm = False.
-        """
+        *messages:
+            Messages bodies to send.
+        topic:
+            Topic where the message will be published.
+        partition:
+            Specify a partition. If not set, the partition will be
+            selected using the configured `partitioner`
+        timestamp_ms:
+            Epoch milliseconds (from Jan 1 1970 UTC) to use as
+            the message timestamp. Defaults to current time.
+        headers:
+            Message headers to store metainformation.
+        reply_to:
+            Reply message topic name to send response.
+        correlation_id:
+            Manual message **correlation_id** setter.
+            **correlation_id** is a useful option to trace messages.
+        no_confirm:
+            Do not wait for Kafka publish confirmation.
+        Returns:
+            `asyncio.Future[RecordMetadata]` if no_confirm = True.
+            `RecordMetadata` if no_confirm = False.
+        """
         assert self._producer, NOT_CONNECTED_YET  # nosec B101
 
         cmd = KafkaPublishCommand(
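For context, the docstring above documents the two return modes of publish_batch: a RecordMetadata once Kafka acknowledges the write, or an asyncio.Future when no_confirm=True. Below is a minimal usage sketch of that documented behaviour; it is not part of this commit, and the broker address ("localhost:9092") and topic name ("test-topic") are hypothetical placeholders.

import asyncio

from aiokafka.structs import RecordMetadata

from faststream.kafka import KafkaBroker


async def main() -> None:
    # Hypothetical broker address and topic name (not part of this commit).
    async with KafkaBroker("localhost:9092") as broker:
        # Default (no_confirm=False): wait for Kafka's acknowledgement and
        # receive the RecordMetadata of the published batch.
        metadata = await broker.publish_batch(1, "hi", topic="test-topic")
        assert isinstance(metadata, RecordMetadata)

        # no_confirm=True: return immediately with an asyncio.Future that
        # resolves once the batch is acknowledged.
        future = await broker.publish_batch(1, "hi", topic="test-topic", no_confirm=True)
        assert isinstance(future, asyncio.Future)
        await future


asyncio.run(main())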
28 changes: 25 additions & 3 deletions tests/brokers/kafka/test_publish.py
@@ -2,6 +2,7 @@
 from unittest.mock import Mock
 
 import pytest
+from aiokafka.structs import RecordMetadata
 
 from faststream import Context
 from faststream.kafka import KafkaResponse
@@ -25,15 +26,15 @@ async def handler(msg) -> None:
         async with self.patch_broker(pub_broker) as br:
             await br.start()
 
-            await br.publish_batch(1, "hi", topic=queue)
-
+            record_metadata = await br.publish_batch(1, "hi", topic=queue)
             result, _ = await asyncio.wait(
                 (
                     asyncio.create_task(msgs_queue.get()),
                     asyncio.create_task(msgs_queue.get()),
                 ),
                 timeout=3,
             )
+            assert isinstance(record_metadata, RecordMetadata)
 
             assert {1, "hi"} == {r.result() for r in result}

@@ -82,7 +83,7 @@ async def pub(m):
         async with self.patch_broker(pub_broker) as br:
             await br.start()
 
-            await br.publish("", queue + "1")
+            record_metadata = await br.publish("", queue + "1")
 
             result, _ = await asyncio.wait(
                 (
@@ -91,6 +92,7 @@ async def pub(m):
                 ),
                 timeout=3,
             )
+            assert isinstance(record_metadata, RecordMetadata)
 
             assert {1, "hi"} == {r.result() for r in result}

@@ -133,3 +135,23 @@ async def handle_next(msg=Context("message")) -> None:
                 body=b"1",
                 key=b"1",
             )
+
+    @pytest.mark.asyncio()
+    async def test_return_future(
+        self,
+        queue: str,
+        mock: Mock,
+    ) -> None:
+        pub_broker = self.get_broker()
+
+        @pub_broker.subscriber(queue)
+        async def handler(m) -> None:
+            pass
+
+        async with self.patch_broker(pub_broker) as br:
+            await br.start()
+
+            batch_record_metadata_future = await br.publish_batch(1, "hi", topic=queue, no_confirm=True)
+            record_metadata_future = await br.publish("", topic=queue, no_confirm=True)
+            assert isinstance(batch_record_metadata_future, asyncio.Future)
+            assert isinstance(record_metadata_future, asyncio.Future)
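The new test only checks the types of the returned objects. A hedged sketch of how such a future might be consumed outside the test suite follows, assuming the underlying aiokafka producer fulfils it with the record's RecordMetadata; the broker address and topic name are hypothetical and not part of this commit.

import asyncio

from aiokafka.structs import RecordMetadata

from faststream.kafka import KafkaBroker


async def fire_and_forget() -> None:
    # Hypothetical broker address and topic name (not part of this commit).
    async with KafkaBroker("localhost:9092") as broker:
        # no_confirm=True returns without waiting for the broker's acknowledgement.
        future = await broker.publish("hello", topic="test-topic", no_confirm=True)
        assert isinstance(future, asyncio.Future)

        # The future can still be awaited later if the acknowledgement (and its
        # RecordMetadata) turns out to be needed after all.
        metadata = await future
        assert isinstance(metadata, RecordMetadata)


asyncio.run(fire_and_forget())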
