diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py
index eb8ce40a18..9407aa64e3 100644
--- a/faststream/confluent/client.py
+++ b/faststream/confluent/client.py
@@ -13,9 +13,11 @@
     Sequence,
     Tuple,
     Union,
+    cast,
 )
 
 import anyio
+import anyio.lowlevel
 from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
 from confluent_kafka.admin import AdminClient, NewTopic
 
@@ -348,7 +350,11 @@ async def start(self) -> None:
 
     async def commit(self, asynchronous: bool = True) -> None:
         """Commits the offsets of all messages returned by the last poll operation."""
-        await call_or_await(self.consumer.commit, asynchronous=asynchronous)
+        if asynchronous:
+            # Asynchronous commit is non-blocking:
+            self.consumer.commit(asynchronous=True)
+        else:
+            await call_or_await(self.consumer.commit, asynchronous=False)
 
     async def stop(self) -> None:
         """Stops the Kafka consumer and releases all resources."""
@@ -376,8 +382,14 @@ async def stop(self) -> None:
 
     async def getone(self, timeout: float = 0.1) -> Optional[Message]:
         """Consumes a single message from Kafka."""
-        async with self._lock:
-            msg = await call_or_await(self.consumer.poll, timeout)
+        msg: Optional[Message] = None
+        with anyio.move_on_after(delay=timeout):
+            while msg is None:
+                # We can't remove the lock from here,
+                # because a race condition with the stop method is still possible:
+                async with self._lock:
+                    msg = self.consumer.poll(0)
+                await anyio.lowlevel.checkpoint()
         return check_msg_error(msg)
 
     async def getmany(
@@ -386,12 +398,20 @@
         timeout: float = 0.1,
         max_records: Optional[int] = 10,
     ) -> Tuple[Message, ...]:
         """Consumes a batch of messages from Kafka and groups them by topic and partition."""
-        async with self._lock:
-            raw_messages: List[Optional[Message]] = await call_or_await(
-                self.consumer.consume,  # type: ignore[arg-type]
-                num_messages=max_records or 10,
-                timeout=timeout,
-            )
+        raw_messages: List[Optional[Message]] = []
+        max_records = max_records or 10
+        with anyio.move_on_after(delay=timeout):
+            while to_consume := max_records - len(raw_messages):
+                # We can't remove the lock from here,
+                # because a race condition with the stop method is still possible:
+                async with self._lock:
+                    raw_messages.extend(
+                        cast(
+                            List[Optional[Message]],
+                            self.consumer.consume(num_messages=to_consume, timeout=0),
+                        )
+                    )
+                await anyio.lowlevel.checkpoint()
         return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)
 
diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py
index b435f35433..c321b93572 100644
--- a/faststream/confluent/subscriber/usecase.py
+++ b/faststream/confluent/subscriber/usecase.py
@@ -378,18 +378,14 @@ def __init__(
 
     async def get_msg(self) -> Optional[Tuple["Message", ...]]:
         assert self.consumer, "You should setup subscriber at first."  # nosec B101
-
-        messages = await self.consumer.getmany(
-            timeout=self.polling_interval,
-            max_records=self.max_records,
+        return (
+            await self.consumer.getmany(
+                timeout=self.polling_interval,
+                max_records=self.max_records,
+            )
+            or None
         )
-
-        if not messages:  # TODO: why we are sleeping here?
-            await anyio.sleep(self.polling_interval)
-            return None
-
-        return messages
 
     def get_log_context(
         self,
         message: Optional["StreamMessage[Tuple[Message, ...]]"],