Skip to content

Commit

Permalink
feat: Non-blocking polling for confluent kafka driver.
Browse files Browse the repository at this point in the history
  • Loading branch information
DABND19 committed Dec 21, 2024
1 parent 2ba3eed commit 51477a5
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
Sequence,
Tuple,
Union,
cast,
)

import anyio
import anyio.lowlevel
from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
from confluent_kafka.admin import AdminClient, NewTopic

Expand Down Expand Up @@ -376,8 +378,12 @@ async def stop(self) -> None:

async def getone(self, timeout: float = 0.1) -> Optional[Message]:
    """Consume a single message from Kafka without blocking the event loop.

    Repeatedly issues a zero-timeout (non-blocking) ``poll`` on the
    underlying confluent-kafka consumer, yielding control to the event
    loop between attempts, until a message arrives or *timeout* seconds
    elapse.

    Args:
        timeout: Maximum number of seconds to wait for a message.

    Returns:
        The consumed ``Message`` (after ``check_msg_error`` validation),
        or ``None`` if no message arrived before the deadline.
    """
    msg: Optional[Message] = None
    # Cancellation scope bounds the whole polling loop at `timeout` seconds.
    with anyio.move_on_after(delay=timeout):
        while msg is None:
            # Lock serializes access to the (non-thread-safe) consumer.
            async with self._lock:
                # poll(0) returns immediately with None when no message
                # is available, so the event-loop thread is never blocked.
                msg = self.consumer.poll(0)
            # Yield to the scheduler so other tasks (and the cancellation
            # deadline) get a chance to run between poll attempts.
            # NOTE(review): this loop has no sleep between polls — it spins
            # until the deadline fires; confirm CPU usage is acceptable.
            await anyio.lowlevel.checkpoint()
    return check_msg_error(msg)

async def getmany(
Expand All @@ -386,12 +392,18 @@ async def getmany(
max_records: Optional[int] = 10,
) -> Tuple[Message, ...]:
"""Consumes a batch of messages from Kafka and groups them by topic and partition."""
async with self._lock:
raw_messages: List[Optional[Message]] = await call_or_await(
self.consumer.consume, # type: ignore[arg-type]
num_messages=max_records or 10,
timeout=timeout,
)
raw_messages: List[Optional[Message]] = []
max_records = max_records or 10
with anyio.move_on_after(delay=timeout):
while to_consume := max_records - len(raw_messages):
async with self._lock:
raw_messages.extend(
cast(
List[Optional[Message]],
self.consumer.consume(num_messages=to_consume, timeout=0),
)
)
await anyio.lowlevel.checkpoint()

return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)

Expand Down

0 comments on commit 51477a5

Please sign in to comment.