Skip to content

Commit

Permalink
Replace async consumer with confluent-kafka one
Browse files Browse the repository at this point in the history
  • Loading branch information
Mátyás Kuti committed Jan 11, 2024
1 parent ecdef2e commit ad83b03
Show file tree
Hide file tree
Showing 13 changed files with 630 additions and 122 deletions.
29 changes: 23 additions & 6 deletions karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError
from kafka.errors import (
AuthenticationFailedError,
for_code,
IllegalStateError,
KafkaTimeoutError,
NoBrokersAvailable,
UnknownTopicOrPartitionError,
)
from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

Expand Down Expand Up @@ -47,6 +54,10 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception:
KafkaError._UNKNOWN_TOPIC, # pylint: disable=protected-access
):
return UnknownTopicOrPartitionError()
if code == KafkaError._TIMED_OUT: # pylint: disable=protected-access
return KafkaTimeoutError()
if code == KafkaError._STATE: # pylint: disable=protected-access
return IllegalStateError()

return for_code(code)

Expand Down Expand Up @@ -89,12 +100,15 @@ class KafkaClientParams(TypedDict, total=False):
ssl_crlfile: str | None
ssl_keyfile: str | None
sasl_oauth_token_provider: TokenWithExpiryProvider
topic_metadata_refresh_interval_ms: int | None
# Consumer-only
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"]
enable_auto_commit: bool
fetch_max_wait_ms: int
group_id: str
session_timeout_ms: int
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] | None
enable_auto_commit: bool | None
fetch_min_bytes: int | None
fetch_max_bytes: int | None
fetch_max_wait_ms: int | None
group_id: str | None
session_timeout_ms: int | None


class _KafkaConfigMixin:
Expand Down Expand Up @@ -142,10 +156,13 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para
"ssl.certificate.location": params.get("ssl_certfile"),
"ssl.crl.location": params.get("ssl_crlfile"),
"ssl.key.location": params.get("ssl_keyfile"),
"topic.metadata.refresh.interval.ms": params.get("topic_metadata_refresh_interval_ms"),
"error_cb": self._error_callback,
# Consumer-only
"auto.offset.reset": params.get("auto_offset_reset"),
"enable.auto.commit": params.get("enable_auto_commit"),
"fetch.min.bytes": params.get("fetch_min_bytes"),
"fetch.max.bytes": params.get("fetch_max_bytes"),
"fetch.wait.max.ms": params.get("fetch_max_wait_ms"),
"group.id": params.get("group_id"),
"session.timeout.ms": params.get("session_timeout_ms"),
Expand Down
164 changes: 161 additions & 3 deletions karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

from __future__ import annotations

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka import Consumer, Message, TopicPartition
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.error import KafkaException
from kafka.errors import KafkaTimeoutError
from kafka.errors import IllegalStateError, KafkaTimeoutError
from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
from typing import Iterable
from typing import Any, Callable, Iterable, TypeVar
from typing_extensions import Unpack

import asyncio
import secrets


Expand All @@ -32,6 +33,7 @@ def __init__(

super().__init__(bootstrap_servers, verify_connection, **params)

self._subscription: frozenset[str] = frozenset()
if topic is not None:
self.subscribe([topic])

Expand Down Expand Up @@ -67,3 +69,159 @@ def get_watermark_offsets(
return result
except KafkaException as exc:
raise_from_kafkaexception(exc)

def commit(
self,
message: Message | None = None,
offsets: list[TopicPartition] | None = None,
_asynchronous: bool = False,
) -> list[TopicPartition] | None:
"""Commit offsets based on a message or offsets (topic partitions).
The `message` and `offsets` parameters are mutually exclusive, `message`
takes precedence.
"""
if message is not None and offsets is not None:
raise ValueError("Parameters message and offsets are mutually exclusive.")

try:
if message is not None:
return super().commit(message=message, asynchronous=False)

return super().commit(offsets=offsets, asynchronous=False)
except KafkaException as exc:
raise_from_kafkaexception(exc)

def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]:
try:
if timeout is not None:
return super().committed(partitions, timeout)

return super().committed(partitions)
except KafkaException as exc:
raise_from_kafkaexception(exc)

def subscribe( # type: ignore[override]
self,
topics: list[str] | None = None,
patterns: list[str] | None = None,
) -> None:
"""Subscribe to a list of topics and/or topic patterns.
Subscriptions are not incremental.
For `Consumer.subscribe`, Topic patterns must start with "^", eg.
"^this-is-a-regex-[0-9]", thus we prefix all strings in the `patterns`
list with "^".
The `on_assign` and `on_revoke` callbacks are set to keep track of
subscriptions (topics).
More in the confluent-kafka documentation:
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.subscribe
"""
topics = topics or []
patterns = patterns or []
self.log.info("Subscribing to topics %s and patterns %s", topics, patterns)
try:
super().subscribe(
topics + [f"^{pattern}" for pattern in patterns],
on_assign=self._on_assign,
on_revoke=self._on_revoke,
)
except KafkaException as exc:
raise_from_kafkaexception(exc)

def _on_assign(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None:
topics = frozenset(partition.topic for partition in partitions)
self._subscription = self._subscription.union(topics)

def _on_revoke(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None:
topics = frozenset(partition.topic for partition in partitions)
self._subscription = self._subscription.difference(topics)

def subscription(self) -> frozenset[str]:
"""Returns the list of topic names the consumer is subscribed to.
The topic list is maintained by the `_on_assign` and `_on_revoke` callback
methods, which are set in `subscribe`. These callbacks are only called
when `poll` is called.
"""
return self._subscription

def unsubscribe(self) -> None:
try:
super().unsubscribe()
except KafkaException as exc:
raise_from_kafkaexception(exc)

def assign(self, partitions: list[TopicPartition]) -> None:
"""Assign a list of topic partitions to the consumer.
Raises an `IllegalStateError` if `subscribe` has been previously called.
This is to match previous behaviour from `aiokafka`.
"""
if self._subscription:
raise IllegalStateError

try:
super().assign(partitions)
except KafkaException as exc:
raise_from_kafkaexception(exc)

def seek(self, partition: TopicPartition) -> None:
try:
super().seek(partition)
except KafkaException as exc:
raise_from_kafkaexception(exc)


T = TypeVar("T")


class AIOKafkaConsumer:
def __init__(
self,
bootstrap_servers: Iterable[str] | str,
topic: str | None = None,
loop: asyncio.AbstractEventLoop | None = None,
**params: Unpack[KafkaClientParams],
) -> None:
self.loop = loop or asyncio.get_running_loop()
self.consumer = KafkaConsumer(bootstrap_servers, topic=topic, **params)

async def _run_in_executor(self, func: Callable[..., T], *args: Any) -> T:
return await self.loop.run_in_executor(None, func, *args)

async def poll(self, timeout: float) -> Message | None:
return await self._run_in_executor(self.consumer.poll, timeout)

async def commit(
self,
message: Message | None = None,
offsets: list[TopicPartition] | None = None,
) -> list[TopicPartition] | None:
return await self._run_in_executor(self.consumer.commit, message, offsets)

async def close(self) -> None:
return await self._run_in_executor(self.consumer.close)

async def subscribe(self, topics: list[str] | None = None, patterns: list[str] | None = None) -> None:
return await self._run_in_executor(self.consumer.subscribe, topics, patterns)

async def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]:
return await self._run_in_executor(self.consumer.committed, partitions, timeout)

def subscription(self) -> frozenset[str]:
return self.consumer.subscription()

async def unsubscribe(self) -> None:
return await self._run_in_executor(self.consumer.unsubscribe)

async def assign(self, partitions: list[TopicPartition]) -> None:
return await self._run_in_executor(self.consumer.assign, partitions)

async def assignment(self) -> list[TopicPartition]:
return await self._run_in_executor(self.consumer.assignment)

async def seek(self, partition: TopicPartition) -> None:
return await self._run_in_executor(self.consumer.seek, partition)
9 changes: 7 additions & 2 deletions karapace/kafka/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@

import enum

# A constant that corresponds to the default value of request.timeout.ms in
# the librdkafka C library
# request.timeout.ms default from librdkafka
DEFAULT_REQUEST_TIMEOUT_MS: Final = 30000

# fetch.wait.max.ms default from librdkafka
FETCH_WAIT_MAX_MS: Final = 500

# fetch.max.bytes default from librdkafka
FETCH_MAX_BYTES: Final = 52428800


class Timestamp(enum.IntEnum):
NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE
Expand Down
Loading

0 comments on commit ad83b03

Please sign in to comment.