From f10aa3ddbd7dd1fe7d279ea54228419accee87a8 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Thu, 19 Dec 2024 11:57:48 +0100 Subject: [PATCH] di: do not wire core karapace --- src/karapace/coordinator/master_coordinator.py | 7 ++----- src/karapace/kafka/admin.py | 11 +++++++++++ src/karapace/kafka/common.py | 6 ------ src/karapace/offset_watcher.py | 8 +++----- src/schema_registry/__main__.py | 8 -------- src/{karapace => schema_registry}/messaging.py | 0 src/schema_registry/reader.py | 6 +----- src/schema_registry/registry.py | 4 +++- tests/conftest.py | 8 -------- tests/unit/schema_registry/telemetry/test_tracer.py | 5 ++++- tests/unit/schema_registry/test_controller.py | 4 ++++ 11 files changed, 28 insertions(+), 39 deletions(-) rename src/{karapace => schema_registry}/messaging.py (100%) diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index a06435d86..9873a9bec 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -10,12 +10,10 @@ from aiokafka.errors import KafkaConnectionError from aiokafka.helpers import create_ssl_context from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest -from dependency_injector.wiring import inject, Provide from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import SchemaReaderStoppper -from schema_registry.telemetry.container import TelemetryContainer from schema_registry.telemetry.tracer import Tracer from threading import Thread from typing import Final @@ -40,8 +38,7 @@ class MasterCoordinator: 5 milliseconds. """ - @inject - def __init__(self, config: Config, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None: + def __init__(self, config: Config) -> None: super().__init__() self._config: Final = config self._kafka_client: AIOKafkaClient | None = None @@ -50,7 +47,7 @@ def __init__(self, config: Config, tracer: Tracer = Provide[TelemetryContainer.t self._thread: Thread = Thread(target=self._start_loop, daemon=True) self._loop: asyncio.AbstractEventLoop | None = None self._schema_reader_stopper: SchemaReaderStoppper | None = None - self.tracer = tracer + self.tracer = Tracer() def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None: self._schema_reader_stopper = schema_reader_stopper diff --git a/src/karapace/kafka/admin.py b/src/karapace/kafka/admin.py index a058bedbc..165dab1a1 100644 --- a/src/karapace/kafka/admin.py +++ b/src/karapace/kafka/admin.py @@ -23,13 +23,24 @@ from karapace.constants import TOPIC_CREATION_TIMEOUT_S from karapace.kafka.common import ( _KafkaConfigMixin, + KafkaClientParams, raise_from_kafkaexception, single_futmap_result, UnknownTopicOrPartitionError, ) +from schema_registry.telemetry.tracer import Tracer +from typing_extensions import Unpack class KafkaAdminClient(_KafkaConfigMixin, AdminClient): + def __init__( + self, + bootstrap_servers: Iterable[str] | str, + **params: Unpack[KafkaClientParams], + ) -> None: + self.tracer = Tracer() + super().__init__(bootstrap_servers, **params) + def new_topic( self, name: str, diff --git a/src/karapace/kafka/common.py b/src/karapace/kafka/common.py index 972115667..c541cbac3 100644 --- a/src/karapace/kafka/common.py +++ b/src/karapace/kafka/common.py @@ -17,9 +17,6 @@ from collections.abc import Callable, Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException -from dependency_injector.wiring import inject, Provide -from schema_registry.telemetry.container import TelemetryContainer -from schema_registry.telemetry.tracer import Tracer from typing import Any, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack @@ -125,17 +122,14 @@ class _KafkaConfigMixin: extract configuration, initialization and connection verification. """ - @inject def __init__( self, bootstrap_servers: Iterable[str] | str, verify_connection: bool = True, - tracer: Tracer = Provide[TelemetryContainer.tracer], **params: Unpack[KafkaClientParams], ) -> None: self._errors: set[KafkaError] = set() self.log = logging.getLogger(f"{self.__module__}.{self.__class__.__qualname__}") - self.tracer = tracer super().__init__(self._get_config_from_params(bootstrap_servers, **params)) # type: ignore[call-arg] self._activate_callbacks() diff --git a/src/karapace/offset_watcher.py b/src/karapace/offset_watcher.py index 83bd32bbb..3baad5f11 100644 --- a/src/karapace/offset_watcher.py +++ b/src/karapace/offset_watcher.py @@ -4,8 +4,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from dependency_injector.wiring import inject, Provide -from schema_registry.telemetry.container import TelemetryContainer + from schema_registry.telemetry.tracer import Tracer from threading import Condition @@ -17,13 +16,12 @@ class OffsetWatcher: correct as long as no unclean leader election is performed. """ - @inject - def __init__(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None: + def __init__(self) -> None: # Condition used to protected _greatest_offset, any modifications to that object must # be performed with this condition acquired self._condition = Condition() self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. - self.tracer = tracer + self.tracer = Tracer() def greatest_offset(self) -> int: with self.tracer.get_tracer().start_as_current_span( diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 4a92d9649..f8b5684c6 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -7,12 +7,8 @@ from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan from schema_registry.telemetry.container import TelemetryContainer -import karapace.coordinator.master_coordinator -import karapace.kafka.common -import karapace.offset_watcher import schema_registry.controller import schema_registry.factory -import schema_registry.reader import schema_registry.routers.compatibility import schema_registry.routers.config import schema_registry.routers.health @@ -42,10 +38,6 @@ modules=[ schema_registry.telemetry.setup, schema_registry.telemetry.middleware, - schema_registry.reader, - karapace.offset_watcher, - karapace.coordinator.master_coordinator, - karapace.kafka.common, ] ) diff --git a/src/karapace/messaging.py b/src/schema_registry/messaging.py similarity index 100% rename from src/karapace/messaging.py rename to src/schema_registry/messaging.py diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index d0d9e7a5e..5ddcfa100 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -24,7 +24,6 @@ from collections.abc import Mapping, Sequence from confluent_kafka import Message, TopicCollection, TopicPartition from contextlib import closing, ExitStack -from dependency_injector.wiring import inject, Provide from enum import Enum from jsonschema.validators import Draft7Validator from karapace import constants @@ -46,7 +45,6 @@ from karapace.statsd import StatsClient from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError, shutdown -from schema_registry.telemetry.container import TelemetryContainer from schema_registry.telemetry.tracer import Tracer from threading import Event, Lock, Thread from typing import Final @@ -133,7 +131,6 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: class KafkaSchemaReader(Thread, SchemaReaderStoppper): - @inject def __init__( self, config: Config, @@ -141,7 +138,6 @@ def __init__( key_formatter: KeyFormatter, database: KarapaceDatabase, master_coordinator: MasterCoordinator | None = None, - tracer: Tracer = Provide[TelemetryContainer.tracer], ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator @@ -156,7 +152,7 @@ def __init__( self._offset_watcher = offset_watcher self.stats = StatsClient(config=config) self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config) - self.tracer = tracer + self.tracer = Tracer() # Thread synchronization objects # - offset is used by the REST API to wait until this thread has diff --git a/src/schema_registry/registry.py b/src/schema_registry/registry.py index 9b694f768..7f1ccb0c8 100644 --- a/src/schema_registry/registry.py +++ b/src/schema_registry/registry.py @@ -26,12 +26,13 @@ ) from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import KeyFormatter -from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version +from schema_registry.messaging import KarapaceProducer from schema_registry.reader import KafkaSchemaReader +from schema_registry.telemetry.tracer import Tracer import asyncio import logging @@ -43,6 +44,7 @@ class KarapaceSchemaRegistry: def __init__(self, config: Config) -> None: # TODO: compatibility was previously in mutable dict, fix the runtime config to be distinct from static config. self.config = config + self.tracer = Tracer() self._key_formatter = KeyFormatter() offset_watcher = OffsetWatcher() diff --git a/tests/conftest.py b/tests/conftest.py index f7724179a..5b65c9405 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,14 +10,10 @@ from tempfile import mkstemp import json -import karapace.coordinator.master_coordinator -import karapace.kafka.common -import karapace.offset_watcher import os import pytest import re import schema_registry.controller -import schema_registry.reader import schema_registry.telemetry.middleware import schema_registry.telemetry.setup import schema_registry.telemetry.tracer @@ -208,12 +204,8 @@ def fixture_telemetry_container() -> TelemetryContainer: telemetry_container = TelemetryContainer() telemetry_container.wire( modules=[ - schema_registry.reader, schema_registry.telemetry.setup, schema_registry.telemetry.middleware, - karapace.offset_watcher, - karapace.coordinator.master_coordinator, - karapace.kafka.common, ] ) return telemetry_container diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py index b5860657e..f1edabde2 100644 --- a/tests/unit/schema_registry/telemetry/test_tracer.py +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -50,11 +50,14 @@ def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceConta def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} + ) with ( patch("schema_registry.telemetry.tracer.ConsoleSpanExporter") as mock_console_exporter, patch("schema_registry.telemetry.tracer.SimpleSpanProcessor") as mock_simple_span_processor, ): - processor: SpanProcessor = Tracer.get_span_processor(config=karapace_container.config()) + processor: SpanProcessor = Tracer.get_span_processor(config=config) mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value) assert processor is mock_simple_span_processor.return_value diff --git a/tests/unit/schema_registry/test_controller.py b/tests/unit/schema_registry/test_controller.py index 0b1ccbf89..4c4b0b16d 100644 --- a/tests/unit/schema_registry/test_controller.py +++ b/tests/unit/schema_registry/test_controller.py @@ -77,3 +77,7 @@ async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryC user=None, authorizer=None, ) + with pytest.raises(HTTPResponse): + # prevent `future exception was never retrieved` warning logs + # future: + await mock_forward_func_future