Skip to content

Commit

Permalink
Merge branch 'main' into update-release-notes-0.5.32
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaranvpl authored Dec 5, 2024
2 parents c75d5b1 + 4f23cbc commit 441a289
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ def __init__(
self.config = final_config
self.consumer = Consumer(final_config, logger=self.logger) # type: ignore[call-arg]

# We shouldn't read messages and close consumer concurrently
# https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895
self._lock = anyio.Lock()

@property
def topics_to_create(self) -> List[str]:
return list({*self.topics, *(p.topic for p in self.partitions)})
Expand Down Expand Up @@ -367,11 +371,13 @@ async def stop(self) -> None:
)

# Wrap calls to async to make method cancelable by timeout
await call_or_await(self.consumer.close)
async with self._lock:
await call_or_await(self.consumer.close)

async def getone(self, timeout: float = 0.1) -> Optional[Message]:
"""Consumes a single message from Kafka."""
msg = await call_or_await(self.consumer.poll, timeout)
async with self._lock:
msg = await call_or_await(self.consumer.poll, timeout)
return check_msg_error(msg)

async def getmany(
Expand All @@ -380,11 +386,12 @@ async def getmany(
max_records: Optional[int] = 10,
) -> Tuple[Message, ...]:
"""Consumes a batch of messages from Kafka and groups them by topic and partition."""
raw_messages: List[Optional[Message]] = await call_or_await(
self.consumer.consume, # type: ignore[arg-type]
num_messages=max_records or 10,
timeout=timeout,
)
async with self._lock:
raw_messages: List[Optional[Message]] = await call_or_await(
self.consumer.consume, # type: ignore[arg-type]
num_messages=max_records or 10,
timeout=timeout,
)

return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)

Expand Down

0 comments on commit 441a289

Please sign in to comment.