Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use confluent-kafka-python for Kafka admin client instead of kafka-python #757

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 15 additions & 20 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from enum import Enum
from functools import partial
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import PartitionMetadata, TopicPartition
Expand All @@ -33,6 +32,7 @@
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
from karapace.config import Config
from karapace.kafka_admin import KafkaAdminClient
from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
from karapace.key_format import KeyFormatter
from karapace.utils import assert_never
Expand Down Expand Up @@ -186,7 +186,7 @@ def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[Pa


@contextlib.contextmanager
def _admin(config: Config) -> KafkaAdminClient:
def _admin(config: Config) -> Iterator[KafkaAdminClient]:
"""Creates an automatically closing Kafka admin client.

:param config: for the client.
Expand All @@ -201,10 +201,7 @@ def _admin(config: Config) -> KafkaAdminClient:
retry=retry_if_exception_type(KafkaError),
)(kafka_admin_from_config)(config)

try:
yield admin
finally:
admin.close()
yield admin


@retry(
Expand All @@ -222,26 +219,24 @@ def _maybe_create_topic(
topic_configs: Mapping[str, str],
) -> bool:
"""Returns True if topic creation was successful, False if topic already exists"""
topic = NewTopic(
name=name,
num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor=replication_factor,
topic_configs=topic_configs,
)

with _admin(config) as admin:
try:
admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
admin.new_topic(
name,
num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor=replication_factor,
config=dict(topic_configs),
)
except TopicAlreadyExistsError:
LOG.debug("Topic %r already exists", topic.name)
LOG.debug("Topic %r already exists", name)
return False

LOG.info(
"Created topic %r (partition count: %s, replication factor: %s, config: %s)",
topic.name,
topic.num_partitions,
topic.replication_factor,
topic.topic_configs,
name,
constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor,
topic_configs,
)
return True

Expand Down Expand Up @@ -520,7 +515,7 @@ def create_backup(
topic_configurations = get_topic_configurations(
admin=admin,
topic_name=topic_name,
config_source_filter={ConfigSource.TOPIC_CONFIG},
config_source_filter={ConfigSource.DYNAMIC_TOPIC_CONFIG},
)

# Note: It's expected that we at some point want to introduce handling of
Expand Down
43 changes: 10 additions & 33 deletions karapace/backup/topic_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,12 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from enum import Enum
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType
from kafka.errors import for_code
from kafka.protocol.admin import DescribeConfigsRequest
from typing import Container, Dict, Final
from __future__ import annotations

from karapace.kafka_admin import ConfigSource, KafkaAdminClient
from typing import Container, Final

class ConfigSource(int, Enum):
UNKNOWN = 0
TOPIC_CONFIG = 1
DYNAMIC_BROKER_CONFIG = 2
DYNAMIC_DEFAULT_BROKER_CONFIG = 3
STATIC_BROKER_CONFIG = 4
DEFAULT_CONFIG = 5
DYNAMIC_BROKER_LOGGER_CONFIG = 6


ALL_CONFIG_SOURCES: Final = {item.value for item in ConfigSource.__members__.values()}
ALL_CONFIG_SOURCES: Final = ConfigSource


DEFAULT_CONFIGS: Final = [
Expand All @@ -35,7 +22,7 @@ def get_topic_configurations(
admin: KafkaAdminClient,
topic_name: str,
config_source_filter: Container[ConfigSource] = (),
) -> Dict[str, str]:
) -> dict[str, str]:
"""Get configurations of the specified topic. The following configurations will be retrieved by default:
- `cleanup.policy`
- `min.insync.replicas`
Expand All @@ -47,18 +34,8 @@ def get_topic_configurations(
:param config_source_filter: returns all the configurations that match the sources specified,
plus the default configurations. If empty, returns only the default configurations.
"""
if admin._matching_api_version(DescribeConfigsRequest) == 0: # pylint: disable=protected-access
raise NotImplementedError("Broker version is not supported")
req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic_name)]
cfgs = admin.describe_configs(req_cfgs)
assert len(cfgs) == 1
assert len(cfgs[0].resources) == 1
err, _, _, _, config_values = cfgs[0].resources[0]
if err != 0:
raise for_code(err)
topic_config = {}
for cv in config_values:
name, val, _, config_source, _, _ = cv
if name in DEFAULT_CONFIGS or (config_source in config_source_filter):
topic_config[name] = val
return topic_config
return admin.get_topic_config(
topic_name,
config_name_filter=DEFAULT_CONFIGS,
config_source_filter=config_source_filter,
)
2 changes: 1 addition & 1 deletion karapace/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1
API_VERSION_AUTO_TIMEOUT_MS: Final = 30000
TOPIC_CREATION_TIMEOUT_MS: Final = 20000
TOPIC_CREATION_TIMEOUT_S: Final = 20
matyaskuti marked this conversation as resolved.
Show resolved Hide resolved
DEFAULT_SCHEMA_TOPIC: Final = "_schemas"
DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576
DEFAULT_AIOHTTP_CLIENT_MAX_SIZE: Final = 1048576
Expand Down
Loading