From 8b806d111a3a59dd68800fed166584920760fc2c Mon Sep 17 00:00:00 2001
From: DABND19
Date: Sat, 21 Dec 2024 16:44:00 +0300
Subject: [PATCH] feat: Non-blocking polling for confluent kafka driver.

---
 faststream/confluent/client.py | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py
index eb8ce40a18..fe91d79521 100644
--- a/faststream/confluent/client.py
+++ b/faststream/confluent/client.py
@@ -13,9 +13,11 @@
     Sequence,
     Tuple,
     Union,
+    cast,
 )
 
 import anyio
+import anyio.lowlevel
 from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
 from confluent_kafka.admin import AdminClient, NewTopic
 
@@ -376,8 +378,12 @@ async def stop(self) -> None:
 
     async def getone(self, timeout: float = 0.1) -> Optional[Message]:
         """Consumes a single message from Kafka."""
-        async with self._lock:
-            msg = await call_or_await(self.consumer.poll, timeout)
+        msg = None
+        with anyio.move_on_after(delay=timeout):
+            while msg is None:
+                async with self._lock:
+                    msg = self.consumer.poll(0)
+                await anyio.lowlevel.checkpoint()
         return check_msg_error(msg)
 
     async def getmany(
@@ -386,12 +392,18 @@ async def getmany(
         max_records: Optional[int] = 10,
     ) -> Tuple[Message, ...]:
         """Consumes a batch of messages from Kafka and groups them by topic and partition."""
-        async with self._lock:
-            raw_messages: List[Optional[Message]] = await call_or_await(
-                self.consumer.consume,  # type: ignore[arg-type]
-                num_messages=max_records or 10,
-                timeout=timeout,
-            )
+        raw_messages: List[Optional[Message]] = []
+        max_records = max_records or 10
+        with anyio.move_on_after(delay=timeout):
+            while to_consume := max_records - len(raw_messages):
+                async with self._lock:
+                    raw_messages.extend(
+                        cast(
+                            List[Optional[Message]],
+                            self.consumer.consume(num_messages=to_consume, timeout=0),
+                        )
+                    )
+                await anyio.lowlevel.checkpoint()
         return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)
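
Illustration (not part of the patch): the change above replaces the blocking consumer.poll(timeout) and consumer.consume(timeout=...) calls, previously offloaded to a thread via call_or_await, with zero-timeout polls driven from the event loop. anyio.move_on_after enforces the overall deadline, while anyio.lowlevel.checkpoint() yields between polls so other tasks can run. Below is a minimal standalone sketch of the same pattern; the helper name poll_one, the broker address, and the topic/group names are assumptions for illustration only, not faststream APIs.

from typing import Optional

import anyio
import anyio.lowlevel
from confluent_kafka import Consumer, Message


async def poll_one(consumer: Consumer, timeout: float) -> Optional[Message]:
    # Poll with timeout=0 so confluent-kafka never blocks the event loop;
    # anyio.move_on_after enforces the overall deadline instead.
    msg: Optional[Message] = None
    with anyio.move_on_after(delay=timeout):
        while msg is None:
            msg = consumer.poll(0)
            # Yield to the event loop between polls (cooperative checkpoint).
            await anyio.lowlevel.checkpoint()
    return msg


async def main() -> None:
    # Assumed local broker and example topic/group names.
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "example-group",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["example-topic"])
    try:
        msg = await poll_one(consumer, timeout=1.0)
        if msg is not None and msg.error() is None:
            print(msg.value())
    finally:
        consumer.close()


if __name__ == "__main__":
    anyio.run(main)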