diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 927af69cb..5b80c8823 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -21,11 +21,11 @@ ) from .poll_timeout import PollTimeout from .topic_configurations import ConfigSource, get_topic_configurations +from aiokafka.errors import KafkaError, TopicAlreadyExistsError from concurrent.futures import Future from confluent_kafka import Message, TopicPartition from enum import Enum from functools import partial -from kafka.errors import KafkaError, TopicAlreadyExistsError from karapace import constants from karapace.backup.backends.v1 import SchemaBackupV1Reader from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER diff --git a/karapace/backup/cli.py b/karapace/backup/cli.py index d8af4eaba..8e4b108be 100644 --- a/karapace/backup/cli.py +++ b/karapace/backup/cli.py @@ -9,7 +9,7 @@ from . import api from .errors import BackupDataRestorationError, StaleConsumerError from .poll_timeout import PollTimeout -from kafka.errors import BrokerResponseError +from aiokafka.errors import BrokerResponseError from karapace.backup.api import VerifyLevel from karapace.config import Config, read_config from typing import Iterator diff --git a/karapace/coordinator/schema_coordinator.py b/karapace/coordinator/schema_coordinator.py index 2b64d1187..d6a56060d 100644 --- a/karapace/coordinator/schema_coordinator.py +++ b/karapace/coordinator/schema_coordinator.py @@ -9,7 +9,7 @@ from aiokafka.client import AIOKafkaClient, ConnectionGroup from aiokafka.cluster import ClusterMetadata from aiokafka.consumer.group_coordinator import CoordinationType -from aiokafka.protocol.api import Request +from aiokafka.protocol.api import Request, Response from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest from aiokafka.protocol.group import ( HeartbeatRequest, @@ -25,7 +25,6 @@ SyncGroupRequest_v3, ) from aiokafka.util import 
create_future, create_task -from kafka.protocol.api import Response from karapace.dataclasses import default_dataclass from karapace.typing import JsonData from karapace.utils import json_decode, json_encode diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py index 54dcb3665..5df9838a2 100644 --- a/karapace/kafka/common.py +++ b/karapace/kafka/common.py @@ -5,17 +5,10 @@ from __future__ import annotations +from aiokafka.errors import AuthenticationFailedError, for_code, IllegalStateError, KafkaTimeoutError, NoBrokersAvailable, UnknownTopicOrPartitionError from collections.abc import Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException -from kafka.errors import ( - AuthenticationFailedError, - for_code, - IllegalStateError, - KafkaTimeoutError, - NoBrokersAvailable, - UnknownTopicOrPartitionError, -) from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack @@ -40,7 +33,7 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception: """Translate a `KafkaError` from `confluent_kafka` to a friendlier exception. - `kafka.errors.for_code` is used to translate the original exception's error code - to a domain specific error class from `kafka-python`. + `aiokafka.errors.for_code` is used to translate the original exception's error code + to a domain specific error class from `aiokafka`. In some cases `KafkaError`s are created with error codes internal to `confluent_kafka`, such as various internal error codes for unknown topics or partitions: @@ -67,7 +60,7 @@ def raise_from_kafkaexception(exc: KafkaException) -> NoReturn: The `confluent_kafka` library's `KafkaException` is a wrapper around its internal `KafkaError`. The resulting, raised exception however is coming from - `kafka-python`, due to these exceptions having human-readable names, providing + `aiokafka`, due to these exceptions having human-readable names, providing better context for error handling.
""" raise translate_from_kafkaerror(exc.args[0]) from exc diff --git a/karapace/kafka/consumer.py b/karapace/kafka/consumer.py index 762295d3d..98e92c5f7 100644 --- a/karapace/kafka/consumer.py +++ b/karapace/kafka/consumer.py @@ -5,10 +5,10 @@ from __future__ import annotations +from aiokafka.errors import IllegalStateError, KafkaTimeoutError from confluent_kafka import Consumer, Message, TopicPartition from confluent_kafka.admin import PartitionMetadata from confluent_kafka.error import KafkaException -from kafka.errors import IllegalStateError, KafkaTimeoutError from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception from typing import Any, Callable, Iterable, TypeVar from typing_extensions import Unpack diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index a150778ee..10675fb23 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -1,11 +1,12 @@ +""" +karapace - Rest Proxy API + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" from __future__ import annotations -from binascii import Error as B64DecodeError -from collections import namedtuple -from confluent_kafka.error import KafkaException -from contextlib import AsyncExitStack -from http import HTTPStatus -from kafka.errors import ( +from aiokafka.errors import ( AuthenticationFailedError, BrokerResponseError, KafkaTimeoutError, @@ -13,6 +14,11 @@ TopicAuthorizationFailedError, UnknownTopicOrPartitionError, ) +from binascii import Error as B64DecodeError +from collections import namedtuple +from confluent_kafka.error import KafkaException +from contextlib import AsyncExitStack +from http import HTTPStatus from karapace.config import Config from karapace.errors import InvalidSchema from karapace.kafka.admin import KafkaAdminClient diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index 6b27c274c..8175414a5 100644 --- 
a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -2,12 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from asyncio import Lock -from collections import defaultdict, namedtuple -from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition -from functools import partial -from http import HTTPStatus -from kafka.errors import ( +from aiokafka.errors import ( GroupAuthorizationFailedError, IllegalStateError, KafkaConfigurationError, @@ -15,6 +10,11 @@ TopicAuthorizationFailedError, UnknownTopicOrPartitionError, ) +from asyncio import Lock +from collections import defaultdict, namedtuple +from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition +from functools import partial +from http import HTTPStatus from karapace.config import Config from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import AsyncKafkaConsumer diff --git a/karapace/messaging.py b/karapace/messaging.py index 36d07c638..bfdd33665 100644 --- a/karapace/messaging.py +++ b/karapace/messaging.py @@ -4,7 +4,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from kafka.errors import MessageSizeTooLargeError +from aiokafka.errors import MessageSizeTooLargeError from karapace.config import Config from karapace.errors import SchemaTooLargeException from karapace.kafka.producer import KafkaProducer diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 9bcd69260..e1ad5d091 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -6,12 +6,7 @@ """ from __future__ import annotations -from avro.schema import Schema as AvroSchema -from confluent_kafka import Message, TopicPartition -from contextlib import closing, ExitStack -from enum import Enum -from jsonschema.validators import Draft7Validator -from kafka.errors import ( +from aiokafka.errors import ( GroupAuthorizationFailedError, InvalidReplicationFactorError, 
KafkaConfigurationError, @@ -24,6 +19,11 @@ TopicAuthorizationFailedError, UnknownTopicOrPartitionError, ) +from avro.schema import Schema as AvroSchema +from confluent_kafka import Message, TopicPartition +from contextlib import closing, ExitStack +from enum import Enum +from jsonschema.validators import Draft7Validator from karapace import constants from karapace.config import Config from karapace.coordinator.master_coordinator import MasterCoordinator diff --git a/karapace/utils.py b/karapace/utils.py index 39a1ae2ec..d566b0ef1 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -14,13 +14,11 @@ from datetime import datetime, timedelta, timezone from decimal import Decimal from http import HTTPStatus -from kafka.client_async import BrokerConnection, KafkaClient from pathlib import Path from types import MappingProxyType from typing import AnyStr, cast, IO, Literal, NoReturn, overload, TypeVar import importlib -import kafka.client_async import logging import time @@ -214,51 +212,6 @@ def convert_to_int(object_: dict, key: str, content_type: str) -> None: ) -class KarapaceKafkaClient(KafkaClient): - def __init__(self, **configs): - kafka.client_async.BrokerConnection = KarapaceBrokerConnection - super().__init__(**configs) - - def close_invalid_connections(self): - update_needed = False - with self._lock: - conns = self._conns.copy().values() - for conn in conns: - if conn and conn.ns_blackout(): - LOG.info( - "Node id %s no longer in cluster metadata, closing connection and requesting update", conn.node_id - ) - self.close(conn.node_id) - update_needed = True - if update_needed: - self.cluster.request_update() - - def _poll(self, timeout): - super()._poll(timeout) - try: - self.close_invalid_connections() - except Exception as e: # pylint: disable=broad-except - LOG.error("Error closing invalid connections: %r", e) - - -class KarapaceBrokerConnection(BrokerConnection): - def __init__(self, host, port, afi, **configs): - super().__init__(host, port, afi, 
**configs) - self.error = None - self.fail_time = time.monotonic() - - def close(self, error=None): - self.error = error - self.fail_time = time.monotonic() - super().close(error) - - def ns_blackout(self): - return "DNS failure" in str(self.error) and self.fail_time + NS_BLACKOUT_DURATION_SECONDS > time.monotonic() - - def blacked_out(self): - return self.ns_blackout() or super().blacked_out() - - class DebugAccessLogger(AccessLogger): """ Logs access logs as DEBUG instead of INFO. diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 02ba63f84..4d06a553b 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -119,8 +119,6 @@ jsonschema-specifications==2023.12.1 # via # -r requirements.txt # jsonschema -kafka-python @ https://github.com/aiven/kafka-python/archive/19ff1f4b28e33318b0cd2d916b8399170055b1ca.tar.gz - # via -r requirements.txt locust==2.25.0 # via -r requirements-dev.in lz4==4.3.3 diff --git a/requirements/requirements.in b/requirements/requirements.in index d2ef94613..80d8f29ea 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -28,5 +28,4 @@ zstandard # - The contents of the file change, which invalidates the existing docker # images and forces a new image generation. 
# -https://github.com/aiven/kafka-python/archive/19ff1f4b28e33318b0cd2d916b8399170055b1ca.tar.gz https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c76ca2.tar.gz#subdirectory=lang/py diff --git a/requirements/requirements.txt b/requirements/requirements.txt index e74b9e1e7..15f2e2e77 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -51,8 +51,6 @@ jsonschema==4.22.0 # via -r requirements.in jsonschema-specifications==2023.12.1 # via jsonschema -kafka-python @ https://github.com/aiven/kafka-python/archive/19ff1f4b28e33318b0cd2d916b8399170055b1ca.tar.gz - # via -r requirements.in lz4==4.3.3 # via -r requirements.in markdown-it-py==3.0.0 diff --git a/setup.py b/setup.py index 2e99bc275..3b50270d1 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,6 @@ "aiokafka", "avro", "jsonschema", - "kafka-python", "networkx", "protobuf", "pyjwt", diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 12a400dde..08076dbde 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -4,7 +4,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from kafka.errors import InvalidTopicError +from aiokafka.errors import InvalidTopicError from karapace.backup import api from karapace.backup.api import BackupVersion from karapace.backup.errors import StaleConsumerError diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 535e6492c..59b788541 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -4,10 +4,10 @@ """ from __future__ import annotations +from aiokafka.errors import UnknownTopicOrPartitionError from confluent_kafka import Message, TopicPartition from confluent_kafka.admin import NewTopic from dataclasses import fields -from kafka.errors import UnknownTopicOrPartitionError from karapace.backup import 
api from karapace.backup.api import _consume_records, BackupVersion, TopicName from karapace.backup.backends.v3.errors import InconsistentOffset diff --git a/tests/integration/kafka/test_admin.py b/tests/integration/kafka/test_admin.py index 84d8a3e7f..fb523b2f9 100644 --- a/tests/integration/kafka/test_admin.py +++ b/tests/integration/kafka/test_admin.py @@ -5,8 +5,8 @@ from __future__ import annotations +from aiokafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError from confluent_kafka.admin import ConfigSource, NewTopic -from kafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer from tests.utils import new_topic as create_new_topic diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py index bc85fda65..70ec5f057 100644 --- a/tests/integration/kafka/test_consumer.py +++ b/tests/integration/kafka/test_consumer.py @@ -4,10 +4,10 @@ """ from __future__ import annotations +from aiokafka.errors import IllegalStateError, KafkaTimeoutError, UnknownTopicOrPartitionError from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition from confluent_kafka.admin import NewTopic from confluent_kafka.error import KafkaError -from kafka.errors import IllegalStateError, KafkaTimeoutError, UnknownTopicOrPartitionError from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py index 04c27f108..f8d3787b0 100644 --- a/tests/integration/kafka/test_producer.py +++ b/tests/integration/kafka/test_producer.py @@ -5,8 +5,8 @@ from __future__ import annotations +from aiokafka.errors import MessageSizeTooLargeError, 
UnknownTopicOrPartitionError from confluent_kafka.admin import NewTopic -from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer from karapace.kafka.types import Timestamp diff --git a/tests/integration/utils/kafka_server.py b/tests/integration/utils/kafka_server.py index 8f14f22fd..fb6fff7e4 100644 --- a/tests/integration/utils/kafka_server.py +++ b/tests/integration/utils/kafka_server.py @@ -2,8 +2,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from aiokafka.errors import AuthenticationFailedError, NoBrokersAvailable from dataclasses import dataclass -from kafka.errors import AuthenticationFailedError, NoBrokersAvailable from karapace.kafka.admin import KafkaAdminClient from karapace.utils import Expiration from pathlib import Path diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index 820287cc3..983beb786 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from kafka.errors import KafkaError, TopicAlreadyExistsError +from aiokafka.errors import KafkaError, TopicAlreadyExistsError from karapace import config from karapace.backup.api import ( _admin, diff --git a/tests/unit/test_features.py b/tests/unit/test_features.py index 54e96c2e9..bb2b708c2 100644 --- a/tests/unit/test_features.py +++ b/tests/unit/test_features.py @@ -2,12 +2,12 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -import kafka.codec +import aiokafka.codec # Test that the setup has all compression algorithms supported def test_setup_features() -> None: - assert kafka.codec.has_gzip() - assert kafka.codec.has_lz4() - assert kafka.codec.has_snappy() - assert kafka.codec.has_zstd() + assert aiokafka.codec.has_gzip() + assert aiokafka.codec.has_lz4() + assert aiokafka.codec.has_snappy() + assert aiokafka.codec.has_zstd() diff --git a/tests/utils.py b/tests/utils.py index 
7b4538440..f38097858 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,7 +3,7 @@ See LICENSE for details """ from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError -from kafka.errors import TopicAlreadyExistsError +from aiokafka.errors import TopicAlreadyExistsError from karapace.client import Client from karapace.kafka.admin import KafkaAdminClient from karapace.protobuf.kotlin_wrapper import trim_margin