Skip to content

Commit

Permalink
refactor: remove concrete topic asking
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Jul 17, 2024
1 parent 4a253aa commit bdeed92
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 17 deletions.
12 changes: 1 addition & 11 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,21 +527,11 @@ async def publish_batch(

@override
async def ping(self, timeout: Optional[float]) -> bool:
random_topic: Optional[str]

if sub := next(iter(self._subscribers.values()), None):
random_topic = next(iter(sub.topics), None)
else:
random_topic = None

with move_on_after(timeout) as cancel_scope:
if cancel_scope.cancel_called:
return False

if self._producer is None:
return False

return await self._producer._producer.ping(
timeout=timeout,
topic=random_topic,
)
return await self._producer._producer.ping(timeout=timeout)
7 changes: 1 addition & 6 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,20 +230,15 @@ async def send_batch(
async def ping(
self,
timeout: Optional[float] = 5.0,
topic: Optional[str] = None,
) -> bool:
"""Implement ping using `list_topics` information request."""
if timeout is None:
timeout = -1

kwargs: Dict[str, Any] = {"timeout": timeout}
if topic:
kwargs["topic"] = topic

try:
cluster_metadata = await call_or_await(
self.producer.list_topics,
**kwargs,
timeout=timeout,
)

return bool(cluster_metadata)
Expand Down

0 comments on commit bdeed92

Please sign in to comment.