diff --git a/faststream/kafka/exceptions.py b/faststream/kafka/exceptions.py
new file mode 100644
index 0000000000..8d398706b9
--- /dev/null
+++ b/faststream/kafka/exceptions.py
@@ -0,0 +1,13 @@
+from faststream.exceptions import FastStreamException
+
+
+class BatchBufferOverflowException(FastStreamException):
+    """Exception raised when the batch buffer overflows while adding a new message."""
+
+    def __init__(self,
+                 message_position: int) -> None:
+        self.message_position = message_position
+
+    def __str__(self) -> str:
+        return "The batch buffer is full. The position of the message" \
+               f" in the passed collection at which the overflow occurred: {self.message_position}"
diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py
index bdd8d3677c..e0764bf96a 100644
--- a/faststream/kafka/publisher/producer.py
+++ b/faststream/kafka/publisher/producer.py
@@ -5,12 +5,12 @@
 from faststream._internal.publisher.proto import ProducerProto
 from faststream._internal.subscriber.utils import resolve_custom_func
 from faststream.exceptions import FeatureNotSupportedException
+from faststream.kafka.exceptions import BatchBufferOverflowException
 from faststream.kafka.message import KafkaMessage
 from faststream.kafka.parser import AioKafkaParser
 from faststream.message import encode_message
 
 from .state import EmptyProducerState, ProducerState, RealProducer
 
-
 if TYPE_CHECKING:
     import asyncio
@@ -90,7 +90,7 @@ async def publish_batch(
 
         headers_to_send = cmd.headers_to_publish()
 
-        for body in cmd.batch_bodies:
+        for message_position, body in enumerate(cmd.batch_bodies):
             message, content_type = encode_message(body)
 
             if content_type:
@@ -101,12 +101,14 @@ async def publish_batch(
             else:
                 final_headers = headers_to_send.copy()
 
-            batch.append(
+            metadata = batch.append(
                 key=None,
                 value=message,
                 timestamp=cmd.timestamp_ms,
                 headers=[(i, j.encode()) for i, j in final_headers.items()],
             )
+            if metadata is None:
+                raise BatchBufferOverflowException(message_position=message_position)
 
         send_future = await self._producer.producer.send_batch(
             batch,
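
A minimal usage sketch (not part of the diff above) showing how a caller might handle the new exception. It assumes a reachable Kafka cluster at localhost:9092, that KafkaBroker forwards aiokafka's max_batch_size setting, and illustrative topic and payload names; only BatchBufferOverflowException and its message_position attribute come from this change.

import asyncio

from faststream.kafka import KafkaBroker
from faststream.kafka.exceptions import BatchBufferOverflowException


async def main() -> None:
    # A deliberately small batch buffer so the overflow is easy to trigger
    # (assumes max_batch_size is passed through to aiokafka's producer).
    broker = KafkaBroker("localhost:9092", max_batch_size=64)

    async with broker:
        payloads = ["x" * 32 for _ in range(10)]  # hypothetical payloads
        try:
            await broker.publish_batch(*payloads, topic="test-topic")
        except BatchBufferOverflowException as exc:
            # exc.message_position is the index within the passed collection
            # at which the batch buffer overflowed.
            print(f"Batch overflow at message index {exc.message_position}")


asyncio.run(main())

With this change an oversized batch fails fast with the index of the offending message, whereas previously the unchecked return value of batch.append() meant a message that did not fit into the buffer was silently skipped.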