Skip to content

Commit

Permalink
feat: add BatchBufferOverflowException
Browse files Browse the repository at this point in the history
  • Loading branch information
spataphore1337 committed Dec 16, 2024
1 parent 001aedd commit 26b66a9
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 2 deletions.
14 changes: 14 additions & 0 deletions faststream/kafka/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from faststream.exceptions import FastStreamException


class BatchBufferOverflowException(FastStreamException):
"""Exception raised when a buffer overflow occurs when adding a new message to the batches."""

def __init__(self, message_position: int) -> None:
self.message_position = message_position

def __str__(self) -> str:
return (
f"The batch buffer is full. The position of the message"
f" in the transferred collection at which the overflow occurred: {self.message_position}"
)
7 changes: 5 additions & 2 deletions faststream/kafka/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from faststream.broker.publisher.proto import ProducerProto
from faststream.broker.utils import resolve_custom_func
from faststream.exceptions import OperationForbiddenError
from faststream.kafka.exceptions import BatchBufferOverflowException
from faststream.kafka.message import KafkaMessage
from faststream.kafka.parser import AioKafkaParser

Expand Down Expand Up @@ -100,7 +101,7 @@ async def publish_batch(
reply_to,
)

for msg in msgs:
for message_position, msg in msgs:
message, content_type = encode_message(msg)

if content_type:
Expand All @@ -111,12 +112,14 @@ async def publish_batch(
else:
final_headers = headers_to_send.copy()

batch.append(
metadata = batch.append(
key=None,
value=message,
timestamp=timestamp_ms,
headers=[(i, j.encode()) for i, j in final_headers.items()],
)
if metadata is None:
raise BatchBufferOverflowException(message_position=message_position)

send_future = await self._producer.send_batch(batch, topic, partition=partition)
if not no_confirm:
Expand Down
17 changes: 17 additions & 0 deletions tests/brokers/kafka/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from faststream import Context
from faststream.kafka import KafkaBroker, KafkaResponse
from faststream.kafka.exceptions import BatchBufferOverflowException
from tests.brokers.base.publish import BrokerPublishTestcase


Expand Down Expand Up @@ -133,3 +134,19 @@ async def handle_next(msg=Context("message")):
body=b"1",
key=b"1",
)

@pytest.mark.asyncio
async def test_raise_buffer_overflow_exception(
self, queue: str, mock: Mock
) -> None:
pub_broker = self.get_broker(max_batch_size=16)

@pub_broker.subscriber(queue)
async def handler(m) -> None:
pass

async with self.patch_broker(pub_broker) as br:
await br.start()
with pytest.raises(BatchBufferOverflowException) as e:
await br.publish_batch(1, "Hello, world!", topic=queue, no_confirm=True)
assert e.value.message_position == 1

0 comments on commit 26b66a9

Please sign in to comment.