From cf068e5c3d4541718c739ae6d208831b594da540 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 16 Dec 2024 16:17:00 +0100 Subject: [PATCH 1/9] chore: move more files to schema_registry - we move the schema reader and registry - we rename the apis to controller and updates to the DI wiring --- mypy.ini | 2 +- src/karapace/container.py | 3 --- ...{schema_registry_apis.py => controller.py} | 2 +- src/schema_registry/factory.py | 10 ++++----- .../reader.py} | 11 +++++++--- .../registry.py} | 2 +- src/schema_registry/routers/compatibility.py | 2 +- src/schema_registry/routers/config.py | 10 ++++----- .../routers/master_availability.py | 4 ++-- src/schema_registry/routers/mode.py | 2 +- src/schema_registry/routers/schemas.py | 2 +- src/schema_registry/routers/subjects.py | 8 +++---- tests/e2e/schema_registry/test_jsonschema.py | 2 +- tests/integration/test_schema.py | 2 +- tests/integration/test_schema_reader.py | 2 +- tests/unit/schema_registry/__init__.py | 4 ++++ .../test_controller.py} | 5 ++--- .../test_reader.py} | 22 +++++++++---------- tests/unit/test_in_memory_database.py | 2 +- 19 files changed, 51 insertions(+), 46 deletions(-) rename src/schema_registry/{schema_registry_apis.py => controller.py} (99%) rename src/{karapace/schema_reader.py => schema_registry/reader.py} (98%) rename src/{karapace/schema_registry.py => schema_registry/registry.py} (99%) create mode 100644 tests/unit/schema_registry/__init__.py rename tests/unit/{test_schema_registry_api.py => schema_registry/test_controller.py} (92%) rename tests/unit/{test_schema_reader.py => schema_registry/test_reader.py} (98%) diff --git a/mypy.ini b/mypy.ini index 5f94f8bc2..73b0e34c5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -15,7 +15,7 @@ warn_no_return = True warn_unreachable = True strict_equality = True -[mypy-schema_registry.schema_registry_apis] +[mypy-schema_registry.controller] ignore_errors = True [mypy-karapace.compatibility.jsonschema.checks] diff --git a/src/karapace/container.py b/src/karapace/container.py index 951956bf2..64dc6e285 100644 --- a/src/karapace/container.py +++ b/src/karapace/container.py @@ -8,7 +8,6 @@ from karapace.config import Config from karapace.forward_client import ForwardClient from karapace.instrumentation.prometheus import PrometheusInstrumentation -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.statsd import StatsClient @@ -21,8 +20,6 @@ class KarapaceContainer(containers.DeclarativeContainer): http_authorizer = providers.Singleton(HTTPAuthorizer, auth_file=config().registry_authfile) - schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=config) - forward_client = providers.Singleton(ForwardClient) authorizer = providers.Factory( diff --git a/src/schema_registry/schema_registry_apis.py b/src/schema_registry/controller.py similarity index 99% rename from src/schema_registry/schema_registry_apis.py rename to src/schema_registry/controller.py index 56cd567b1..94c7d6540 100644 --- a/src/schema_registry/schema_registry_apis.py +++ b/src/schema_registry/controller.py @@ -33,10 +33,10 @@ from karapace.protobuf.exception import ProtobufUnresolvedDependencyException from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.statsd import StatsClient from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version from karapace.utils import JSONDecodeError +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.errors import no_primary_url_error, SchemaErrorCodes, SchemaErrorMessages from schema_registry.routers.requests import ( CompatibilityCheckResponse, diff --git a/src/schema_registry/factory.py b/src/schema_registry/factory.py index 4ef678841..b02c131c6 100644 --- a/src/schema_registry/factory.py +++ b/src/schema_registry/factory.py @@ -11,13 +11,13 @@ from karapace.config import Config from karapace.forward_client import ForwardClient from karapace.logging_setup import configure_logging, log_config_without_secrets -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.statsd import StatsClient -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from schema_registry.container import SchemaRegistryContainer from schema_registry.http_handlers import setup_exception_handlers from schema_registry.middlewares import setup_middlewares +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.setup import setup_routers +from schema_registry.telemetry.setup import setup_tracing from typing import AsyncContextManager import logging @@ -29,7 +29,7 @@ async def karapace_schema_registry_lifespan( _: FastAPI, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), stastd: StatsClient = Depends(Provide[SchemaRegistryContainer.karapace_container.statsd]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), ) -> AsyncGenerator[None, None]: try: @@ -57,10 +57,10 @@ def create_karapace_application( logging.info("Starting Karapace Schema Registry (%s)", karapace_version.__version__) app = FastAPI(lifespan=lifespan) # type: ignore[arg-type] + + setup_tracing() setup_routers(app=app) setup_exception_handlers(app=app) setup_middlewares(app=app) - FastAPIInstrumentor.instrument_app(app) - return app diff --git a/src/karapace/schema_reader.py b/src/schema_registry/reader.py similarity index 98% rename from src/karapace/schema_reader.py rename to src/schema_registry/reader.py index bb3bea067..a0e4f68b3 100644 --- a/src/karapace/schema_reader.py +++ b/src/schema_registry/reader.py @@ -24,6 +24,7 @@ from collections.abc import Mapping, Sequence from confluent_kafka import Message, TopicCollection, TopicPartition from contextlib import closing, ExitStack +from dependency_injector.wiring import inject, Provide from enum import Enum from jsonschema.validators import Draft7Validator from karapace import constants @@ -45,6 +46,8 @@ from karapace.statsd import StatsClient from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError, shutdown +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer from threading import Event, Lock, Thread from typing import Final @@ -369,9 +372,11 @@ def _is_ready(self) -> bool: def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) - def ready(self) -> bool: - with self._ready_lock: - return self._ready + @inject + def ready(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.ready)): + with self._ready_lock: + return self._ready def set_not_ready(self) -> None: with self._ready_lock: diff --git a/src/karapace/schema_registry.py b/src/schema_registry/registry.py similarity index 99% rename from src/karapace/schema_registry.py rename to src/schema_registry/registry.py index a93bfa0ce..9b694f768 100644 --- a/src/karapace/schema_registry.py +++ b/src/schema_registry/registry.py @@ -29,9 +29,9 @@ from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner -from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version +from schema_registry.reader import KafkaSchemaReader import asyncio import logging diff --git a/src/schema_registry/routers/compatibility.py b/src/schema_registry/routers/compatibility.py index 0e91e3625..109df4e95 100644 --- a/src/schema_registry/routers/compatibility.py +++ b/src/schema_registry/routers/compatibility.py @@ -8,9 +8,9 @@ from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController from schema_registry.routers.errors import unauthorized from schema_registry.routers.requests import CompatibilityCheckResponse, SchemaRequest -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated diff --git a/src/schema_registry/routers/config.py b/src/schema_registry/routers/config.py index 1c95ac046..3d1884af6 100644 --- a/src/schema_registry/routers/config.py +++ b/src/schema_registry/routers/config.py @@ -7,12 +7,12 @@ from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.forward_client import ForwardClient -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated @@ -42,7 +42,7 @@ async def config_put( request: Request, compatibility_level_request: CompatibilityRequest, user: Annotated[User, Depends(get_current_user)], - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), @@ -82,7 +82,7 @@ async def config_set_subject( subject: Subject, compatibility_level_request: CompatibilityRequest, user: Annotated[User, Depends(get_current_user)], - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), @@ -106,7 +106,7 @@ async def config_delete_subject( request: Request, subject: Subject, user: Annotated[User, Depends(get_current_user)], - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), diff --git a/src/schema_registry/routers/master_availability.py b/src/schema_registry/routers/master_availability.py index a3783575a..02d072afd 100644 --- a/src/schema_registry/routers/master_availability.py +++ b/src/schema_registry/routers/master_availability.py @@ -7,9 +7,9 @@ from fastapi import APIRouter, Depends, Request, Response from karapace.config import Config from karapace.forward_client import ForwardClient -from karapace.schema_registry import KarapaceSchemaRegistry from pydantic import BaseModel from schema_registry.container import SchemaRegistryContainer +from schema_registry.registry import KarapaceSchemaRegistry from typing import Final import logging @@ -38,7 +38,7 @@ async def master_availability( response: Response, config: Config = Depends(Provide[SchemaRegistryContainer.karapace_container.config]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), ) -> MasterAvailabilityResponse: are_we_master, master_url = await schema_registry.get_master() LOG.info("are master %s, master url %s", are_we_master, master_url) diff --git a/src/schema_registry/routers/mode.py b/src/schema_registry/routers/mode.py index c139e8e7d..4df141d3d 100644 --- a/src/schema_registry/routers/mode.py +++ b/src/schema_registry/routers/mode.py @@ -8,9 +8,9 @@ from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController from schema_registry.routers.errors import unauthorized from schema_registry.routers.requests import ModeResponse -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated diff --git a/src/schema_registry/routers/schemas.py b/src/schema_registry/routers/schemas.py index 984c50085..63cb7dadc 100644 --- a/src/schema_registry/routers/schemas.py +++ b/src/schema_registry/routers/schemas.py @@ -7,8 +7,8 @@ from fastapi import APIRouter, Depends, Query from karapace.auth import AuthenticatorAndAuthorizer, User from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController from schema_registry.routers.requests import SchemaListingItem, SchemasResponse, SubjectVersion -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated diff --git a/src/schema_registry/routers/subjects.py b/src/schema_registry/routers/subjects.py index 4d0a9fe94..cd5352490 100644 --- a/src/schema_registry/routers/subjects.py +++ b/src/schema_registry/routers/subjects.py @@ -7,12 +7,12 @@ from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.forward_client import ForwardClient -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated @@ -74,7 +74,7 @@ async def subjects_subject_delete( permanent: bool = False, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): @@ -155,7 +155,7 @@ async def subjects_subject_version_delete( permanent: bool = False, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> int: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): diff --git a/tests/e2e/schema_registry/test_jsonschema.py b/tests/e2e/schema_registry/test_jsonschema.py index 803b72223..01267f6f0 100644 --- a/tests/e2e/schema_registry/test_jsonschema.py +++ b/tests/e2e/schema_registry/test_jsonschema.py @@ -5,8 +5,8 @@ from jsonschema import Draft7Validator from karapace.client import Client from karapace.compatibility import CompatibilityModes -from karapace.schema_reader import SchemaType from karapace.typing import SchemaMetadata, SchemaRuleSet +from schema_registry.reader import SchemaType from tests.schemas.json_schemas import ( A_DINT_B_DINT_OBJECT_SCHEMA, A_DINT_B_INT_OBJECT_SCHEMA, diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 2f6a7097a..1a7937eca 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -11,7 +11,7 @@ from karapace.rapu import is_success from karapace.schema_type import SchemaType from karapace.utils import json_encode -from schema_registry.schema_registry_apis import SchemaErrorMessages +from schema_registry.controller import SchemaErrorMessages from tests.base_testcase import BaseTestCase from tests.integration.utils.cluster import RegistryDescription from tests.integration.utils.kafka_server import KafkaServers diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 46643c9ce..0b5ae9f2c 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -12,8 +12,8 @@ from karapace.kafka.producer import KafkaProducer from karapace.key_format import KeyFormatter, KeyMode from karapace.offset_watcher import OffsetWatcher -from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode +from schema_registry.reader import KafkaSchemaReader from tests.base_testcase import BaseTestCase from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers diff --git a/tests/unit/schema_registry/__init__.py b/tests/unit/schema_registry/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/tests/unit/schema_registry/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/schema_registry/test_controller.py similarity index 92% rename from tests/unit/test_schema_registry_api.py rename to tests/unit/schema_registry/test_controller.py index f21f47097..0b1ccbf89 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/schema_registry/test_controller.py @@ -5,8 +5,8 @@ from fastapi.exceptions import HTTPException from karapace.rapu import HTTPResponse from karapace.schema_models import SchemaType, ValidatedTypedSchema -from karapace.schema_reader import KafkaSchemaReader from schema_registry.container import SchemaRegistryContainer +from schema_registry.reader import KafkaSchemaReader from unittest.mock import Mock, patch, PropertyMock import asyncio @@ -45,7 +45,7 @@ async def test_validate_schema_request_body(schema_registry_container: SchemaReg async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryContainer) -> None: - with patch("karapace.container.KarapaceSchemaRegistry") as schema_registry_class: + with patch("schema_registry.container.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) type(schema_reader_mock).ready = ready_property_mock @@ -60,7 +60,6 @@ async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryC close_func.return_value = close_future_result schema_registry_class.close = close_func - schema_registry_container.karapace_container().schema_registry = schema_registry_class controller = schema_registry_container.schema_registry_controller() controller.schema_registry = schema_registry_class diff --git a/tests/unit/test_schema_reader.py b/tests/unit/schema_registry/test_reader.py similarity index 98% rename from tests/unit/test_schema_reader.py rename to tests/unit/schema_registry/test_reader.py index a245d026f..e1e9e0db5 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/schema_registry/test_reader.py @@ -16,7 +16,10 @@ from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import KeyFormatter from karapace.offset_watcher import OffsetWatcher -from karapace.schema_reader import ( +from karapace.schema_type import SchemaType +from karapace.typing import SchemaId, Version +from pytest import MonkeyPatch +from schema_registry.reader import ( KafkaSchemaReader, MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP, MAX_MESSAGES_TO_CONSUME_ON_STARTUP, @@ -24,9 +27,6 @@ OFFSET_EMPTY, OFFSET_UNINITIALIZED, ) -from karapace.schema_type import SchemaType -from karapace.typing import SchemaId, Version -from pytest import MonkeyPatch from tests.base_testcase import BaseTestCase from tests.utils import schema_protobuf_invalid_because_corrupted, schema_protobuf_with_invalid_ref from unittest.mock import Mock @@ -318,12 +318,12 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture, karapace_cont database=database_mock, ) - with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): + with caplog.at_level(logging.WARNING, logger="schema_registry.reader"): schema_reader._handle_msg_schema_hard_delete( # pylint: disable=protected-access key={"subject": "test-subject", "version": 2} ) for log in caplog.records: - assert log.name == "karapace.schema_reader" + assert log.name == "schema_registry.reader" assert log.levelname == "WARNING" assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have" @@ -598,14 +598,14 @@ def test_message_error_handling( consumer_messages = ([message],) schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages) - with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): + with caplog.at_level(logging.WARNING, logger="schema_registry.reader"): with pytest.raises(test_case.expected_error): schema_reader.handle_messages() assert schema_reader.offset == 1 assert not schema_reader.ready() for log in caplog.records: - assert log.name == "karapace.schema_reader" + assert log.name == "schema_registry.reader" assert log.levelname == "WARNING" assert log.message == test_case.expected_log_message @@ -636,7 +636,7 @@ def test_message_error_handling_with_invalid_reference_schema_protobuf( ) message_using_ref = message_factory(key=key_using_ref, value=value_using_ref) - with caplog.at_level(logging.WARN, logger="karapace.schema_reader"): + with caplog.at_level(logging.WARN, logger="schema_registry.reader"): # When handling the corrupted schema schema_reader = schema_reader_with_consumer_messages_factory(([message_ref],)) @@ -662,8 +662,8 @@ def test_message_error_handling_with_invalid_reference_schema_protobuf( assert len(warn_records) == 2 # Check that different warnings are logged for each schema - assert warn_records[0].name == "karapace.schema_reader" + assert warn_records[0].name == "schema_registry.reader" assert warn_records[0].message == "Schema is not valid ProtoBuf definition" - assert warn_records[1].name == "karapace.schema_reader" + assert warn_records[1].name == "schema_registry.reader" assert warn_records[1].message == "Invalid Protobuf references" diff --git a/tests/unit/test_in_memory_database.py b/tests/unit/test_in_memory_database.py index 2a0156567..9dc8edf88 100644 --- a/tests/unit/test_in_memory_database.py +++ b/tests/unit/test_in_memory_database.py @@ -14,10 +14,10 @@ from karapace.key_format import KeyFormatter from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import SchemaVersion, TypedSchema -from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import Reference, Referents from karapace.typing import SchemaId, Version from pathlib import Path +from schema_registry.reader import KafkaSchemaReader from typing import Final TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/") From a142b73eb4714deec5cb43dad756c483a52daaaf Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 16 Dec 2024 16:17:53 +0100 Subject: [PATCH 2/9] requirements: drop opentelemetry-instrumentation-fastapi --- pyproject.toml | 6 +- requirements/requirements-dev.txt | 79 +++++++++++++----------- requirements/requirements-typing.txt | 91 ++++++++++++++++------------ requirements/requirements.txt | 89 +++++++++++++++------------ 4 files changed, 149 insertions(+), 116 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cd9834ff1..729d0e829 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,9 +34,9 @@ dependencies = [ "zstandard", "prometheus-client == 0.20.0", "yarl == 1.12.1", - "opentelemetry-api == 1.28.2", - "opentelemetry-sdk == 1.28.2", - "opentelemetry-instrumentation-fastapi == 0.49b2", + "opentelemetry-api == 1.27.0", + "opentelemetry-sdk == 1.27.0", + "opentelemetry-exporter-otlp == 1.27.0", "dependency-injector == 4.43.0", # Patched dependencies diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 00c6d4d4c..d4b526cec 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -12,7 +12,7 @@ aiohttp==3.10.11 # via karapace (/karapace/pyproject.toml) aiokafka==0.10.0 # via karapace (/karapace/pyproject.toml) -aiosignal==1.3.1 +aiosignal==1.3.2 # via aiohttp annotated-types==0.7.0 # via pydantic @@ -21,13 +21,11 @@ anyio==4.7.0 # httpx # starlette # watchfiles -asgiref==3.8.1 - # via opentelemetry-instrumentation-asgi async-timeout==5.0.1 # via # aiohttp # aiokafka -attrs==24.2.0 +attrs==24.3.0 # via # aiohttp # hypothesis @@ -42,7 +40,7 @@ brotli==1.1.0 # via geventhttpclient cachetools==5.3.3 # via karapace (/karapace/pyproject.toml) -certifi==2024.8.30 +certifi==2024.12.14 # via # geventhttpclient # httpcore @@ -63,13 +61,15 @@ confluent-kafka==2.4.0 # via karapace (/karapace/pyproject.toml) coverage[toml]==7.6.9 # via pytest-cov -cramjam==2.9.0 +cramjam==2.9.1 # via python-snappy dependency-injector==4.43.0 # via karapace (/karapace/pyproject.toml) deprecated==1.2.15 # via # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-semantic-conventions dnspython==2.7.0 # via email-validator @@ -86,7 +86,7 @@ fancycompleter==0.9.1 # via pdbpp fastapi[standard]==0.115.5 # via karapace (/karapace/pyproject.toml) -fastapi-cli[standard]==0.0.6 +fastapi-cli[standard]==0.0.7 # via fastapi filelock==3.16.1 # via karapace (/karapace/pyproject.toml) @@ -109,8 +109,14 @@ gevent==24.11.1 # locust geventhttpclient==2.3.3 # via locust +googleapis-common-protos==1.66.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http greenlet==3.1.1 # via gevent +grpcio==1.68.1 + # via opentelemetry-exporter-otlp-proto-grpc h11==0.14.0 # via # httpcore @@ -130,7 +136,7 @@ idna==3.10 # httpx # requests # yarl -importlib-metadata==8.5.0 +importlib-metadata==8.4.0 # via opentelemetry-api iniconfig==2.0.0 # via pytest @@ -166,38 +172,38 @@ multidict==6.1.0 # yarl networkx==3.4.2 # via karapace (/karapace/pyproject.toml) -opentelemetry-api==1.28.2 +opentelemetry-api==1.27.0 # via # karapace (/karapace/pyproject.toml) - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-sdk # opentelemetry-semantic-conventions -opentelemetry-instrumentation==0.49b2 - # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-asgi==0.49b2 - # via opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-fastapi==0.49b2 - # via karapace (/karapace/pyproject.toml) -opentelemetry-sdk==1.28.2 +opentelemetry-exporter-otlp==1.27.0 # via karapace (/karapace/pyproject.toml) -opentelemetry-semantic-conventions==0.49b2 +opentelemetry-exporter-otlp-proto-common==1.27.0 # via - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi - # opentelemetry-sdk -opentelemetry-util-http==0.49b2 + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-grpc==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-exporter-otlp-proto-http==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-proto==1.27.0 + # via + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.27.0 # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # karapace (/karapace/pyproject.toml) + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.48b0 + # via opentelemetry-sdk packaging==24.2 # via # aiokafka - # opentelemetry-instrumentation # pytest pdbpp==0.10.3 # via karapace (/karapace/pyproject.toml) @@ -206,7 +212,10 @@ pluggy==1.5.0 prometheus-client==0.20.0 # via karapace (/karapace/pyproject.toml) protobuf==3.20.3 - # via karapace (/karapace/pyproject.toml) + # via + # googleapis-common-protos + # karapace (/karapace/pyproject.toml) + # opentelemetry-proto psutil==6.1.0 # via # karapace (/karapace/pyproject.toml) @@ -266,6 +275,7 @@ requests==2.32.3 # via # karapace (/karapace/pyproject.toml) # locust + # opentelemetry-exporter-otlp-proto-http rich==13.7.1 # via # karapace (/karapace/pyproject.toml) @@ -303,7 +313,6 @@ typer==0.15.1 typing-extensions==4.12.2 # via # anyio - # asgiref # fastapi # karapace (/karapace/pyproject.toml) # locust @@ -321,7 +330,7 @@ urllib3==2.2.3 # geventhttpclient # requests # sentry-sdk -uvicorn[standard]==0.32.1 +uvicorn[standard]==0.34.0 # via # fastapi # fastapi-cli @@ -341,9 +350,7 @@ werkzeug==3.1.3 wmctrl==0.5 # via pdbpp wrapt==1.17.0 - # via - # deprecated - # opentelemetry-instrumentation + # via deprecated xxhash==3.5.0 # via karapace (/karapace/pyproject.toml) yarl==1.12.1 diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index bcda10c84..288204908 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -12,7 +12,7 @@ aiohttp==3.10.11 # via karapace (/karapace/pyproject.toml) aiokafka==0.10.0 # via karapace (/karapace/pyproject.toml) -aiosignal==1.3.1 +aiosignal==1.3.2 # via aiohttp annotated-types==0.7.0 # via pydantic @@ -21,13 +21,11 @@ anyio==4.7.0 # httpx # starlette # watchfiles -asgiref==3.8.1 - # via opentelemetry-instrumentation-asgi async-timeout==5.0.1 # via # aiohttp # aiokafka -attrs==24.2.0 +attrs==24.3.0 # via # aiohttp # jsonschema @@ -36,11 +34,14 @@ avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c # via karapace (/karapace/pyproject.toml) cachetools==5.3.3 # via karapace (/karapace/pyproject.toml) -certifi==2024.8.30 +certifi==2024.12.14 # via # httpcore # httpx + # requests # sentry-sdk +charset-normalizer==3.4.0 + # via requests click==8.1.7 # via # rich-toolkit @@ -48,13 +49,15 @@ click==8.1.7 # uvicorn confluent-kafka==2.4.0 # via karapace (/karapace/pyproject.toml) -cramjam==2.9.0 +cramjam==2.9.1 # via python-snappy dependency-injector==4.43.0 # via karapace (/karapace/pyproject.toml) deprecated==1.2.15 # via # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-semantic-conventions dnspython==2.7.0 # via email-validator @@ -64,12 +67,18 @@ exceptiongroup==1.2.2 # via anyio fastapi[standard]==0.115.5 # via karapace (/karapace/pyproject.toml) -fastapi-cli[standard]==0.0.6 +fastapi-cli[standard]==0.0.7 # via fastapi frozenlist==1.5.0 # via # aiohttp # aiosignal +googleapis-common-protos==1.66.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +grpcio==1.68.1 + # via opentelemetry-exporter-otlp-proto-grpc h11==0.14.0 # via # httpcore @@ -85,8 +94,9 @@ idna==3.10 # anyio # email-validator # httpx + # requests # yarl -importlib-metadata==8.5.0 +importlib-metadata==8.4.0 # via opentelemetry-api isodate==0.7.2 # via karapace (/karapace/pyproject.toml) @@ -114,42 +124,44 @@ mypy-extensions==1.0.0 # via mypy networkx==3.4.2 # via karapace (/karapace/pyproject.toml) -opentelemetry-api==1.28.2 +opentelemetry-api==1.27.0 # via # karapace (/karapace/pyproject.toml) - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-sdk # opentelemetry-semantic-conventions -opentelemetry-instrumentation==0.49b2 - # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-asgi==0.49b2 - # via opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-fastapi==0.49b2 +opentelemetry-exporter-otlp==1.27.0 # via karapace (/karapace/pyproject.toml) -opentelemetry-sdk==1.28.2 - # via karapace (/karapace/pyproject.toml) -opentelemetry-semantic-conventions==0.49b2 +opentelemetry-exporter-otlp-proto-common==1.27.0 # via - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi - # opentelemetry-sdk -opentelemetry-util-http==0.49b2 + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-grpc==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-exporter-otlp-proto-http==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-proto==1.27.0 # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -packaging==24.2 + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.27.0 # via - # aiokafka - # opentelemetry-instrumentation + # karapace (/karapace/pyproject.toml) + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.48b0 + # via opentelemetry-sdk +packaging==24.2 + # via aiokafka prometheus-client==0.20.0 # via karapace (/karapace/pyproject.toml) protobuf==3.20.3 - # via karapace (/karapace/pyproject.toml) + # via + # googleapis-common-protos + # karapace (/karapace/pyproject.toml) + # opentelemetry-proto pydantic==2.10.2 # via # fastapi @@ -180,6 +192,8 @@ referencing==0.35.1 # jsonschema # jsonschema-specifications # types-jsonschema +requests==2.32.3 + # via opentelemetry-exporter-otlp-proto-http rich==13.7.1 # via # karapace (/karapace/pyproject.toml) @@ -218,7 +232,6 @@ types-protobuf==3.20.4.6 typing-extensions==4.12.2 # via # anyio - # asgiref # fastapi # karapace (/karapace/pyproject.toml) # multidict @@ -232,8 +245,10 @@ typing-extensions==4.12.2 ujson==5.10.0 # via karapace (/karapace/pyproject.toml) urllib3==2.2.3 - # via sentry-sdk -uvicorn[standard]==0.32.1 + # via + # requests + # sentry-sdk +uvicorn[standard]==0.34.0 # via # fastapi # fastapi-cli @@ -246,9 +261,7 @@ watchfiles==0.24.0 websockets==14.1 # via uvicorn wrapt==1.17.0 - # via - # deprecated - # opentelemetry-instrumentation + # via deprecated xxhash==3.5.0 # via karapace (/karapace/pyproject.toml) yarl==1.12.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index edf3064a4..2b11e8010 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,7 +12,7 @@ aiohttp==3.10.11 # via karapace (/karapace/pyproject.toml) aiokafka==0.10.0 # via karapace (/karapace/pyproject.toml) -aiosignal==1.3.1 +aiosignal==1.3.2 # via aiohttp annotated-types==0.7.0 # via pydantic @@ -21,13 +21,11 @@ anyio==4.7.0 # httpx # starlette # watchfiles -asgiref==3.8.1 - # via opentelemetry-instrumentation-asgi async-timeout==5.0.1 # via # aiohttp # aiokafka -attrs==24.2.0 +attrs==24.3.0 # via # aiohttp # jsonschema @@ -36,10 +34,13 @@ avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c # via karapace (/karapace/pyproject.toml) cachetools==5.3.3 # via karapace (/karapace/pyproject.toml) -certifi==2024.8.30 +certifi==2024.12.14 # via # httpcore # httpx + # requests +charset-normalizer==3.4.0 + # via requests click==8.1.7 # via # rich-toolkit @@ -47,13 +48,15 @@ click==8.1.7 # uvicorn confluent-kafka==2.4.0 # via karapace (/karapace/pyproject.toml) -cramjam==2.9.0 +cramjam==2.9.1 # via python-snappy dependency-injector==4.43.0 # via karapace (/karapace/pyproject.toml) deprecated==1.2.15 # via # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-semantic-conventions dnspython==2.7.0 # via email-validator @@ -63,12 +66,18 @@ exceptiongroup==1.2.2 # via anyio fastapi[standard]==0.115.5 # via karapace (/karapace/pyproject.toml) -fastapi-cli[standard]==0.0.6 +fastapi-cli[standard]==0.0.7 # via fastapi frozenlist==1.5.0 # via # aiohttp # aiosignal +googleapis-common-protos==1.66.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +grpcio==1.68.1 + # via opentelemetry-exporter-otlp-proto-grpc h11==0.14.0 # via # httpcore @@ -84,8 +93,9 @@ idna==3.10 # anyio # email-validator # httpx + # requests # yarl -importlib-metadata==8.5.0 +importlib-metadata==8.4.0 # via opentelemetry-api isodate==0.7.2 # via karapace (/karapace/pyproject.toml) @@ -109,42 +119,44 @@ multidict==6.1.0 # yarl networkx==3.4.2 # via karapace (/karapace/pyproject.toml) -opentelemetry-api==1.28.2 +opentelemetry-api==1.27.0 # via # karapace (/karapace/pyproject.toml) - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-sdk # opentelemetry-semantic-conventions -opentelemetry-instrumentation==0.49b2 - # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-asgi==0.49b2 - # via opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-fastapi==0.49b2 - # via karapace (/karapace/pyproject.toml) -opentelemetry-sdk==1.28.2 +opentelemetry-exporter-otlp==1.27.0 # via karapace (/karapace/pyproject.toml) -opentelemetry-semantic-conventions==0.49b2 +opentelemetry-exporter-otlp-proto-common==1.27.0 # via - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi - # opentelemetry-sdk -opentelemetry-util-http==0.49b2 + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-grpc==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-exporter-otlp-proto-http==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-proto==1.27.0 # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -packaging==24.2 + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.27.0 # via - # aiokafka - # opentelemetry-instrumentation + # karapace (/karapace/pyproject.toml) + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.48b0 + # via opentelemetry-sdk +packaging==24.2 + # via aiokafka prometheus-client==0.20.0 # via karapace (/karapace/pyproject.toml) protobuf==3.20.3 - # via karapace (/karapace/pyproject.toml) + # via + # googleapis-common-protos + # karapace (/karapace/pyproject.toml) + # opentelemetry-proto pydantic==2.10.2 # via # fastapi @@ -174,6 +186,8 @@ referencing==0.35.1 # via # jsonschema # jsonschema-specifications +requests==2.32.3 + # via opentelemetry-exporter-otlp-proto-http rich==13.7.1 # via # karapace (/karapace/pyproject.toml) @@ -202,7 +216,6 @@ typer==0.15.1 typing-extensions==4.12.2 # via # anyio - # asgiref # fastapi # karapace (/karapace/pyproject.toml) # multidict @@ -214,7 +227,9 @@ typing-extensions==4.12.2 # uvicorn ujson==5.10.0 # via karapace (/karapace/pyproject.toml) -uvicorn[standard]==0.32.1 +urllib3==2.2.3 + # via requests +uvicorn[standard]==0.34.0 # via # fastapi # fastapi-cli @@ -227,9 +242,7 @@ watchfiles==0.24.0 websockets==14.1 # via uvicorn wrapt==1.17.0 - # via - # deprecated - # opentelemetry-instrumentation + # via deprecated xxhash==3.5.0 # via karapace (/karapace/pyproject.toml) yarl==1.12.1 From e18bb9fa34a65691cd464a085107a64bba6e7d3b Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 16 Dec 2024 16:19:25 +0100 Subject: [PATCH 3/9] feat: add OTel tracing setup --- container/compose.yml | 89 ++++++++++++------- container/opentelemetry/collector-config.yaml | 29 ++++++ container/prometheus/prometheus.yml | 5 ++ src/karapace/config.py | 7 +- src/schema_registry/__main__.py | 23 ++++- src/schema_registry/container.py | 15 +++- src/schema_registry/middlewares/__init__.py | 3 + src/schema_registry/telemetry/__init__.py | 4 + src/schema_registry/telemetry/container.py | 13 +++ src/schema_registry/telemetry/middleware.py | 34 +++++++ src/schema_registry/telemetry/setup.py | 24 +++++ src/schema_registry/telemetry/tracer.py | 66 ++++++++++++++ tests/conftest.py | 34 +++++-- .../schema_registry/telemetry/__init__.py | 4 + .../telemetry/test_middleware.py | 56 ++++++++++++ .../schema_registry/telemetry/test_setup.py | 32 +++++++ .../schema_registry/telemetry/test_tracer.py | 57 ++++++++++++ 17 files changed, 454 insertions(+), 41 deletions(-) create mode 100644 container/opentelemetry/collector-config.yaml create mode 100644 src/schema_registry/telemetry/__init__.py create mode 100644 src/schema_registry/telemetry/container.py create mode 100644 src/schema_registry/telemetry/middleware.py create mode 100644 src/schema_registry/telemetry/setup.py create mode 100644 src/schema_registry/telemetry/tracer.py create mode 100644 tests/unit/schema_registry/telemetry/__init__.py create mode 100644 tests/unit/schema_registry/telemetry/test_middleware.py create mode 100644 tests/unit/schema_registry/telemetry/test_setup.py create mode 100644 tests/unit/schema_registry/telemetry/test_tracer.py diff --git a/container/compose.yml b/container/compose.yml index 0407f38c1..5b433f6ba 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -1,5 +1,3 @@ ---- -version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:latest @@ -67,6 +65,7 @@ services: - schema_registry depends_on: - kafka + - opentelemetry-collector ports: - 8081:8081 environment: @@ -85,6 +84,15 @@ services: KARAPACE_STATSD_PORT: 8125 KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true + KARAPACE_TAGS__APP: karapace-schema-registry + + KARAPACE_TELEMETRY__OTEL_ENDPOINT_URL: http://opentelemetry-collector:4317 + OTEL_RESOURCE_ATTRIBUTES: | + service.instance.id=karapace-schema-registry, + service.name=karapace-schema-registry, + telemetry.sdk.name=opentelemetry, + telemetry.sdk.language=python, + telemetry.sdk.version=1.28.2 karapace-rest-proxy: image: ghcr.io/aiven-open/karapace:develop @@ -133,17 +141,17 @@ services: - karapace-schema-registry - karapace-rest-proxy volumes: - - ../tests:/opt/karapace/tests - - ../pytest.ini:/opt/karapace/pytest.ini - - ../mypy.ini:/opt/karapace/mypy.ini - - ../.flake8:/opt/karapace/.flake8 - - ../.isort.cfg:/opt/karapace/.isort.cfg - - ../.pre-commit-config.yaml:/opt/karapace/.pre-commit-config.yaml - - ../.pylintrc:/opt/karapace/.pylintrc - - ../.coveragerc:/opt/karapace/.coveragerc - - ../.coverage.3.10:/opt/karapace/coverage/.coverage.3.10 - - ../.coverage.3.11:/opt/karapace/coverage/.coverage.3.11 - - ../.coverage.3.12:/opt/karapace/coverage/.coverage.3.12 + - ../tests:/opt/karapace/tests + - ../pytest.ini:/opt/karapace/pytest.ini + - ../mypy.ini:/opt/karapace/mypy.ini + - ../.flake8:/opt/karapace/.flake8 + - ../.isort.cfg:/opt/karapace/.isort.cfg + - ../.pre-commit-config.yaml:/opt/karapace/.pre-commit-config.yaml + - ../.pylintrc:/opt/karapace/.pylintrc + - ../.coveragerc:/opt/karapace/.coveragerc + - ../.coverage.3.10:/opt/karapace/coverage/.coverage.3.10 + - ../.coverage.3.11:/opt/karapace/coverage/.coverage.3.11 + - ../.coverage.3.12:/opt/karapace/coverage/.coverage.3.12 environment: - COVERAGE_FILE - COVERAGE_RCFILE=/opt/karapace/.coveragerc @@ -151,26 +159,47 @@ services: prometheus: image: prom/prometheus volumes: - - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml - - ./prometheus/rules.yml:/etc/prometheus/rules.yml + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + - ./prometheus/rules.yml:/etc/prometheus/rules.yml + depends_on: + - karapace-schema-registry + - karapace-rest-proxy + - opentelemetry-collector ports: - 9090:9090 grafana: - image: grafana/grafana - environment: - GF_SECURITY_ADMIN_USER: karapace - GF_SECURITY_ADMIN_PASSWORD: karapace - GF_PATHS_PROVISIONING: /grafana/provisioning - ports: - - 3000:3000 - volumes: - - ./grafana/dashboards:/grafana/dashboards - - ./grafana/provisioning:/grafana/provisioning + image: grafana/grafana + environment: + GF_SECURITY_ADMIN_USER: karapace + GF_SECURITY_ADMIN_PASSWORD: karapace + GF_PATHS_PROVISIONING: /grafana/provisioning + ports: + - 3000:3000 + volumes: + - ./grafana/dashboards:/grafana/dashboards + - ./grafana/provisioning:/grafana/provisioning statsd-exporter: - image: prom/statsd-exporter - command: "--statsd.listen-udp=:8125 --web.listen-address=:9102" - ports: - - 9102:9102 - - 8125:8125/udp + image: prom/statsd-exporter + command: --statsd.listen-udp=:8125 --web.listen-address=:9102 + ports: + - 9102:9102 + - 8125:8125/udp + + opentelemetry-collector: + image: otel/opentelemetry-collector-contrib:latest + command: --config=/etc/collector-config.yaml + volumes: + - ./opentelemetry/collector-config.yaml:/etc/collector-config.yaml + ports: # 4317=OTLP-gRPC-receiver | 8888=prom-collector-metrics | 8889=prom-exporter-metrics + - 4317:4317 + - 8888:8888 + - 8889:8889 + + jaeger: + image: jaegertracing/all-in-one:latest + ports: # 6831=agent | 16686=UI | 14268=spans | 4317=metrics (not exposing, clashes with opentelemetry-collector) + - 6831:6831/udp + - 16686:16686 + - 14268:14268 diff --git a/container/opentelemetry/collector-config.yaml b/container/opentelemetry/collector-config.yaml new file mode 100644 index 000000000..3fcd0df0c --- /dev/null +++ b/container/opentelemetry/collector-config.yaml @@ -0,0 +1,29 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: opentelemetry-collector:4317 + +processors: + +extensions: + health_check: {} + +exporters: + otlp: + endpoint: jaeger:4317 + tls: + insecure: true + otlphttp/prometheus: + endpoint: prometheus:9090/api/v1/otlp + tls: + insecure: true + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [otlp] + metrics: + receivers: [otlp] + exporters: [otlphttp/prometheus] diff --git a/container/prometheus/prometheus.yml b/container/prometheus/prometheus.yml index f62e8082a..68b64109f 100644 --- a/container/prometheus/prometheus.yml +++ b/container/prometheus/prometheus.yml @@ -27,3 +27,8 @@ scrape_configs: static_configs: - targets: - statsd-exporter:9102 + + - job_name: opentelemetry-collector + static_configs: + - targets: + - opentelemetry-collector:8888 diff --git a/src/karapace/config.py b/src/karapace/config.py index 332363d46..152249eb2 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -27,8 +27,12 @@ class KarapaceTags(BaseModel): app: str = "Karapace" +class KarapaceTelemetry(BaseModel): + otel_endpoint_url: str | None = None + + class Config(BaseSettings): - model_config = SettingsConfigDict(env_prefix="karapace_", env_ignore_empty=True) + model_config = SettingsConfigDict(env_prefix="karapace_", env_ignore_empty=True, env_nested_delimiter="__") access_logs_debug: bool = False access_log_class: ImportString = "karapace.utils.DebugAccessLogger" @@ -101,6 +105,7 @@ class Config(BaseSettings): sentry: Mapping[str, object] | None = None tags: KarapaceTags = KarapaceTags() + telemetry: KarapaceTelemetry = KarapaceTelemetry() # add rest uri if not set # f"{new_config['advertised_protocol']}://{new_config['advertised_hostname']}:{new_config['advertised_port']}" diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 7ff513584..9bd86cb8f 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -5,8 +5,11 @@ from karapace.container import KarapaceContainer from schema_registry.container import SchemaRegistryContainer from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan +from schema_registry.telemetry.container import TelemetryContainer +import schema_registry.controller import schema_registry.factory +import schema_registry.reader import schema_registry.routers.compatibility import schema_registry.routers.config import schema_registry.routers.health @@ -15,7 +18,9 @@ import schema_registry.routers.mode import schema_registry.routers.schemas import schema_registry.routers.subjects -import schema_registry.schema_registry_apis +import schema_registry.telemetry.middleware +import schema_registry.telemetry.setup +import schema_registry.telemetry.tracer import schema_registry.user import uvicorn @@ -24,11 +29,23 @@ container.wire( modules=[ __name__, - schema_registry.schema_registry_apis, + schema_registry.controller, + schema_registry.telemetry.tracer, ] ) - schema_registry_container = SchemaRegistryContainer(karapace_container=container) + telemetry_container = TelemetryContainer() + telemetry_container.wire( + modules=[ + schema_registry.telemetry.setup, + schema_registry.telemetry.middleware, + schema_registry.reader, + ] + ) + + schema_registry_container = SchemaRegistryContainer( + karapace_container=container, telemetry_container=telemetry_container + ) schema_registry_container.wire( modules=[ __name__, diff --git a/src/schema_registry/container.py b/src/schema_registry/container.py index b93bc4139..5772f60e8 100644 --- a/src/schema_registry/container.py +++ b/src/schema_registry/container.py @@ -5,14 +5,25 @@ from dependency_injector import containers, providers from karapace.container import KarapaceContainer -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from opentelemetry.sdk.trace import TracerProvider +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer class SchemaRegistryContainer(containers.DeclarativeContainer): karapace_container = providers.Container(KarapaceContainer) + telemetry_container = providers.Container(TelemetryContainer) + + schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=karapace_container.config) + schema_registry_controller = providers.Singleton( KarapaceSchemaRegistryController, config=karapace_container.config, - schema_registry=karapace_container.schema_registry, + schema_registry=schema_registry, stats=karapace_container.statsd, ) + + tracer_provider = providers.Singleton(TracerProvider) + tracer = providers.Singleton(Tracer) diff --git a/src/schema_registry/middlewares/__init__.py b/src/schema_registry/middlewares/__init__.py index 8df42b04c..d86fbf40e 100644 --- a/src/schema_registry/middlewares/__init__.py +++ b/src/schema_registry/middlewares/__init__.py @@ -7,6 +7,7 @@ from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse from karapace.content_type import check_schema_headers +from schema_registry.telemetry.middleware import setup_telemetry_middleware def setup_middlewares(app: FastAPI) -> None: @@ -32,3 +33,5 @@ async def set_content_types(request: Request, call_next: Callable[[Request], Awa response = await call_next(request) response.headers["Content-Type"] = response_content_type return response + + setup_telemetry_middleware(app=app) diff --git a/src/schema_registry/telemetry/__init__.py b/src/schema_registry/telemetry/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/src/schema_registry/telemetry/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/src/schema_registry/telemetry/container.py b/src/schema_registry/telemetry/container.py new file mode 100644 index 000000000..3aad7ac11 --- /dev/null +++ b/src/schema_registry/telemetry/container.py @@ -0,0 +1,13 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector import containers, providers +from opentelemetry.sdk.trace import TracerProvider +from schema_registry.telemetry.tracer import Tracer + + +class TelemetryContainer(containers.DeclarativeContainer): + tracer_provider = providers.Singleton(TracerProvider) + tracer = providers.Singleton(Tracer) diff --git a/src/schema_registry/telemetry/middleware.py b/src/schema_registry/telemetry/middleware.py new file mode 100644 index 000000000..c6d14bbe9 --- /dev/null +++ b/src/schema_registry/telemetry/middleware.py @@ -0,0 +1,34 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from collections.abc import Awaitable, Callable +from dependency_injector.wiring import inject, Provide +from fastapi import FastAPI, Request, Response +from opentelemetry.trace import SpanKind +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer + +import logging + +LOG = logging.getLogger(__name__) + + +@inject +async def telemetry_middleware( + request: Request, + call_next: Callable[[Request], Awaitable[Response]], + tracer: Tracer = Provide[TelemetryContainer.tracer], +) -> Response: + resource = request.url.path.split("/")[1] + with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span: + tracer.update_span_with_request(request=request, span=span) + response: Response = await call_next(request) + tracer.update_span_with_response(response=response, span=span) + return response + + +def setup_telemetry_middleware(app: FastAPI) -> None: + LOG.info("Setting OTel tracing middleware") + app.middleware("http")(telemetry_middleware) diff --git a/src/schema_registry/telemetry/setup.py b/src/schema_registry/telemetry/setup.py new file mode 100644 index 000000000..30b423902 --- /dev/null +++ b/src/schema_registry/telemetry/setup.py @@ -0,0 +1,24 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer + +import logging + +LOG = logging.getLogger(__name__) + + +@inject +def setup_tracing( + tracer_provider: TracerProvider = Provide[TelemetryContainer.tracer_provider], + tracer: Tracer = Provide[TelemetryContainer.tracer], +) -> None: + LOG.info("Setting OTel tracing provider") + tracer_provider.add_span_processor(tracer.get_span_processor()) + trace.set_tracer_provider(tracer_provider) diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py new file mode 100644 index 000000000..5b6ed86ea --- /dev/null +++ b/src/schema_registry/telemetry/tracer.py @@ -0,0 +1,66 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from fastapi import Request, Response +from karapace.config import Config +from karapace.container import KarapaceContainer +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor +from opentelemetry.trace.span import Span + +import inspect + + +class Tracer: + @staticmethod + @inject + def get_tracer(config: Config = Provide[KarapaceContainer.config]) -> trace.Tracer: + return trace.get_tracer(f"{config.tags.app}.tracer") + + @staticmethod + @inject + def get_span_processor(config: Config = Provide[KarapaceContainer.config]) -> SpanProcessor: + if config.telemetry.otel_endpoint_url: + otlp_span_exporter = OTLPSpanExporter(endpoint=config.telemetry.otel_endpoint_url) + return BatchSpanProcessor(otlp_span_exporter) + return SimpleSpanProcessor(ConsoleSpanExporter()) + + @staticmethod + def get_name_from_caller() -> str: + return inspect.stack()[1].function + + @staticmethod + def get_name_from_caller_with_class(function_class, function) -> str: + return f"{type(function_class).__name__}.{function.__name__}" + + @staticmethod + def add_span_attribute(span: Span, key: str, value: str | int) -> None: + if span.is_recording(): + span.set_attribute(key, value) + + @staticmethod + def update_span_with_request(request: Request, span: Span) -> None: + if span.is_recording(): + span.set_attribute("server.scheme", request.url.scheme) + span.set_attribute("server.hostname", request.url.hostname) + span.set_attribute("server.port", request.url.port) + span.set_attribute("server.is_secure", request.url.is_secure) + span.set_attribute("request.http.method", request.method) + span.set_attribute("request.http.path", request.url.path) + span.set_attribute("request.http.client.host", request.client.host) + span.set_attribute("request.http.client.port", request.client.port) + span.set_attribute("request.http.headers.connection", request.headers.get("connection", "")) + span.set_attribute("request.http.headers.user_agent", request.headers.get("user-agent", "")) + span.set_attribute("request.http.headers.content_type", request.headers.get("content-type", "")) + + @staticmethod + def update_span_with_response(response: Response, span: Span) -> None: + if span.is_recording(): + span.set_attribute("response.http.status_code", response.status_code) + span.set_attribute("response.http.media_type", response.media_type or "") + span.set_attribute("response.http.headers.content_type", response.headers.get("content-type", "")) + span.set_attribute("response.http.headers.content_length", response.headers.get("content-length", "")) diff --git a/tests/conftest.py b/tests/conftest.py index 8413773fa..e48a2600a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,12 +6,16 @@ from karapace.container import KarapaceContainer from pathlib import Path from schema_registry.container import SchemaRegistryContainer +from schema_registry.telemetry.container import TelemetryContainer from tempfile import mkstemp import json import os import pytest import re +import schema_registry.controller +import schema_registry.reader +import schema_registry.telemetry.tracer pytest_plugins = "aiohttp.pytest_plugin" KAFKA_BOOTSTRAP_SERVERS_OPT = "--kafka-bootstrap-servers" @@ -182,11 +186,31 @@ def fixture_tmp_file(): path.unlink() -@pytest.fixture(name="karapace_container", scope="session") +@pytest.fixture(name="karapace_container", scope="session", autouse=True) def fixture_karapace_container() -> KarapaceContainer: - return KarapaceContainer() + karapace_container = KarapaceContainer() + karapace_container.wire( + modules=[ + schema_registry.controller, + schema_registry.telemetry.tracer, + ] + ) + return karapace_container + + +@pytest.fixture(name="telemetry_container", scope="session", autouse=True) +def fixture_telemetry_container() -> TelemetryContainer: + telemetry_container = TelemetryContainer() + telemetry_container.wire( + modules=[ + schema_registry.reader, + ] + ) + return telemetry_container -@pytest.fixture -def schema_registry_container(karapace_container: KarapaceContainer) -> SchemaRegistryContainer: - return SchemaRegistryContainer(karapace_container=karapace_container) +@pytest.fixture(name="schema_registry_container", scope="session", autouse=True) +def fixture_schema_registry_container( + karapace_container: KarapaceContainer, telemetry_container: TelemetryContainer +) -> SchemaRegistryContainer: + return SchemaRegistryContainer(karapace_container=karapace_container, telemetry_container=telemetry_container) diff --git a/tests/unit/schema_registry/telemetry/__init__.py b/tests/unit/schema_registry/telemetry/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/tests/unit/schema_registry/telemetry/test_middleware.py b/tests/unit/schema_registry/telemetry/test_middleware.py new file mode 100644 index 000000000..ecbe79307 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/test_middleware.py @@ -0,0 +1,56 @@ +""" +schema_registry - telemetry middleware tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from _pytest.logging import LogCaptureFixture +from fastapi import FastAPI, Request, Response +from opentelemetry.trace import SpanKind +from schema_registry.telemetry.middleware import setup_telemetry_middleware, telemetry_middleware +from schema_registry.telemetry.tracer import Tracer +from unittest.mock import AsyncMock, MagicMock + +import logging + + +def test_setup_telemetry_middleware(caplog: LogCaptureFixture) -> None: + app = AsyncMock(spec=FastAPI) + with caplog.at_level(logging.INFO, logger="schema_registry.telemetry.middleware"): + setup_telemetry_middleware(app=app) + + for log in caplog.records: + assert log.name == "schema_registry.telemetry.middleware" + assert log.levelname == "INFO" + assert log.message == "Setting OTel tracing middleware" + + app.middleware.assert_called_once_with("http") + app.middleware.return_value.assert_called_once_with(telemetry_middleware) + + +async def test_telemetry_middleware() -> None: + tracer = MagicMock(spec=Tracer) + + request_mock = AsyncMock(spec=Request) + request_mock.method = "GET" + request_mock.url.path = "/test" + + response_mock = AsyncMock(spec=Response) + response_mock.status_code = 200 + + call_next = AsyncMock() + call_next.return_value = response_mock + + response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer) + span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value + + tracer.get_tracer.assert_called_once() + tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) + tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) + tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) + + # Check that the request handler is called + call_next.assert_awaited_once_with(request_mock) + + assert response == response_mock diff --git a/tests/unit/schema_registry/telemetry/test_setup.py b/tests/unit/schema_registry/telemetry/test_setup.py new file mode 100644 index 000000000..e3dc90690 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/test_setup.py @@ -0,0 +1,32 @@ +""" +schema_registry - telemetry setup tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from _pytest.logging import LogCaptureFixture +from opentelemetry.sdk.trace import TracerProvider +from schema_registry.telemetry.setup import setup_tracing +from schema_registry.telemetry.tracer import Tracer +from unittest.mock import MagicMock, patch + +import logging + + +def test_setup_telemetry(caplog: LogCaptureFixture): + tracer_provider = MagicMock(spec=TracerProvider) + tracer = MagicMock(spec=Tracer) + with ( + caplog.at_level(logging.INFO, logger="schema_registry.telemetry.setup"), + patch("schema_registry.telemetry.setup.trace") as mock_trace, + ): + tracer.get_span_processor.return_value = "span_processor" + setup_tracing(tracer_provider=tracer_provider, tracer=tracer) + + tracer_provider.add_span_processor.assert_called_once_with("span_processor") + mock_trace.set_tracer_provider.assert_called_once_with(tracer_provider) + for log in caplog.records: + assert log.name == "schema_registry.telemetry.setup" + assert log.levelname == "INFO" + assert log.message == "Setting OTel tracing provider" diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py new file mode 100644 index 000000000..25f349e41 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -0,0 +1,57 @@ +""" +schema_registry - telemetry middleware tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from karapace.config import KarapaceTelemetry +from karapace.container import KarapaceContainer +from opentelemetry.sdk.trace.export import SpanProcessor +from schema_registry.telemetry.tracer import Tracer +from unittest.mock import patch + + +def test_tracer(karapace_container: KarapaceContainer): + with patch("schema_registry.telemetry.tracer.trace") as mock_trace: + Tracer.get_tracer(config=karapace_container.config()) + mock_trace.get_tracer.assert_called_once_with("Karapace.tracer") + + +def test_get_name_from_caller(): + def test_function(): + return Tracer.get_name_from_caller() + + assert test_function() == "test_function" + + +def test_get_name_from_caller_with_class(): + class Test: + def test_function(self): + return Tracer.get_name_from_caller_with_class(self, self.test_function) + + assert Test().test_function() == "Test.test_function" + + +def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url="http://otel:4317")} + ) + with ( + patch("schema_registry.telemetry.tracer.OTLPSpanExporter") as mock_otlp_exporter, + patch("schema_registry.telemetry.tracer.BatchSpanProcessor") as mock_batch_span_processor, + ): + processor: SpanProcessor = Tracer.get_span_processor(config=config) + mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") + mock_batch_span_processor.assert_called_once_with(mock_otlp_exporter.return_value) + assert processor is mock_batch_span_processor.return_value + + +def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: + with ( + patch("schema_registry.telemetry.tracer.ConsoleSpanExporter") as mock_console_exporter, + patch("schema_registry.telemetry.tracer.SimpleSpanProcessor") as mock_simple_span_processor, + ): + processor: SpanProcessor = Tracer.get_span_processor(config=karapace_container.config()) + mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value) + assert processor is mock_simple_span_processor.return_value From 4beb1ee53e188cae8c53e3bff984422fea8debca Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 16 Dec 2024 16:19:57 +0100 Subject: [PATCH 4/9] feat: use tracing in healthcheck endpoint --- .../coordinator/master_coordinator.py | 27 ++++--- src/karapace/offset_watcher.py | 9 ++- src/schema_registry/__main__.py | 4 + src/schema_registry/container.py | 5 -- src/schema_registry/reader.py | 57 ++++++++------ src/schema_registry/routers/health.py | 76 +++++++++++++------ src/schema_registry/telemetry/tracer.py | 14 ++-- .../schema_registry/telemetry/test_tracer.py | 2 +- 8 files changed, 120 insertions(+), 74 deletions(-) diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index 9b4f8181c..ada53c664 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -10,10 +10,13 @@ from aiokafka.errors import KafkaConnectionError from aiokafka.helpers import create_ssl_context from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest +from dependency_injector.wiring import inject, Provide from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import SchemaReaderStoppper +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer from threading import Thread from typing import Final @@ -146,16 +149,20 @@ def init_schema_coordinator(self) -> SchemaCoordinator: schema_coordinator.start() return schema_coordinator - def get_coordinator_status(self) -> SchemaCoordinatorStatus: - assert self._sc is not None - generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID - return SchemaCoordinatorStatus( - is_primary=self._sc.are_we_master() if self._sc is not None else None, - is_primary_eligible=self._config.master_eligibility, - primary_url=self._sc.master_url if self._sc is not None else None, - is_running=True, - group_generation_id=generation if generation is not None else -1, - ) + @inject + def get_coordinator_status(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> SchemaCoordinatorStatus: + with tracer.get_tracer().start_as_current_span( + tracer.get_name_from_caller_with_class(self, self.get_coordinator_status) + ): + assert self._sc is not None + generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID + return SchemaCoordinatorStatus( + is_primary=self._sc.are_we_master() if self._sc is not None else None, + is_primary_eligible=self._config.master_eligibility, + primary_url=self._sc.master_url if self._sc is not None else None, + is_running=True, + group_generation_id=generation if generation is not None else -1, + ) def get_master_info(self) -> tuple[bool | None, str | None]: """Return whether we're the master, and the actual master url that can be used if we're not""" diff --git a/src/karapace/offset_watcher.py b/src/karapace/offset_watcher.py index 6056d5f37..092691e9f 100644 --- a/src/karapace/offset_watcher.py +++ b/src/karapace/offset_watcher.py @@ -4,6 +4,9 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from dependency_injector.wiring import inject, Provide +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer from threading import Condition @@ -20,8 +23,10 @@ def __init__(self) -> None: self._condition = Condition() self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. - def greatest_offset(self) -> int: - return self._greatest_offset + @inject + def greatest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.greatest_offset)): + return self._greatest_offset def offset_seen(self, new_offset: int) -> None: with self._condition: diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 9bd86cb8f..3e1fcae25 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -7,6 +7,8 @@ from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan from schema_registry.telemetry.container import TelemetryContainer +import karapace.coordinator.master_coordinator +import karapace.offset_watcher import schema_registry.controller import schema_registry.factory import schema_registry.reader @@ -40,6 +42,8 @@ schema_registry.telemetry.setup, schema_registry.telemetry.middleware, schema_registry.reader, + karapace.offset_watcher, + karapace.coordinator.master_coordinator, ] ) diff --git a/src/schema_registry/container.py b/src/schema_registry/container.py index 5772f60e8..21bb53c13 100644 --- a/src/schema_registry/container.py +++ b/src/schema_registry/container.py @@ -5,11 +5,9 @@ from dependency_injector import containers, providers from karapace.container import KarapaceContainer -from opentelemetry.sdk.trace import TracerProvider from schema_registry.controller import KarapaceSchemaRegistryController from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.telemetry.container import TelemetryContainer -from schema_registry.telemetry.tracer import Tracer class SchemaRegistryContainer(containers.DeclarativeContainer): @@ -24,6 +22,3 @@ class SchemaRegistryContainer(containers.DeclarativeContainer): schema_registry=schema_registry, stats=karapace_container.statsd, ) - - tracer_provider = providers.Singleton(TracerProvider) - tracer = providers.Singleton(Tracer) diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index a0e4f68b3..4919a0271 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -278,29 +278,33 @@ def run(self) -> None: self.consecutive_unexpected_errors_start = time.monotonic() LOG.warning("Unexpected exception in schema reader loop - %s", e) - async def is_healthy(self) -> bool: - if ( - self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS - and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS - ): - LOG.warning( - "Health check failed with %s consecutive errors in %s seconds", self.consecutive_unexpected_errors, duration - ) - return False - - try: - # Explicitly check if topic exists. - # This needs to be done because in case of missing topic the consumer will not repeat the error - # on conscutive consume calls and instead will return empty list. - assert self.admin_client is not None - topic = self.config.topic_name - res = self.admin_client.describe_topics(TopicCollection([topic])) - await asyncio.wrap_future(res[topic]) - except Exception as e: # pylint: disable=broad-except - LOG.warning("Health check failed with %r", e) - return False + @inject + async def is_healthy(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.is_healthy)): + if ( + self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS + and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS + ): + LOG.warning( + "Health check failed with %s consecutive errors in %s seconds", + self.consecutive_unexpected_errors, + duration, + ) + return False - return True + try: + # Explicitly check if topic exists. + # This needs to be done because in case of missing topic the consumer will not repeat the error + # on conscutive consume calls and instead will return empty list. + assert self.admin_client is not None + topic = self.config.topic_name + res = self.admin_client.describe_topics(TopicCollection([topic])) + await asyncio.wrap_future(res[topic]) + except Exception as e: # pylint: disable=broad-except + LOG.warning("Health check failed with %r", e) + return False + + return True def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" @@ -369,12 +373,15 @@ def _is_ready(self) -> bool: LOG.info("Ready in %s seconds", time.monotonic() - self.start_time) return ready - def highest_offset(self) -> int: - return max(self._highest_offset, self._offset_watcher.greatest_offset()) + @inject + def highest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: + with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.highest_offset)): + return max(self._highest_offset, self._offset_watcher.greatest_offset()) @inject def ready(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: - with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.ready)): + with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.ready)) as span: + span.add_event("Acquiring ready lock") with self._ready_lock: return self._ready diff --git a/src/schema_registry/routers/health.py b/src/schema_registry/routers/health.py index b02d2f760..c7365a555 100644 --- a/src/schema_registry/routers/health.py +++ b/src/schema_registry/routers/health.py @@ -5,9 +5,12 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, HTTPException, status -from karapace.schema_registry import KarapaceSchemaRegistry +from opentelemetry.trace import Span +from opentelemetry.trace.status import StatusCode from pydantic import BaseModel from schema_registry.container import SchemaRegistryContainer +from schema_registry.registry import KarapaceSchemaRegistry +from schema_registry.telemetry.tracer import Tracer class HealthStatus(BaseModel): @@ -34,34 +37,57 @@ class HealthCheck(BaseModel): ) +def set_health_status_tracing_attributes(health_check_span: Span, health_status: HealthStatus) -> None: + health_check_span.add_event("Setting health status tracing attributes") + health_check_span.set_attribute("schema_registry_ready", health_status.schema_registry_ready) + health_check_span.set_attribute("schema_registry_startup_time_sec", health_status.schema_registry_startup_time_sec) + health_check_span.set_attribute( + "schema_registry_reader_current_offset", health_status.schema_registry_reader_current_offset + ) + health_check_span.set_attribute( + "schema_registry_reader_highest_offset", health_status.schema_registry_reader_highest_offset + ) + health_check_span.set_attribute("schema_registry_is_primary", getattr(health_status, "schema_registry_is_primary", "")) + + @health_router.get("") @inject async def health( - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), + tracer: Tracer = Depends(Provide[SchemaRegistryContainer.telemetry_container.tracer]), ) -> HealthCheck: - starttime = 0.0 - if schema_registry.schema_reader.ready(): - starttime = schema_registry.schema_reader.last_check - schema_registry.schema_reader.start_time - - cs = schema_registry.mc.get_coordinator_status() - - health_status = HealthStatus( - schema_registry_ready=schema_registry.schema_reader.ready(), - schema_registry_startup_time_sec=starttime, - schema_registry_reader_current_offset=schema_registry.schema_reader.offset, - schema_registry_reader_highest_offset=schema_registry.schema_reader.highest_offset(), - schema_registry_is_primary=cs.is_primary, - schema_registry_is_primary_eligible=cs.is_primary_eligible, - schema_registry_primary_url=cs.primary_url, - schema_registry_coordinator_running=cs.is_running, - schema_registry_coordinator_generation_id=cs.group_generation_id, - ) - # if self._auth is not None: - # resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified + with tracer.get_tracer().start_span("health_check_api_handler_GET") as health_check_span: + starttime = 0.0 - if not await schema_registry.schema_reader.is_healthy(): - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + health_check_span.add_event("Checking schema-reader is ready") + schema_reader_is_ready = schema_registry.schema_reader.ready() + if schema_reader_is_ready: + starttime = schema_registry.schema_reader.last_check - schema_registry.schema_reader.start_time + + health_check_span.add_event("Getting schema-registry master-coordinator status") + cs = schema_registry.mc.get_coordinator_status() + + health_check_span.add_event("Building health status response model") + health_status = HealthStatus( + schema_registry_ready=schema_reader_is_ready, + schema_registry_startup_time_sec=starttime, + schema_registry_reader_current_offset=schema_registry.schema_reader.offset, + schema_registry_reader_highest_offset=schema_registry.schema_reader.highest_offset(), + schema_registry_is_primary=cs.is_primary, + schema_registry_is_primary_eligible=cs.is_primary_eligible, + schema_registry_primary_url=cs.primary_url, + schema_registry_coordinator_running=cs.is_running, + schema_registry_coordinator_generation_id=cs.group_generation_id, ) + set_health_status_tracing_attributes(health_check_span=health_check_span, health_status=health_status) + + # if self._auth is not None: + # resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified + + if not await schema_registry.schema_reader.is_healthy(): + health_check_span.add_event("Erroring because schema-reader is not healthy") + health_check_span.set_status(status=StatusCode.ERROR, description="Schema reader is not healthy") + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) - return HealthCheck(status=health_status, healthy=True) + health_check_span.add_event("Returning health check response") + return HealthCheck(status=health_status, healthy=True) diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py index 5b6ed86ea..e25a463da 100644 --- a/src/schema_registry/telemetry/tracer.py +++ b/src/schema_registry/telemetry/tracer.py @@ -3,6 +3,7 @@ See LICENSE for details """ +from collections.abc import Callable from dependency_injector.wiring import inject, Provide from fastapi import Request, Response from karapace.config import Config @@ -11,6 +12,7 @@ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor from opentelemetry.trace.span import Span +from typing import Any import inspect @@ -34,8 +36,8 @@ def get_name_from_caller() -> str: return inspect.stack()[1].function @staticmethod - def get_name_from_caller_with_class(function_class, function) -> str: - return f"{type(function_class).__name__}.{function.__name__}" + def get_name_from_caller_with_class(function_class: object, function: Callable[[Any], Any]) -> str: + return f"{type(function_class).__name__}.{function.__name__}()" @staticmethod def add_span_attribute(span: Span, key: str, value: str | int) -> None: @@ -46,13 +48,13 @@ def add_span_attribute(span: Span, key: str, value: str | int) -> None: def update_span_with_request(request: Request, span: Span) -> None: if span.is_recording(): span.set_attribute("server.scheme", request.url.scheme) - span.set_attribute("server.hostname", request.url.hostname) - span.set_attribute("server.port", request.url.port) + span.set_attribute("server.hostname", request.url.hostname or "") + span.set_attribute("server.port", request.url.port or "") span.set_attribute("server.is_secure", request.url.is_secure) span.set_attribute("request.http.method", request.method) span.set_attribute("request.http.path", request.url.path) - span.set_attribute("request.http.client.host", request.client.host) - span.set_attribute("request.http.client.port", request.client.port) + span.set_attribute("request.http.client.host", request.client.host or "" if request.client else "") + span.set_attribute("request.http.client.port", request.client.port or "" if request.client else "") span.set_attribute("request.http.headers.connection", request.headers.get("connection", "")) span.set_attribute("request.http.headers.user_agent", request.headers.get("user-agent", "")) span.set_attribute("request.http.headers.content_type", request.headers.get("content-type", "")) diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py index 25f349e41..ff11c31fc 100644 --- a/tests/unit/schema_registry/telemetry/test_tracer.py +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -30,7 +30,7 @@ class Test: def test_function(self): return Tracer.get_name_from_caller_with_class(self, self.test_function) - assert Test().test_function() == "Test.test_function" + assert Test().test_function() == "Test.test_function()" def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: From 3897c38927717efacd837a2977340b6eab3e00fc Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 16 Dec 2024 21:58:18 +0100 Subject: [PATCH 5/9] otel: use dedicated container - we also build a custom resource --- container/compose.yml | 12 ++-- src/karapace/config.py | 5 ++ src/karapace/offset_watcher.py | 2 +- src/schema_registry/__main__.py | 15 +++-- src/schema_registry/reader.py | 4 +- src/schema_registry/routers/health.py | 2 +- src/schema_registry/telemetry/container.py | 20 +++++- src/schema_registry/telemetry/tracer.py | 36 ++++++----- .../schema_registry/telemetry/test_tracer.py | 63 ++++++++++++++++++- 9 files changed, 123 insertions(+), 36 deletions(-) diff --git a/container/compose.yml b/container/compose.yml index 5b433f6ba..c42d4b428 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -85,14 +85,12 @@ services: KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true KARAPACE_TAGS__APP: karapace-schema-registry - KARAPACE_TELEMETRY__OTEL_ENDPOINT_URL: http://opentelemetry-collector:4317 - OTEL_RESOURCE_ATTRIBUTES: | - service.instance.id=karapace-schema-registry, - service.name=karapace-schema-registry, - telemetry.sdk.name=opentelemetry, - telemetry.sdk.language=python, - telemetry.sdk.version=1.28.2 + KARAPACE_TELEMETRY__RESOURCE_SERVICE_NAME: karapace-schema-registry + KARAPACE_TELEMETRY__RESOURCE_SERVICE_INSTANCE_ID: sr1 + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_NAME: opentelemetry + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_LANGUAGE: python + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_VERSION: 1.27.0 karapace-rest-proxy: image: ghcr.io/aiven-open/karapace:develop diff --git a/src/karapace/config.py b/src/karapace/config.py index 152249eb2..0dd811d92 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -29,6 +29,11 @@ class KarapaceTags(BaseModel): class KarapaceTelemetry(BaseModel): otel_endpoint_url: str | None = None + resource_service_name: str = "karapace" + resource_service_instance_id: str = "karapace" + resource_telemetry_sdk_name: str = "opentelemetry" + resource_telemetry_sdk_language: str = "python" + resource_telemetry_sdk_version: str = "1.27.0" class Config(BaseSettings): diff --git a/src/karapace/offset_watcher.py b/src/karapace/offset_watcher.py index 092691e9f..5c64cf12b 100644 --- a/src/karapace/offset_watcher.py +++ b/src/karapace/offset_watcher.py @@ -25,7 +25,7 @@ def __init__(self) -> None: @inject def greatest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: - with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.greatest_offset)): + with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.greatest_offset)): return self._greatest_offset def offset_seen(self, new_offset: int) -> None: diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 3e1fcae25..20dfc109e 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -27,8 +27,8 @@ import uvicorn if __name__ == "__main__": - container = KarapaceContainer() - container.wire( + karapace_container = KarapaceContainer() + karapace_container.wire( modules=[ __name__, schema_registry.controller, @@ -36,7 +36,7 @@ ] ) - telemetry_container = TelemetryContainer() + telemetry_container = TelemetryContainer(karapace_container=karapace_container) telemetry_container.wire( modules=[ schema_registry.telemetry.setup, @@ -48,7 +48,7 @@ ) schema_registry_container = SchemaRegistryContainer( - karapace_container=container, telemetry_container=telemetry_container + karapace_container=karapace_container, telemetry_container=telemetry_container ) schema_registry_container.wire( modules=[ @@ -66,7 +66,6 @@ ] ) - app = create_karapace_application(config=container.config(), lifespan=karapace_schema_registry_lifespan) - uvicorn.run( - app, host=container.config().host, port=container.config().port, log_level=container.config().log_level.lower() - ) + config = karapace_container.config() + app = create_karapace_application(config=config, lifespan=karapace_schema_registry_lifespan) + uvicorn.run(app, host=config.host, port=config.port, log_level=config.log_level.lower()) diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index 4919a0271..6150a69ec 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -280,7 +280,7 @@ def run(self) -> None: @inject async def is_healthy(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: - with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.is_healthy)): + with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.is_healthy)): if ( self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS @@ -375,7 +375,7 @@ def _is_ready(self) -> bool: @inject def highest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: - with tracer.get_tracer().start_span(tracer.get_name_from_caller_with_class(self, self.highest_offset)): + with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.highest_offset)): return max(self._highest_offset, self._offset_watcher.greatest_offset()) @inject diff --git a/src/schema_registry/routers/health.py b/src/schema_registry/routers/health.py index c7365a555..f41fa13cb 100644 --- a/src/schema_registry/routers/health.py +++ b/src/schema_registry/routers/health.py @@ -56,7 +56,7 @@ async def health( schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), tracer: Tracer = Depends(Provide[SchemaRegistryContainer.telemetry_container.tracer]), ) -> HealthCheck: - with tracer.get_tracer().start_span("health_check_api_handler_GET") as health_check_span: + with tracer.get_tracer().start_as_current_span("APIRouter: health_check") as health_check_span: starttime = 0.0 health_check_span.add_event("Checking schema-reader is ready") diff --git a/src/schema_registry/telemetry/container.py b/src/schema_registry/telemetry/container.py index 3aad7ac11..d9d53ea2f 100644 --- a/src/schema_registry/telemetry/container.py +++ b/src/schema_registry/telemetry/container.py @@ -4,10 +4,28 @@ """ from dependency_injector import containers, providers +from karapace.config import Config +from karapace.container import KarapaceContainer +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.semconv.attributes import telemetry_attributes as T from schema_registry.telemetry.tracer import Tracer +def create_tracing_resource(config: Config) -> Resource: + return Resource.create( + { + "service.name": config.telemetry.resource_service_name, + "service.instance.id": config.telemetry.resource_service_instance_id, + T.TELEMETRY_SDK_NAME: config.telemetry.resource_telemetry_sdk_name, + T.TELEMETRY_SDK_LANGUAGE: config.telemetry.resource_telemetry_sdk_language, + T.TELEMETRY_SDK_VERSION: config.telemetry.resource_telemetry_sdk_version, + } + ) + + class TelemetryContainer(containers.DeclarativeContainer): - tracer_provider = providers.Singleton(TracerProvider) + karapace_container = providers.Container(KarapaceContainer) + tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config) + tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource) tracer = providers.Singleton(Tracer) diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py index e25a463da..d23fa0ec4 100644 --- a/src/schema_registry/telemetry/tracer.py +++ b/src/schema_registry/telemetry/tracer.py @@ -11,6 +11,12 @@ from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor +from opentelemetry.semconv.attributes import ( + client_attributes as C, + http_attributes as H, + server_attributes as S, + url_attributes as U, +) from opentelemetry.trace.span import Span from typing import Any @@ -47,22 +53,22 @@ def add_span_attribute(span: Span, key: str, value: str | int) -> None: @staticmethod def update_span_with_request(request: Request, span: Span) -> None: if span.is_recording(): - span.set_attribute("server.scheme", request.url.scheme) - span.set_attribute("server.hostname", request.url.hostname or "") - span.set_attribute("server.port", request.url.port or "") - span.set_attribute("server.is_secure", request.url.is_secure) - span.set_attribute("request.http.method", request.method) - span.set_attribute("request.http.path", request.url.path) - span.set_attribute("request.http.client.host", request.client.host or "" if request.client else "") - span.set_attribute("request.http.client.port", request.client.port or "" if request.client else "") - span.set_attribute("request.http.headers.connection", request.headers.get("connection", "")) - span.set_attribute("request.http.headers.user_agent", request.headers.get("user-agent", "")) - span.set_attribute("request.http.headers.content_type", request.headers.get("content-type", "")) + span.set_attribute(C.CLIENT_ADDRESS, request.client.host or "" if request.client else "") + span.set_attribute(C.CLIENT_PORT, request.client.port or "" if request.client else "") + span.set_attribute(S.SERVER_ADDRESS, request.url.hostname or "") + span.set_attribute(S.SERVER_PORT, request.url.port or "") + span.set_attribute(U.URL_SCHEME, request.url.scheme) + span.set_attribute(U.URL_PATH, request.url.path) + span.set_attribute(H.HTTP_REQUEST_METHOD, request.method) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.connection", request.headers.get("connection", "")) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.user_agent", request.headers.get("user-agent", "")) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.content_type", request.headers.get("content-type", "")) @staticmethod def update_span_with_response(response: Response, span: Span) -> None: if span.is_recording(): - span.set_attribute("response.http.status_code", response.status_code) - span.set_attribute("response.http.media_type", response.media_type or "") - span.set_attribute("response.http.headers.content_type", response.headers.get("content-type", "")) - span.set_attribute("response.http.headers.content_length", response.headers.get("content-length", "")) + span.set_attribute(H.HTTP_RESPONSE_STATUS_CODE, response.status_code) + span.set_attribute(f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_type", response.headers.get("content-type", "")) + span.set_attribute( + f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_length", response.headers.get("content-length", "") + ) diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py index ff11c31fc..b5860657e 100644 --- a/tests/unit/schema_registry/telemetry/test_tracer.py +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -5,11 +5,13 @@ See LICENSE for details """ +from fastapi import Request, Response from karapace.config import KarapaceTelemetry from karapace.container import KarapaceContainer from opentelemetry.sdk.trace.export import SpanProcessor +from opentelemetry.trace.span import Span from schema_registry.telemetry.tracer import Tracer -from unittest.mock import patch +from unittest.mock import call, MagicMock, patch def test_tracer(karapace_container: KarapaceContainer): @@ -55,3 +57,62 @@ def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceCo processor: SpanProcessor = Tracer.get_span_processor(config=karapace_container.config()) mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value) assert processor is mock_simple_span_processor.return_value + + +def test_add_span_attribute(): + span = MagicMock(spec=Span) + + # Test when span is not recording + span.is_recording.return_value = False + Tracer.add_span_attribute(span=span, key="key", value="value") + assert not span.set_attribute.called + + # Test when span is recording + span.is_recording.return_value = True + Tracer.add_span_attribute(span=span, key="key", value="value") + span.set_attribute.assert_called_once_with("key", "value") + + +def test_update_span_with_request(): + span = MagicMock(spec=Span) + span.is_recording.return_value = True + + request = MagicMock(spec=Request) + request.headers = {"content-type": "application/json", "connection": "keep-alive", "user-agent": "pytest"} + request.method = "GET" + request.url = MagicMock(port=8081, scheme="http", path="/test", hostname="server") + request.client = MagicMock(host="client", port=8080) + + Tracer.update_span_with_request(request=request, span=span) + span.set_attribute.assert_has_calls( + [ + call("client.address", "client"), + call("client.port", 8080), + call("server.address", "server"), + call("server.port", 8081), + call("url.scheme", "http"), + call("url.path", "/test"), + call("http.request.method", "GET"), + call("http.request.header.connection", "keep-alive"), + call("http.request.header.user_agent", "pytest"), + call("http.request.header.content_type", "application/json"), + ] + ) + + +def test_update_span_with_response(): + span = MagicMock(spec=Span) + + response = MagicMock(spec=Response) + response.status_code = 200 + response.headers = {"content-type": "application/json", "content-length": 8} + + span.is_recording.return_value = True + Tracer.update_span_with_response(response=response, span=span) + span.set_attribute.assert_has_calls( + [ + call("http.response.status_code", 200), + call("http.response.header.content_type", "application/json"), + call("http.response.header.content_length", 8), + ] + ) From 0d0923d0d8739a28f2f49e83551c1ee7813fdc5c Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 17 Dec 2024 12:15:32 +0100 Subject: [PATCH 6/9] tracer: trace also KafkaAdminClient.describe_topics() --- src/karapace/kafka/admin.py | 12 +++++++++++- src/schema_registry/__main__.py | 2 ++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/karapace/kafka/admin.py b/src/karapace/kafka/admin.py index fef52ebf5..0455a712c 100644 --- a/src/karapace/kafka/admin.py +++ b/src/karapace/kafka/admin.py @@ -7,7 +7,7 @@ from collections.abc import Container, Iterable from concurrent.futures import Future -from confluent_kafka import TopicPartition +from confluent_kafka import TopicCollection, TopicPartition from confluent_kafka.admin import ( AdminClient, BrokerMetadata, @@ -20,6 +20,7 @@ TopicMetadata, ) from confluent_kafka.error import KafkaException +from dependency_injector.wiring import inject, Provide from karapace.constants import TOPIC_CREATION_TIMEOUT_S from karapace.kafka.common import ( _KafkaConfigMixin, @@ -27,6 +28,8 @@ single_futmap_result, UnknownTopicOrPartitionError, ) +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer class KafkaAdminClient(_KafkaConfigMixin, AdminClient): @@ -175,3 +178,10 @@ def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]: except KafkaException as exc: raise_from_kafkaexception(exc) return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset} + + @inject + def describe_topics( + self, topics: TopicCollection, tracer: Tracer = Provide[TelemetryContainer.tracer] + ) -> dict[str, Future]: + with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.describe_topics)): + return super().describe_topics(topics) diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 20dfc109e..3da295b4d 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -8,6 +8,7 @@ from schema_registry.telemetry.container import TelemetryContainer import karapace.coordinator.master_coordinator +import karapace.kafka.admin import karapace.offset_watcher import schema_registry.controller import schema_registry.factory @@ -44,6 +45,7 @@ schema_registry.reader, karapace.offset_watcher, karapace.coordinator.master_coordinator, + karapace.kafka.admin, ] ) From d004ba61389ca5f90fc78d1a82c1d5dccc42c0fe Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 18 Dec 2024 14:54:46 +0100 Subject: [PATCH 7/9] tracing: inject into __init__.py rather than methods - this still relies on DI wiring, but avoids changing function signatures - API routes are injected and wired as normal --- .../coordinator/master_coordinator.py | 11 +++++---- src/karapace/kafka/admin.py | 12 ++++------ src/karapace/kafka/common.py | 6 +++++ src/karapace/offset_watcher.py | 11 +++++---- src/schema_registry/__main__.py | 4 ++-- src/schema_registry/reader.py | 24 ++++++++++++------- tests/conftest.py | 10 ++++++++ 7 files changed, 50 insertions(+), 28 deletions(-) diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index ada53c664..a06435d86 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -40,7 +40,8 @@ class MasterCoordinator: 5 milliseconds. """ - def __init__(self, config: Config) -> None: + @inject + def __init__(self, config: Config, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None: super().__init__() self._config: Final = config self._kafka_client: AIOKafkaClient | None = None @@ -49,6 +50,7 @@ def __init__(self, config: Config) -> None: self._thread: Thread = Thread(target=self._start_loop, daemon=True) self._loop: asyncio.AbstractEventLoop | None = None self._schema_reader_stopper: SchemaReaderStoppper | None = None + self.tracer = tracer def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None: self._schema_reader_stopper = schema_reader_stopper @@ -149,10 +151,9 @@ def init_schema_coordinator(self) -> SchemaCoordinator: schema_coordinator.start() return schema_coordinator - @inject - def get_coordinator_status(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> SchemaCoordinatorStatus: - with tracer.get_tracer().start_as_current_span( - tracer.get_name_from_caller_with_class(self, self.get_coordinator_status) + def get_coordinator_status(self) -> SchemaCoordinatorStatus: + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.get_coordinator_status) ): assert self._sc is not None generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID diff --git a/src/karapace/kafka/admin.py b/src/karapace/kafka/admin.py index 0455a712c..a058bedbc 100644 --- a/src/karapace/kafka/admin.py +++ b/src/karapace/kafka/admin.py @@ -20,7 +20,6 @@ TopicMetadata, ) from confluent_kafka.error import KafkaException -from dependency_injector.wiring import inject, Provide from karapace.constants import TOPIC_CREATION_TIMEOUT_S from karapace.kafka.common import ( _KafkaConfigMixin, @@ -28,8 +27,6 @@ single_futmap_result, UnknownTopicOrPartitionError, ) -from schema_registry.telemetry.container import TelemetryContainer -from schema_registry.telemetry.tracer import Tracer class KafkaAdminClient(_KafkaConfigMixin, AdminClient): @@ -179,9 +176,8 @@ def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]: raise_from_kafkaexception(exc) return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset} - @inject - def describe_topics( - self, topics: TopicCollection, tracer: Tracer = Provide[TelemetryContainer.tracer] - ) -> dict[str, Future]: - with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.describe_topics)): + def describe_topics(self, topics: TopicCollection) -> dict[str, Future]: + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.describe_topics) + ): return super().describe_topics(topics) diff --git a/src/karapace/kafka/common.py b/src/karapace/kafka/common.py index c541cbac3..972115667 100644 --- a/src/karapace/kafka/common.py +++ b/src/karapace/kafka/common.py @@ -17,6 +17,9 @@ from collections.abc import Callable, Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException +from dependency_injector.wiring import inject, Provide +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer from typing import Any, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack @@ -122,14 +125,17 @@ class _KafkaConfigMixin: extract configuration, initialization and connection verification. """ + @inject def __init__( self, bootstrap_servers: Iterable[str] | str, verify_connection: bool = True, + tracer: Tracer = Provide[TelemetryContainer.tracer], **params: Unpack[KafkaClientParams], ) -> None: self._errors: set[KafkaError] = set() self.log = logging.getLogger(f"{self.__module__}.{self.__class__.__qualname__}") + self.tracer = tracer super().__init__(self._get_config_from_params(bootstrap_servers, **params)) # type: ignore[call-arg] self._activate_callbacks() diff --git a/src/karapace/offset_watcher.py b/src/karapace/offset_watcher.py index 5c64cf12b..83bd32bbb 100644 --- a/src/karapace/offset_watcher.py +++ b/src/karapace/offset_watcher.py @@ -17,15 +17,18 @@ class OffsetWatcher: correct as long as no unclean leader election is performed. """ - def __init__(self) -> None: + @inject + def __init__(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None: # Condition used to protected _greatest_offset, any modifications to that object must # be performed with this condition acquired self._condition = Condition() self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. + self.tracer = tracer - @inject - def greatest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: - with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.greatest_offset)): + def greatest_offset(self) -> int: + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.greatest_offset) + ): return self._greatest_offset def offset_seen(self, new_offset: int) -> None: diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 3da295b4d..4a92d9649 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -8,7 +8,7 @@ from schema_registry.telemetry.container import TelemetryContainer import karapace.coordinator.master_coordinator -import karapace.kafka.admin +import karapace.kafka.common import karapace.offset_watcher import schema_registry.controller import schema_registry.factory @@ -45,7 +45,7 @@ schema_registry.reader, karapace.offset_watcher, karapace.coordinator.master_coordinator, - karapace.kafka.admin, + karapace.kafka.common, ] ) diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index 6150a69ec..d0d9e7a5e 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -133,6 +133,7 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: class KafkaSchemaReader(Thread, SchemaReaderStoppper): + @inject def __init__( self, config: Config, @@ -140,6 +141,7 @@ def __init__( key_formatter: KeyFormatter, database: KarapaceDatabase, master_coordinator: MasterCoordinator | None = None, + tracer: Tracer = Provide[TelemetryContainer.tracer], ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator @@ -154,6 +156,7 @@ def __init__( self._offset_watcher = offset_watcher self.stats = StatsClient(config=config) self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config) + self.tracer = tracer # Thread synchronization objects # - offset is used by the REST API to wait until this thread has @@ -278,9 +281,10 @@ def run(self) -> None: self.consecutive_unexpected_errors_start = time.monotonic() LOG.warning("Unexpected exception in schema reader loop - %s", e) - @inject - async def is_healthy(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: - with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.is_healthy)): + async def is_healthy(self) -> bool: + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.is_healthy) + ): if ( self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS @@ -373,14 +377,16 @@ def _is_ready(self) -> bool: LOG.info("Ready in %s seconds", time.monotonic() - self.start_time) return ready - @inject - def highest_offset(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> int: - with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.highest_offset)): + def highest_offset(self) -> int: + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.highest_offset) + ): return max(self._highest_offset, self._offset_watcher.greatest_offset()) - @inject - def ready(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> bool: - with tracer.get_tracer().start_as_current_span(tracer.get_name_from_caller_with_class(self, self.ready)) as span: + def ready(self) -> bool: + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.ready) + ) as span: span.add_event("Acquiring ready lock") with self._ready_lock: return self._ready diff --git a/tests/conftest.py b/tests/conftest.py index e48a2600a..f7724179a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,11 +10,16 @@ from tempfile import mkstemp import json +import karapace.coordinator.master_coordinator +import karapace.kafka.common +import karapace.offset_watcher import os import pytest import re import schema_registry.controller import schema_registry.reader +import schema_registry.telemetry.middleware +import schema_registry.telemetry.setup import schema_registry.telemetry.tracer pytest_plugins = "aiohttp.pytest_plugin" @@ -204,6 +209,11 @@ def fixture_telemetry_container() -> TelemetryContainer: telemetry_container.wire( modules=[ schema_registry.reader, + schema_registry.telemetry.setup, + schema_registry.telemetry.middleware, + karapace.offset_watcher, + karapace.coordinator.master_coordinator, + karapace.kafka.common, ] ) return telemetry_container From 474e914e495ade9866b38fe60234fae4eb64a100 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 18 Dec 2024 15:10:33 +0100 Subject: [PATCH 8/9] ci: extract version from git tags --- .github/workflows/container-smoke-test.yml | 8 +------- .github/workflows/lint.yml | 12 +++++------- .github/workflows/tests.yml | 11 ++++------- src/schema_registry/telemetry/tracer.py | 3 +-- 4 files changed, 11 insertions(+), 23 deletions(-) diff --git a/.github/workflows/container-smoke-test.yml b/.github/workflows/container-smoke-test.yml index 89d6d4a2d..a92bd387f 100644 --- a/.github/workflows/container-smoke-test.yml +++ b/.github/workflows/container-smoke-test.yml @@ -20,14 +20,8 @@ jobs: with: fetch-depth: 0 - - name: Install requirements - run: make install-dev - - name: Resolve Karapace version - run: | - source ./venv/bin/activate - KARAPACE_VERSION=$(python -c "from karapace import version; print(version.__version__)") - echo KARAPACE_VERSION=$KARAPACE_VERSION >> $GITHUB_ENV + run: echo KARAPACE_VERSION="$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g')" >> $GITHUB_ENV - run: echo "RUNNER_UID=$(id -u)" >> $GITHUB_ENV - run: echo "RUNNER_GID=$(id -g)" >> $GITHUB_ENV diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 1de08c44f..e830d6f2c 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -37,19 +37,17 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + # Need fetch-depth 0 to fetch tags, see https://github.com/actions/checkout/issues/701 + with: + fetch-depth: 0 + - uses: actions/setup-python@v5 with: cache: pip python-version: '3.12' - - name: Install requirements - run: make install-dev - - name: Resolve Karapace version - run: | - source ./venv/bin/activate - KARAPACE_VERSION=$(python -c "from karapace import version; print(version.__version__)") - echo KARAPACE_VERSION=$KARAPACE_VERSION >> $GITHUB_ENV + run: echo KARAPACE_VERSION="$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g')" >> $GITHUB_ENV - run: echo "RUNNER_UID=$(id -u)" >> $GITHUB_ENV - run: echo "RUNNER_GID=$(id -g)" >> $GITHUB_ENV diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index df5b5683b..016dff288 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -28,6 +28,9 @@ jobs: --showlocals steps: - uses: actions/checkout@v4 + # Need fetch-depth 0 to fetch tags, see https://github.com/actions/checkout/issues/701 + with: + fetch-depth: 0 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -40,14 +43,8 @@ jobs: with: go-version: '1.21.0' - - name: Install requirements - run: make install-dev - - name: Resolve Karapace version - run: | - source ./venv/bin/activate - KARAPACE_VERSION=$(python -c "from karapace import version; print(version.__version__)") - echo KARAPACE_VERSION=$KARAPACE_VERSION >> $GITHUB_ENV + run: echo KARAPACE_VERSION="$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g')" >> $GITHUB_ENV - run: echo "RUNNER_UID=$(id -u)" >> $GITHUB_ENV - run: echo "RUNNER_GID=$(id -g)" >> $GITHUB_ENV diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py index d23fa0ec4..e905e0edc 100644 --- a/src/schema_registry/telemetry/tracer.py +++ b/src/schema_registry/telemetry/tracer.py @@ -18,7 +18,6 @@ url_attributes as U, ) from opentelemetry.trace.span import Span -from typing import Any import inspect @@ -42,7 +41,7 @@ def get_name_from_caller() -> str: return inspect.stack()[1].function @staticmethod - def get_name_from_caller_with_class(function_class: object, function: Callable[[Any], Any]) -> str: + def get_name_from_caller_with_class(function_class: object, function: Callable) -> str: return f"{type(function_class).__name__}.{function.__name__}()" @staticmethod From b0e4b636bbb64077951979c9c6869a559156e201 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Thu, 19 Dec 2024 11:57:48 +0100 Subject: [PATCH 9/9] di: do not wire core karapace --- src/karapace/coordinator/master_coordinator.py | 7 ++----- src/karapace/kafka/admin.py | 11 +++++++++++ src/karapace/kafka/common.py | 6 ------ src/karapace/offset_watcher.py | 8 +++----- src/schema_registry/__main__.py | 8 -------- src/{karapace => schema_registry}/messaging.py | 0 src/schema_registry/reader.py | 6 +----- src/schema_registry/registry.py | 4 +++- tests/conftest.py | 8 -------- tests/unit/schema_registry/telemetry/test_tracer.py | 5 ++++- tests/unit/schema_registry/test_controller.py | 4 ++++ 11 files changed, 28 insertions(+), 39 deletions(-) rename src/{karapace => schema_registry}/messaging.py (100%) diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index a06435d86..9873a9bec 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -10,12 +10,10 @@ from aiokafka.errors import KafkaConnectionError from aiokafka.helpers import create_ssl_context from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest -from dependency_injector.wiring import inject, Provide from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import SchemaReaderStoppper -from schema_registry.telemetry.container import TelemetryContainer from schema_registry.telemetry.tracer import Tracer from threading import Thread from typing import Final @@ -40,8 +38,7 @@ class MasterCoordinator: 5 milliseconds. """ - @inject - def __init__(self, config: Config, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None: + def __init__(self, config: Config) -> None: super().__init__() self._config: Final = config self._kafka_client: AIOKafkaClient | None = None @@ -50,7 +47,7 @@ def __init__(self, config: Config, tracer: Tracer = Provide[TelemetryContainer.t self._thread: Thread = Thread(target=self._start_loop, daemon=True) self._loop: asyncio.AbstractEventLoop | None = None self._schema_reader_stopper: SchemaReaderStoppper | None = None - self.tracer = tracer + self.tracer = Tracer() def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None: self._schema_reader_stopper = schema_reader_stopper diff --git a/src/karapace/kafka/admin.py b/src/karapace/kafka/admin.py index a058bedbc..165dab1a1 100644 --- a/src/karapace/kafka/admin.py +++ b/src/karapace/kafka/admin.py @@ -23,13 +23,24 @@ from karapace.constants import TOPIC_CREATION_TIMEOUT_S from karapace.kafka.common import ( _KafkaConfigMixin, + KafkaClientParams, raise_from_kafkaexception, single_futmap_result, UnknownTopicOrPartitionError, ) +from schema_registry.telemetry.tracer import Tracer +from typing_extensions import Unpack class KafkaAdminClient(_KafkaConfigMixin, AdminClient): + def __init__( + self, + bootstrap_servers: Iterable[str] | str, + **params: Unpack[KafkaClientParams], + ) -> None: + self.tracer = Tracer() + super().__init__(bootstrap_servers, **params) + def new_topic( self, name: str, diff --git a/src/karapace/kafka/common.py b/src/karapace/kafka/common.py index 972115667..c541cbac3 100644 --- a/src/karapace/kafka/common.py +++ b/src/karapace/kafka/common.py @@ -17,9 +17,6 @@ from collections.abc import Callable, Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException -from dependency_injector.wiring import inject, Provide -from schema_registry.telemetry.container import TelemetryContainer -from schema_registry.telemetry.tracer import Tracer from typing import Any, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack @@ -125,17 +122,14 @@ class _KafkaConfigMixin: extract configuration, initialization and connection verification. """ - @inject def __init__( self, bootstrap_servers: Iterable[str] | str, verify_connection: bool = True, - tracer: Tracer = Provide[TelemetryContainer.tracer], **params: Unpack[KafkaClientParams], ) -> None: self._errors: set[KafkaError] = set() self.log = logging.getLogger(f"{self.__module__}.{self.__class__.__qualname__}") - self.tracer = tracer super().__init__(self._get_config_from_params(bootstrap_servers, **params)) # type: ignore[call-arg] self._activate_callbacks() diff --git a/src/karapace/offset_watcher.py b/src/karapace/offset_watcher.py index 83bd32bbb..3baad5f11 100644 --- a/src/karapace/offset_watcher.py +++ b/src/karapace/offset_watcher.py @@ -4,8 +4,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from dependency_injector.wiring import inject, Provide -from schema_registry.telemetry.container import TelemetryContainer + from schema_registry.telemetry.tracer import Tracer from threading import Condition @@ -17,13 +16,12 @@ class OffsetWatcher: correct as long as no unclean leader election is performed. """ - @inject - def __init__(self, tracer: Tracer = Provide[TelemetryContainer.tracer]) -> None: + def __init__(self) -> None: # Condition used to protected _greatest_offset, any modifications to that object must # be performed with this condition acquired self._condition = Condition() self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. - self.tracer = tracer + self.tracer = Tracer() def greatest_offset(self) -> int: with self.tracer.get_tracer().start_as_current_span( diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 4a92d9649..f8b5684c6 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -7,12 +7,8 @@ from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan from schema_registry.telemetry.container import TelemetryContainer -import karapace.coordinator.master_coordinator -import karapace.kafka.common -import karapace.offset_watcher import schema_registry.controller import schema_registry.factory -import schema_registry.reader import schema_registry.routers.compatibility import schema_registry.routers.config import schema_registry.routers.health @@ -42,10 +38,6 @@ modules=[ schema_registry.telemetry.setup, schema_registry.telemetry.middleware, - schema_registry.reader, - karapace.offset_watcher, - karapace.coordinator.master_coordinator, - karapace.kafka.common, ] ) diff --git a/src/karapace/messaging.py b/src/schema_registry/messaging.py similarity index 100% rename from src/karapace/messaging.py rename to src/schema_registry/messaging.py diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index d0d9e7a5e..5ddcfa100 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -24,7 +24,6 @@ from collections.abc import Mapping, Sequence from confluent_kafka import Message, TopicCollection, TopicPartition from contextlib import closing, ExitStack -from dependency_injector.wiring import inject, Provide from enum import Enum from jsonschema.validators import Draft7Validator from karapace import constants @@ -46,7 +45,6 @@ from karapace.statsd import StatsClient from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError, shutdown -from schema_registry.telemetry.container import TelemetryContainer from schema_registry.telemetry.tracer import Tracer from threading import Event, Lock, Thread from typing import Final @@ -133,7 +131,6 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: class KafkaSchemaReader(Thread, SchemaReaderStoppper): - @inject def __init__( self, config: Config, @@ -141,7 +138,6 @@ def __init__( key_formatter: KeyFormatter, database: KarapaceDatabase, master_coordinator: MasterCoordinator | None = None, - tracer: Tracer = Provide[TelemetryContainer.tracer], ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator @@ -156,7 +152,7 @@ def __init__( self._offset_watcher = offset_watcher self.stats = StatsClient(config=config) self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config) - self.tracer = tracer + self.tracer = Tracer() # Thread synchronization objects # - offset is used by the REST API to wait until this thread has diff --git a/src/schema_registry/registry.py b/src/schema_registry/registry.py index 9b694f768..7f1ccb0c8 100644 --- a/src/schema_registry/registry.py +++ b/src/schema_registry/registry.py @@ -26,12 +26,13 @@ ) from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import KeyFormatter -from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version +from schema_registry.messaging import KarapaceProducer from schema_registry.reader import KafkaSchemaReader +from schema_registry.telemetry.tracer import Tracer import asyncio import logging @@ -43,6 +44,7 @@ class KarapaceSchemaRegistry: def __init__(self, config: Config) -> None: # TODO: compatibility was previously in mutable dict, fix the runtime config to be distinct from static config. self.config = config + self.tracer = Tracer() self._key_formatter = KeyFormatter() offset_watcher = OffsetWatcher() diff --git a/tests/conftest.py b/tests/conftest.py index f7724179a..5b65c9405 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,14 +10,10 @@ from tempfile import mkstemp import json -import karapace.coordinator.master_coordinator -import karapace.kafka.common -import karapace.offset_watcher import os import pytest import re import schema_registry.controller -import schema_registry.reader import schema_registry.telemetry.middleware import schema_registry.telemetry.setup import schema_registry.telemetry.tracer @@ -208,12 +204,8 @@ def fixture_telemetry_container() -> TelemetryContainer: telemetry_container = TelemetryContainer() telemetry_container.wire( modules=[ - schema_registry.reader, schema_registry.telemetry.setup, schema_registry.telemetry.middleware, - karapace.offset_watcher, - karapace.coordinator.master_coordinator, - karapace.kafka.common, ] ) return telemetry_container diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py index b5860657e..f1edabde2 100644 --- a/tests/unit/schema_registry/telemetry/test_tracer.py +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -50,11 +50,14 @@ def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceConta def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} + ) with ( patch("schema_registry.telemetry.tracer.ConsoleSpanExporter") as mock_console_exporter, patch("schema_registry.telemetry.tracer.SimpleSpanProcessor") as mock_simple_span_processor, ): - processor: SpanProcessor = Tracer.get_span_processor(config=karapace_container.config()) + processor: SpanProcessor = Tracer.get_span_processor(config=config) mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value) assert processor is mock_simple_span_processor.return_value diff --git a/tests/unit/schema_registry/test_controller.py b/tests/unit/schema_registry/test_controller.py index 0b1ccbf89..4c4b0b16d 100644 --- a/tests/unit/schema_registry/test_controller.py +++ b/tests/unit/schema_registry/test_controller.py @@ -77,3 +77,7 @@ async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryC user=None, authorizer=None, ) + with pytest.raises(HTTPResponse): + # prevent `future exception was never retrieved` warning logs + # future: + await mock_forward_func_future