Skip to content

Commit

Permalink
Merge pull request #891 from Aiven-Open/jjaakola-aiven-remove-kafka-python-dependency
Browse files Browse the repository at this point in the history

chore: remove kafka-python dependency
  • Loading branch information
keejon authored Jun 5, 2024
2 parents 2cfe49c + 6256c31 commit 15698cf
Show file tree
Hide file tree
Showing 23 changed files with 46 additions and 100 deletions.
2 changes: 1 addition & 1 deletion karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
)
from .poll_timeout import PollTimeout
from .topic_configurations import ConfigSource, get_topic_configurations
from aiokafka.errors import KafkaError, TopicAlreadyExistsError
from concurrent.futures import Future
from confluent_kafka import Message, TopicPartition
from enum import Enum
from functools import partial
from kafka.errors import KafkaError, TopicAlreadyExistsError
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
Expand Down
2 changes: 1 addition & 1 deletion karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import api
from .errors import BackupDataRestorationError, StaleConsumerError
from .poll_timeout import PollTimeout
from kafka.errors import BrokerResponseError
from aiokafka.errors import BrokerResponseError
from karapace.backup.api import VerifyLevel
from karapace.config import Config, read_config
from typing import Iterator
Expand Down
3 changes: 1 addition & 2 deletions karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from aiokafka.client import AIOKafkaClient, ConnectionGroup
from aiokafka.cluster import ClusterMetadata
from aiokafka.consumer.group_coordinator import CoordinationType
from aiokafka.protocol.api import Request
from aiokafka.protocol.api import Request, Response
from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest
from aiokafka.protocol.group import (
HeartbeatRequest,
Expand All @@ -25,7 +25,6 @@
SyncGroupRequest_v3,
)
from aiokafka.util import create_future, create_task
from kafka.protocol.api import Response
from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData
from karapace.utils import json_decode, json_encode
Expand Down
14 changes: 4 additions & 10 deletions karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@

from __future__ import annotations

from aiokafka.client import UnknownTopicOrPartitionError
from aiokafka.errors import AuthenticationFailedError, for_code, IllegalStateError, KafkaTimeoutError, NoBrokersAvailable
from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from kafka.errors import (
AuthenticationFailedError,
for_code,
IllegalStateError,
KafkaTimeoutError,
NoBrokersAvailable,
UnknownTopicOrPartitionError,
)
from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

Expand All @@ -40,7 +34,7 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception:
"""Translate a `KafkaError` from `confluent_kafka` to a friendlier exception.
`kafka.errors.for_code` is used to translate the original exception's error code
to a domain specific error class from `kafka-python`.
to a domain specific error class from `aiokafka`.
In some cases `KafkaError`s are created with error codes internal to `confluent_kafka`,
such as various internal error codes for unknown topics or partitions:
Expand All @@ -67,7 +61,7 @@ def raise_from_kafkaexception(exc: KafkaException) -> NoReturn:
The `confluent_kafka` library's `KafkaException` is a wrapper around its internal
`KafkaError`. The resulting, raised exception however is coming from
`kafka-python`, due to these exceptions having human-readable names, providing
`aiokafka`, due to these exceptions having human-readable names, providing
better context for error handling.
"""
raise translate_from_kafkaerror(exc.args[0]) from exc
Expand Down
2 changes: 1 addition & 1 deletion karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

from __future__ import annotations

from aiokafka.errors import IllegalStateError, KafkaTimeoutError
from confluent_kafka import Consumer, Message, TopicPartition
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.error import KafkaException
from kafka.errors import IllegalStateError, KafkaTimeoutError
from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
from typing import Any, Callable, Iterable, TypeVar
from typing_extensions import Unpack
Expand Down
18 changes: 12 additions & 6 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
"""
karapace - Rest Proxy API
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from binascii import Error as B64DecodeError
from collections import namedtuple
from confluent_kafka.error import KafkaException
from contextlib import AsyncExitStack
from http import HTTPStatus
from kafka.errors import (
from aiokafka.errors import (
AuthenticationFailedError,
BrokerResponseError,
KafkaTimeoutError,
NoBrokersAvailable,
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from binascii import Error as B64DecodeError
from collections import namedtuple
from confluent_kafka.error import KafkaException
from contextlib import AsyncExitStack
from http import HTTPStatus
from karapace.config import Config
from karapace.errors import InvalidSchema
from karapace.kafka.admin import KafkaAdminClient
Expand Down
12 changes: 6 additions & 6 deletions karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from asyncio import Lock
from collections import defaultdict, namedtuple
from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition
from functools import partial
from http import HTTPStatus
from kafka.errors import (
from aiokafka.errors import (
GroupAuthorizationFailedError,
IllegalStateError,
KafkaConfigurationError,
KafkaError,
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from asyncio import Lock
from collections import defaultdict, namedtuple
from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition
from functools import partial
from http import HTTPStatus
from karapace.config import Config
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import AsyncKafkaConsumer
Expand Down
2 changes: 1 addition & 1 deletion karapace/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka.errors import MessageSizeTooLargeError
from aiokafka.errors import MessageSizeTooLargeError
from karapace.config import Config
from karapace.errors import SchemaTooLargeException
from karapace.kafka.producer import KafkaProducer
Expand Down
12 changes: 6 additions & 6 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
"""
from __future__ import annotations

from avro.schema import Schema as AvroSchema
from confluent_kafka import Message, TopicPartition
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka.errors import (
from aiokafka.errors import (
GroupAuthorizationFailedError,
InvalidReplicationFactorError,
KafkaConfigurationError,
Expand All @@ -24,6 +19,11 @@
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from avro.schema import Schema as AvroSchema
from confluent_kafka import Message, TopicPartition
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from karapace import constants
from karapace.config import Config
from karapace.coordinator.master_coordinator import MasterCoordinator
Expand Down
47 changes: 0 additions & 47 deletions karapace/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from http import HTTPStatus
from kafka.client_async import BrokerConnection, KafkaClient
from pathlib import Path
from types import MappingProxyType
from typing import AnyStr, cast, IO, Literal, NoReturn, overload, TypeVar

import importlib
import kafka.client_async
import logging
import time

Expand Down Expand Up @@ -214,51 +212,6 @@ def convert_to_int(object_: dict, key: str, content_type: str) -> None:
)


class KarapaceKafkaClient(KafkaClient):
    """Kafka client that proactively closes connections stuck in a DNS-failure blackout.

    Installs ``KarapaceBrokerConnection`` as the broker-connection class so every
    connection can report whether its last close was caused by a DNS failure,
    and sweeps such connections after each poll cycle.
    """

    def __init__(self, **configs):
        # Monkey-patch the module-level name so the parent class instantiates
        # KarapaceBrokerConnection for all new broker connections.
        kafka.client_async.BrokerConnection = KarapaceBrokerConnection
        super().__init__(**configs)

    def close_invalid_connections(self):
        """Close every connection currently in a DNS-failure blackout.

        If at least one connection was closed, request a cluster metadata
        update so stale broker addresses get refreshed.
        """
        update_needed = False
        with self._lock:
            # Iterate over a copy: self.close() mutates self._conns.
            conns = self._conns.copy().values()
            for conn in conns:
                if conn and conn.ns_blackout():
                    LOG.info(
                        "Node id %s no longer in cluster metadata, closing connection and requesting update", conn.node_id
                    )
                    self.close(conn.node_id)
                    update_needed = True
        # Request the metadata update only after releasing the lock.
        if update_needed:
            self.cluster.request_update()

    def _poll(self, timeout):
        # Run the normal poll first, then a best-effort sweep of blacked-out
        # connections; a failing sweep must never break the poll loop itself.
        super()._poll(timeout)
        try:
            self.close_invalid_connections()
        except Exception as e:  # pylint: disable=broad-except
            LOG.error("Error closing invalid connections: %r", e)


class KarapaceBrokerConnection(BrokerConnection):
    """Broker connection that remembers its last close error and failure time.

    Enables ``KarapaceKafkaClient`` to keep connections whose hostname failed
    DNS resolution in a temporary blackout (see ``ns_blackout``).
    """

    def __init__(self, host, port, afi, **configs):
        super().__init__(host, port, afi, **configs)
        # Last error passed to close(); None until the first close.
        self.error = None
        # Monotonic timestamp of the last close; initialized at creation so
        # ns_blackout() has a defined reference point before any failure.
        self.fail_time = time.monotonic()

    def close(self, error=None):
        # Record what went wrong and when, before delegating the actual close.
        self.error = error
        self.fail_time = time.monotonic()
        super().close(error)

    def ns_blackout(self):
        """Return True while the last close was DNS-related and still recent.

        NS_BLACKOUT_DURATION_SECONDS is defined elsewhere in this module.
        """
        return "DNS failure" in str(self.error) and self.fail_time + NS_BLACKOUT_DURATION_SECONDS > time.monotonic()

    def blacked_out(self):
        # Extend the parent's blackout check with the DNS-failure blackout.
        return self.ns_blackout() or super().blacked_out()


class DebugAccessLogger(AccessLogger):
"""
Logs access logs as DEBUG instead of INFO.
Expand Down
2 changes: 0 additions & 2 deletions requirements/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ jsonschema-specifications==2023.12.1
# via
# -r requirements.txt
# jsonschema
kafka-python @ https://github.com/aiven/kafka-python/archive/19ff1f4b28e33318b0cd2d916b8399170055b1ca.tar.gz
# via -r requirements.txt
locust==2.25.0
# via -r requirements-dev.in
lz4==4.3.3
Expand Down
1 change: 0 additions & 1 deletion requirements/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ zstandard
# - The contents of the file change, which invalidates the existing docker
# images and forces a new image generation.
#
https://github.com/aiven/kafka-python/archive/19ff1f4b28e33318b0cd2d916b8399170055b1ca.tar.gz
https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c76ca2.tar.gz#subdirectory=lang/py
2 changes: 0 additions & 2 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ jsonschema==4.22.0
# via -r requirements.in
jsonschema-specifications==2023.12.1
# via jsonschema
kafka-python @ https://github.com/aiven/kafka-python/archive/19ff1f4b28e33318b0cd2d916b8399170055b1ca.tar.gz
# via -r requirements.in
lz4==4.3.3
# via -r requirements.in
markdown-it-py==3.0.0
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"aiokafka",
"avro",
"jsonschema",
"kafka-python",
"networkx",
"protobuf",
"pyjwt",
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/backup/test_legacy_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka.errors import InvalidTopicError
from aiokafka.errors import InvalidTopicError
from karapace.backup import api
from karapace.backup.api import BackupVersion
from karapace.backup.errors import StaleConsumerError
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"""
from __future__ import annotations

from aiokafka.errors import UnknownTopicOrPartitionError
from confluent_kafka import Message, TopicPartition
from confluent_kafka.admin import NewTopic
from dataclasses import fields
from kafka.errors import UnknownTopicOrPartitionError
from karapace.backup import api
from karapace.backup.api import _consume_records, BackupVersion, TopicName
from karapace.backup.backends.v3.errors import InconsistentOffset
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/kafka/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from __future__ import annotations

from aiokafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError
from confluent_kafka.admin import ConfigSource, NewTopic
from kafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import KafkaProducer
from tests.utils import new_topic as create_new_topic
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/kafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"""
from __future__ import annotations

from aiokafka.errors import IllegalStateError, KafkaTimeoutError, UnknownTopicOrPartitionError
from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition
from confluent_kafka.admin import NewTopic
from confluent_kafka.error import KafkaError
from kafka.errors import IllegalStateError, KafkaTimeoutError, UnknownTopicOrPartitionError
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer
from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/kafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from __future__ import annotations

from aiokafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError
from confluent_kafka.admin import NewTopic
from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError
from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer
from karapace.kafka.types import Timestamp

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/utils/kafka_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from aiokafka.errors import AuthenticationFailedError, NoBrokersAvailable
from dataclasses import dataclass
from kafka.errors import AuthenticationFailedError, NoBrokersAvailable
from karapace.kafka.admin import KafkaAdminClient
from karapace.utils import Expiration
from pathlib import Path
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/backup/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from __future__ import annotations

from kafka.errors import KafkaError, TopicAlreadyExistsError
from aiokafka.errors import KafkaError, TopicAlreadyExistsError
from karapace import config
from karapace.backup.api import (
_admin,
Expand Down
10 changes: 5 additions & 5 deletions tests/unit/test_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
import kafka.codec
import aiokafka.codec


# Test that the setup has all compression algorithms supported
def test_setup_features() -> None:
    """Assert that gzip, lz4, snappy and zstd compression support is available.

    NOTE(review): this span is a rendered diff hunk — it interleaves the
    pre-change assertions (``kafka.codec``) with the post-change assertions
    (``aiokafka.codec``); an actual source file would contain only one set.
    Confirm against the applied commit before relying on this text.
    """
    assert kafka.codec.has_gzip()
    assert kafka.codec.has_lz4()
    assert kafka.codec.has_snappy()
    assert kafka.codec.has_zstd()
    assert aiokafka.codec.has_gzip()
    assert aiokafka.codec.has_lz4()
    assert aiokafka.codec.has_snappy()
    assert aiokafka.codec.has_zstd()
2 changes: 1 addition & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
See LICENSE for details
"""
from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError
from kafka.errors import TopicAlreadyExistsError
from aiokafka.errors import TopicAlreadyExistsError
from karapace.client import Client
from karapace.kafka.admin import KafkaAdminClient
from karapace.protobuf.kotlin_wrapper import trim_margin
Expand Down

0 comments on commit 15698cf

Please sign in to comment.