diff --git a/.github/workflows/container-smoke-test.yml b/.github/workflows/container-smoke-test.yml index 89d6d4a2d..a92bd387f 100644 --- a/.github/workflows/container-smoke-test.yml +++ b/.github/workflows/container-smoke-test.yml @@ -20,14 +20,8 @@ jobs: with: fetch-depth: 0 - - name: Install requirements - run: make install-dev - - name: Resolve Karapace version - run: | - source ./venv/bin/activate - KARAPACE_VERSION=$(python -c "from karapace import version; print(version.__version__)") - echo KARAPACE_VERSION=$KARAPACE_VERSION >> $GITHUB_ENV + run: echo KARAPACE_VERSION="$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g')" >> $GITHUB_ENV - run: echo "RUNNER_UID=$(id -u)" >> $GITHUB_ENV - run: echo "RUNNER_GID=$(id -g)" >> $GITHUB_ENV diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 1de08c44f..e830d6f2c 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -37,19 +37,17 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + # Need fetch-depth 0 to fetch tags, see https://github.com/actions/checkout/issues/701 + with: + fetch-depth: 0 + - uses: actions/setup-python@v5 with: cache: pip python-version: '3.12' - - name: Install requirements - run: make install-dev - - name: Resolve Karapace version - run: | - source ./venv/bin/activate - KARAPACE_VERSION=$(python -c "from karapace import version; print(version.__version__)") - echo KARAPACE_VERSION=$KARAPACE_VERSION >> $GITHUB_ENV + run: echo KARAPACE_VERSION="$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g')" >> $GITHUB_ENV - run: echo "RUNNER_UID=$(id -u)" >> $GITHUB_ENV - run: echo "RUNNER_GID=$(id -g)" >> $GITHUB_ENV diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index df5b5683b..016dff288 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -28,6 +28,9 @@ jobs: --showlocals steps: - uses: actions/checkout@v4 + # Need fetch-depth 0 to fetch tags, see https://github.com/actions/checkout/issues/701 + with: + fetch-depth: 0 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -40,14 +43,8 @@ jobs: with: go-version: '1.21.0' - - name: Install requirements - run: make install-dev - - name: Resolve Karapace version - run: | - source ./venv/bin/activate - KARAPACE_VERSION=$(python -c "from karapace import version; print(version.__version__)") - echo KARAPACE_VERSION=$KARAPACE_VERSION >> $GITHUB_ENV + run: echo KARAPACE_VERSION="$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g')" >> $GITHUB_ENV - run: echo "RUNNER_UID=$(id -u)" >> $GITHUB_ENV - run: echo "RUNNER_GID=$(id -g)" >> $GITHUB_ENV diff --git a/container/compose.yml b/container/compose.yml index 0407f38c1..c42d4b428 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -1,5 +1,3 @@ ---- -version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:latest @@ -67,6 +65,7 @@ services: - schema_registry depends_on: - kafka + - opentelemetry-collector ports: - 8081:8081 environment: @@ -85,6 +84,13 @@ services: KARAPACE_STATSD_PORT: 8125 KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true + KARAPACE_TAGS__APP: karapace-schema-registry + KARAPACE_TELEMETRY__OTEL_ENDPOINT_URL: http://opentelemetry-collector:4317 + KARAPACE_TELEMETRY__RESOURCE_SERVICE_NAME: karapace-schema-registry + KARAPACE_TELEMETRY__RESOURCE_SERVICE_INSTANCE_ID: sr1 + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_NAME: opentelemetry + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_LANGUAGE: python + KARAPACE_TELEMETRY__RESOURCE_TELEMETRY_SDK_VERSION: 1.27.0 karapace-rest-proxy: image: ghcr.io/aiven-open/karapace:develop @@ -133,17 +139,17 @@ services: - karapace-schema-registry - karapace-rest-proxy volumes: - - ../tests:/opt/karapace/tests - - ../pytest.ini:/opt/karapace/pytest.ini - - ../mypy.ini:/opt/karapace/mypy.ini - - ../.flake8:/opt/karapace/.flake8 - - ../.isort.cfg:/opt/karapace/.isort.cfg - - ../.pre-commit-config.yaml:/opt/karapace/.pre-commit-config.yaml - - ../.pylintrc:/opt/karapace/.pylintrc - - ../.coveragerc:/opt/karapace/.coveragerc - - ../.coverage.3.10:/opt/karapace/coverage/.coverage.3.10 - - ../.coverage.3.11:/opt/karapace/coverage/.coverage.3.11 - - ../.coverage.3.12:/opt/karapace/coverage/.coverage.3.12 + - ../tests:/opt/karapace/tests + - ../pytest.ini:/opt/karapace/pytest.ini + - ../mypy.ini:/opt/karapace/mypy.ini + - ../.flake8:/opt/karapace/.flake8 + - ../.isort.cfg:/opt/karapace/.isort.cfg + - ../.pre-commit-config.yaml:/opt/karapace/.pre-commit-config.yaml + - ../.pylintrc:/opt/karapace/.pylintrc + - ../.coveragerc:/opt/karapace/.coveragerc + - ../.coverage.3.10:/opt/karapace/coverage/.coverage.3.10 + - ../.coverage.3.11:/opt/karapace/coverage/.coverage.3.11 + - ../.coverage.3.12:/opt/karapace/coverage/.coverage.3.12 environment: - COVERAGE_FILE - COVERAGE_RCFILE=/opt/karapace/.coveragerc @@ -151,26 +157,47 @@ services: prometheus: image: prom/prometheus volumes: - - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml - - ./prometheus/rules.yml:/etc/prometheus/rules.yml + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + - ./prometheus/rules.yml:/etc/prometheus/rules.yml + depends_on: + - karapace-schema-registry + - karapace-rest-proxy + - opentelemetry-collector ports: - 9090:9090 grafana: - image: grafana/grafana - environment: - GF_SECURITY_ADMIN_USER: karapace - GF_SECURITY_ADMIN_PASSWORD: karapace - GF_PATHS_PROVISIONING: /grafana/provisioning - ports: - - 3000:3000 - volumes: - - ./grafana/dashboards:/grafana/dashboards - - ./grafana/provisioning:/grafana/provisioning + image: grafana/grafana + environment: + GF_SECURITY_ADMIN_USER: karapace + GF_SECURITY_ADMIN_PASSWORD: karapace + GF_PATHS_PROVISIONING: /grafana/provisioning + ports: + - 3000:3000 + volumes: + - ./grafana/dashboards:/grafana/dashboards + - ./grafana/provisioning:/grafana/provisioning statsd-exporter: - image: prom/statsd-exporter - command: "--statsd.listen-udp=:8125 --web.listen-address=:9102" - ports: - - 9102:9102 - - 8125:8125/udp + image: prom/statsd-exporter + command: --statsd.listen-udp=:8125 --web.listen-address=:9102 + ports: + - 9102:9102 + - 8125:8125/udp + + opentelemetry-collector: + image: otel/opentelemetry-collector-contrib:latest + command: --config=/etc/collector-config.yaml + volumes: + - ./opentelemetry/collector-config.yaml:/etc/collector-config.yaml + ports: # 4317=OTLP-gRPC-receiver | 8888=prom-collector-metrics | 8889=prom-exporter-metrics + - 4317:4317 + - 8888:8888 + - 8889:8889 + + jaeger: + image: jaegertracing/all-in-one:latest + ports: # 6831=agent | 16686=UI | 14268=spans | 4317=metrics (not exposing, clashes with opentelemetry-collector) + - 6831:6831/udp + - 16686:16686 + - 14268:14268 diff --git a/container/opentelemetry/collector-config.yaml b/container/opentelemetry/collector-config.yaml new file mode 100644 index 000000000..3fcd0df0c --- /dev/null +++ b/container/opentelemetry/collector-config.yaml @@ -0,0 +1,29 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: opentelemetry-collector:4317 + +processors: + +extensions: + health_check: {} + +exporters: + otlp: + endpoint: jaeger:4317 + tls: + insecure: true + otlphttp/prometheus: + endpoint: prometheus:9090/api/v1/otlp + tls: + insecure: true + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [otlp] + metrics: + receivers: [otlp] + exporters: [otlphttp/prometheus] diff --git a/container/prometheus/prometheus.yml b/container/prometheus/prometheus.yml index f62e8082a..68b64109f 100644 --- a/container/prometheus/prometheus.yml +++ b/container/prometheus/prometheus.yml @@ -27,3 +27,8 @@ scrape_configs: static_configs: - targets: - statsd-exporter:9102 + + - job_name: opentelemetry-collector + static_configs: + - targets: + - opentelemetry-collector:8888 diff --git a/mypy.ini b/mypy.ini index 5f94f8bc2..73b0e34c5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -15,7 +15,7 @@ warn_no_return = True warn_unreachable = True strict_equality = True -[mypy-schema_registry.schema_registry_apis] +[mypy-schema_registry.controller] ignore_errors = True [mypy-karapace.compatibility.jsonschema.checks] diff --git a/pyproject.toml b/pyproject.toml index cd9834ff1..729d0e829 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,9 +34,9 @@ dependencies = [ "zstandard", "prometheus-client == 0.20.0", "yarl == 1.12.1", - "opentelemetry-api == 1.28.2", - "opentelemetry-sdk == 1.28.2", - "opentelemetry-instrumentation-fastapi == 0.49b2", + "opentelemetry-api == 1.27.0", + "opentelemetry-sdk == 1.27.0", + "opentelemetry-exporter-otlp == 1.27.0", "dependency-injector == 4.43.0", # Patched dependencies diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 00c6d4d4c..d4b526cec 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -12,7 +12,7 @@ aiohttp==3.10.11 # via karapace (/karapace/pyproject.toml) aiokafka==0.10.0 # via karapace (/karapace/pyproject.toml) -aiosignal==1.3.1 +aiosignal==1.3.2 # via aiohttp annotated-types==0.7.0 # via pydantic @@ -21,13 +21,11 @@ anyio==4.7.0 # httpx # starlette # watchfiles -asgiref==3.8.1 - # via opentelemetry-instrumentation-asgi async-timeout==5.0.1 # via # aiohttp # aiokafka -attrs==24.2.0 +attrs==24.3.0 # via # aiohttp # hypothesis @@ -42,7 +40,7 @@ brotli==1.1.0 # via geventhttpclient cachetools==5.3.3 # via karapace (/karapace/pyproject.toml) -certifi==2024.8.30 +certifi==2024.12.14 # via # geventhttpclient # httpcore @@ -63,13 +61,15 @@ confluent-kafka==2.4.0 # via karapace (/karapace/pyproject.toml) coverage[toml]==7.6.9 # via pytest-cov -cramjam==2.9.0 +cramjam==2.9.1 # via python-snappy dependency-injector==4.43.0 # via karapace (/karapace/pyproject.toml) deprecated==1.2.15 # via # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-semantic-conventions dnspython==2.7.0 # via email-validator @@ -86,7 +86,7 @@ fancycompleter==0.9.1 # via pdbpp fastapi[standard]==0.115.5 # via karapace (/karapace/pyproject.toml) -fastapi-cli[standard]==0.0.6 +fastapi-cli[standard]==0.0.7 # via fastapi filelock==3.16.1 # via karapace (/karapace/pyproject.toml) @@ -109,8 +109,14 @@ gevent==24.11.1 # locust geventhttpclient==2.3.3 # via locust +googleapis-common-protos==1.66.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http greenlet==3.1.1 # via gevent +grpcio==1.68.1 + # via opentelemetry-exporter-otlp-proto-grpc h11==0.14.0 # via # httpcore @@ -130,7 +136,7 @@ idna==3.10 # httpx # requests # yarl -importlib-metadata==8.5.0 +importlib-metadata==8.4.0 # via opentelemetry-api iniconfig==2.0.0 # via pytest @@ -166,38 +172,38 @@ multidict==6.1.0 # yarl networkx==3.4.2 # via karapace (/karapace/pyproject.toml) -opentelemetry-api==1.28.2 +opentelemetry-api==1.27.0 # via # karapace (/karapace/pyproject.toml) - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-sdk # opentelemetry-semantic-conventions -opentelemetry-instrumentation==0.49b2 - # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-asgi==0.49b2 - # via opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-fastapi==0.49b2 - # via karapace (/karapace/pyproject.toml) -opentelemetry-sdk==1.28.2 +opentelemetry-exporter-otlp==1.27.0 # via karapace (/karapace/pyproject.toml) -opentelemetry-semantic-conventions==0.49b2 +opentelemetry-exporter-otlp-proto-common==1.27.0 # via - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi - # opentelemetry-sdk -opentelemetry-util-http==0.49b2 + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-grpc==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-exporter-otlp-proto-http==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-proto==1.27.0 + # via + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.27.0 # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # karapace (/karapace/pyproject.toml) + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.48b0 + # via opentelemetry-sdk packaging==24.2 # via # aiokafka - # opentelemetry-instrumentation # pytest pdbpp==0.10.3 # via karapace (/karapace/pyproject.toml) @@ -206,7 +212,10 @@ pluggy==1.5.0 prometheus-client==0.20.0 # via karapace (/karapace/pyproject.toml) protobuf==3.20.3 - # via karapace (/karapace/pyproject.toml) + # via + # googleapis-common-protos + # karapace (/karapace/pyproject.toml) + # opentelemetry-proto psutil==6.1.0 # via # karapace (/karapace/pyproject.toml) @@ -266,6 +275,7 @@ requests==2.32.3 # via # karapace (/karapace/pyproject.toml) # locust + # opentelemetry-exporter-otlp-proto-http rich==13.7.1 # via # karapace (/karapace/pyproject.toml) @@ -303,7 +313,6 @@ typer==0.15.1 typing-extensions==4.12.2 # via # anyio - # asgiref # fastapi # karapace (/karapace/pyproject.toml) # locust @@ -321,7 +330,7 @@ urllib3==2.2.3 # geventhttpclient # requests # sentry-sdk -uvicorn[standard]==0.32.1 +uvicorn[standard]==0.34.0 # via # fastapi # fastapi-cli @@ -341,9 +350,7 @@ werkzeug==3.1.3 wmctrl==0.5 # via pdbpp wrapt==1.17.0 - # via - # deprecated - # opentelemetry-instrumentation + # via deprecated xxhash==3.5.0 # via karapace (/karapace/pyproject.toml) yarl==1.12.1 diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index bcda10c84..288204908 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -12,7 +12,7 @@ aiohttp==3.10.11 # via karapace (/karapace/pyproject.toml) aiokafka==0.10.0 # via karapace (/karapace/pyproject.toml) -aiosignal==1.3.1 +aiosignal==1.3.2 # via aiohttp annotated-types==0.7.0 # via pydantic @@ -21,13 +21,11 @@ anyio==4.7.0 # httpx # starlette # watchfiles -asgiref==3.8.1 - # via opentelemetry-instrumentation-asgi async-timeout==5.0.1 # via # aiohttp # aiokafka -attrs==24.2.0 +attrs==24.3.0 # via # aiohttp # jsonschema @@ -36,11 +34,14 @@ avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c # via karapace (/karapace/pyproject.toml) cachetools==5.3.3 # via karapace (/karapace/pyproject.toml) -certifi==2024.8.30 +certifi==2024.12.14 # via # httpcore # httpx + # requests # sentry-sdk +charset-normalizer==3.4.0 + # via requests click==8.1.7 # via # rich-toolkit @@ -48,13 +49,15 @@ click==8.1.7 # uvicorn confluent-kafka==2.4.0 # via karapace (/karapace/pyproject.toml) -cramjam==2.9.0 +cramjam==2.9.1 # via python-snappy dependency-injector==4.43.0 # via karapace (/karapace/pyproject.toml) deprecated==1.2.15 # via # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-semantic-conventions dnspython==2.7.0 # via email-validator @@ -64,12 +67,18 @@ exceptiongroup==1.2.2 # via anyio fastapi[standard]==0.115.5 # via karapace (/karapace/pyproject.toml) -fastapi-cli[standard]==0.0.6 +fastapi-cli[standard]==0.0.7 # via fastapi frozenlist==1.5.0 # via # aiohttp # aiosignal +googleapis-common-protos==1.66.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +grpcio==1.68.1 + # via opentelemetry-exporter-otlp-proto-grpc h11==0.14.0 # via # httpcore @@ -85,8 +94,9 @@ idna==3.10 # anyio # email-validator # httpx + # requests # yarl -importlib-metadata==8.5.0 +importlib-metadata==8.4.0 # via opentelemetry-api isodate==0.7.2 # via karapace (/karapace/pyproject.toml) @@ -114,42 +124,44 @@ mypy-extensions==1.0.0 # via mypy networkx==3.4.2 # via karapace (/karapace/pyproject.toml) -opentelemetry-api==1.28.2 +opentelemetry-api==1.27.0 # via # karapace (/karapace/pyproject.toml) - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-sdk # opentelemetry-semantic-conventions -opentelemetry-instrumentation==0.49b2 - # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-asgi==0.49b2 - # via opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-fastapi==0.49b2 +opentelemetry-exporter-otlp==1.27.0 # via karapace (/karapace/pyproject.toml) -opentelemetry-sdk==1.28.2 - # via karapace (/karapace/pyproject.toml) -opentelemetry-semantic-conventions==0.49b2 +opentelemetry-exporter-otlp-proto-common==1.27.0 # via - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi - # opentelemetry-sdk -opentelemetry-util-http==0.49b2 + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-grpc==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-exporter-otlp-proto-http==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-proto==1.27.0 # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -packaging==24.2 + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.27.0 # via - # aiokafka - # opentelemetry-instrumentation + # karapace (/karapace/pyproject.toml) + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.48b0 + # via opentelemetry-sdk +packaging==24.2 + # via aiokafka prometheus-client==0.20.0 # via karapace (/karapace/pyproject.toml) protobuf==3.20.3 - # via karapace (/karapace/pyproject.toml) + # via + # googleapis-common-protos + # karapace (/karapace/pyproject.toml) + # opentelemetry-proto pydantic==2.10.2 # via # fastapi @@ -180,6 +192,8 @@ referencing==0.35.1 # jsonschema # jsonschema-specifications # types-jsonschema +requests==2.32.3 + # via opentelemetry-exporter-otlp-proto-http rich==13.7.1 # via # karapace (/karapace/pyproject.toml) @@ -218,7 +232,6 @@ types-protobuf==3.20.4.6 typing-extensions==4.12.2 # via # anyio - # asgiref # fastapi # karapace (/karapace/pyproject.toml) # multidict @@ -232,8 +245,10 @@ typing-extensions==4.12.2 ujson==5.10.0 # via karapace (/karapace/pyproject.toml) urllib3==2.2.3 - # via sentry-sdk -uvicorn[standard]==0.32.1 + # via + # requests + # sentry-sdk +uvicorn[standard]==0.34.0 # via # fastapi # fastapi-cli @@ -246,9 +261,7 @@ watchfiles==0.24.0 websockets==14.1 # via uvicorn wrapt==1.17.0 - # via - # deprecated - # opentelemetry-instrumentation + # via deprecated xxhash==3.5.0 # via karapace (/karapace/pyproject.toml) yarl==1.12.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index edf3064a4..2b11e8010 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,7 +12,7 @@ aiohttp==3.10.11 # via karapace (/karapace/pyproject.toml) aiokafka==0.10.0 # via karapace (/karapace/pyproject.toml) -aiosignal==1.3.1 +aiosignal==1.3.2 # via aiohttp annotated-types==0.7.0 # via pydantic @@ -21,13 +21,11 @@ anyio==4.7.0 # httpx # starlette # watchfiles -asgiref==3.8.1 - # via opentelemetry-instrumentation-asgi async-timeout==5.0.1 # via # aiohttp # aiokafka -attrs==24.2.0 +attrs==24.3.0 # via # aiohttp # jsonschema @@ -36,10 +34,13 @@ avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c # via karapace (/karapace/pyproject.toml) cachetools==5.3.3 # via karapace (/karapace/pyproject.toml) -certifi==2024.8.30 +certifi==2024.12.14 # via # httpcore # httpx + # requests +charset-normalizer==3.4.0 + # via requests click==8.1.7 # via # rich-toolkit @@ -47,13 +48,15 @@ click==8.1.7 # uvicorn confluent-kafka==2.4.0 # via karapace (/karapace/pyproject.toml) -cramjam==2.9.0 +cramjam==2.9.1 # via python-snappy dependency-injector==4.43.0 # via karapace (/karapace/pyproject.toml) deprecated==1.2.15 # via # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-semantic-conventions dnspython==2.7.0 # via email-validator @@ -63,12 +66,18 @@ exceptiongroup==1.2.2 # via anyio fastapi[standard]==0.115.5 # via karapace (/karapace/pyproject.toml) -fastapi-cli[standard]==0.0.6 +fastapi-cli[standard]==0.0.7 # via fastapi frozenlist==1.5.0 # via # aiohttp # aiosignal +googleapis-common-protos==1.66.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +grpcio==1.68.1 + # via opentelemetry-exporter-otlp-proto-grpc h11==0.14.0 # via # httpcore @@ -84,8 +93,9 @@ idna==3.10 # anyio # email-validator # httpx + # requests # yarl -importlib-metadata==8.5.0 +importlib-metadata==8.4.0 # via opentelemetry-api isodate==0.7.2 # via karapace (/karapace/pyproject.toml) @@ -109,42 +119,44 @@ multidict==6.1.0 # yarl networkx==3.4.2 # via karapace (/karapace/pyproject.toml) -opentelemetry-api==1.28.2 +opentelemetry-api==1.27.0 # via # karapace (/karapace/pyproject.toml) - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http # opentelemetry-sdk # opentelemetry-semantic-conventions -opentelemetry-instrumentation==0.49b2 - # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-asgi==0.49b2 - # via opentelemetry-instrumentation-fastapi -opentelemetry-instrumentation-fastapi==0.49b2 - # via karapace (/karapace/pyproject.toml) -opentelemetry-sdk==1.28.2 +opentelemetry-exporter-otlp==1.27.0 # via karapace (/karapace/pyproject.toml) -opentelemetry-semantic-conventions==0.49b2 +opentelemetry-exporter-otlp-proto-common==1.27.0 # via - # opentelemetry-instrumentation - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi - # opentelemetry-sdk -opentelemetry-util-http==0.49b2 + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-grpc==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-exporter-otlp-proto-http==1.27.0 + # via opentelemetry-exporter-otlp +opentelemetry-proto==1.27.0 # via - # opentelemetry-instrumentation-asgi - # opentelemetry-instrumentation-fastapi -packaging==24.2 + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.27.0 # via - # aiokafka - # opentelemetry-instrumentation + # karapace (/karapace/pyproject.toml) + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.48b0 + # via opentelemetry-sdk +packaging==24.2 + # via aiokafka prometheus-client==0.20.0 # via karapace (/karapace/pyproject.toml) protobuf==3.20.3 - # via karapace (/karapace/pyproject.toml) + # via + # googleapis-common-protos + # karapace (/karapace/pyproject.toml) + # opentelemetry-proto pydantic==2.10.2 # via # fastapi @@ -174,6 +186,8 @@ referencing==0.35.1 # via # jsonschema # jsonschema-specifications +requests==2.32.3 + # via opentelemetry-exporter-otlp-proto-http rich==13.7.1 # via # karapace (/karapace/pyproject.toml) @@ -202,7 +216,6 @@ typer==0.15.1 typing-extensions==4.12.2 # via # anyio - # asgiref # fastapi # karapace (/karapace/pyproject.toml) # multidict @@ -214,7 +227,9 @@ typing-extensions==4.12.2 # uvicorn ujson==5.10.0 # via karapace (/karapace/pyproject.toml) -uvicorn[standard]==0.32.1 +urllib3==2.2.3 + # via requests +uvicorn[standard]==0.34.0 # via # fastapi # fastapi-cli @@ -227,9 +242,7 @@ watchfiles==0.24.0 websockets==14.1 # via uvicorn wrapt==1.17.0 - # via - # deprecated - # opentelemetry-instrumentation + # via deprecated xxhash==3.5.0 # via karapace (/karapace/pyproject.toml) yarl==1.12.1 diff --git a/src/karapace/config.py b/src/karapace/config.py index 332363d46..0dd811d92 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -27,8 +27,17 @@ class KarapaceTags(BaseModel): app: str = "Karapace" +class KarapaceTelemetry(BaseModel): + otel_endpoint_url: str | None = None + resource_service_name: str = "karapace" + resource_service_instance_id: str = "karapace" + resource_telemetry_sdk_name: str = "opentelemetry" + resource_telemetry_sdk_language: str = "python" + resource_telemetry_sdk_version: str = "1.27.0" + + class Config(BaseSettings): - model_config = SettingsConfigDict(env_prefix="karapace_", env_ignore_empty=True) + model_config = SettingsConfigDict(env_prefix="karapace_", env_ignore_empty=True, env_nested_delimiter="__") access_logs_debug: bool = False access_log_class: ImportString = "karapace.utils.DebugAccessLogger" @@ -101,6 +110,7 @@ class Config(BaseSettings): sentry: Mapping[str, object] | None = None tags: KarapaceTags = KarapaceTags() + telemetry: KarapaceTelemetry = KarapaceTelemetry() # add rest uri if not set # f"{new_config['advertised_protocol']}://{new_config['advertised_hostname']}:{new_config['advertised_port']}" diff --git a/src/karapace/container.py b/src/karapace/container.py index 951956bf2..64dc6e285 100644 --- a/src/karapace/container.py +++ b/src/karapace/container.py @@ -8,7 +8,6 @@ from karapace.config import Config from karapace.forward_client import ForwardClient from karapace.instrumentation.prometheus import PrometheusInstrumentation -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.statsd import StatsClient @@ -21,8 +20,6 @@ class KarapaceContainer(containers.DeclarativeContainer): http_authorizer = providers.Singleton(HTTPAuthorizer, auth_file=config().registry_authfile) - schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=config) - forward_client = providers.Singleton(ForwardClient) authorizer = providers.Factory( diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index 9b4f8181c..9873a9bec 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -14,6 +14,7 @@ from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import SchemaReaderStoppper +from schema_registry.telemetry.tracer import Tracer from threading import Thread from typing import Final @@ -46,6 +47,7 @@ def __init__(self, config: Config) -> None: self._thread: Thread = Thread(target=self._start_loop, daemon=True) self._loop: asyncio.AbstractEventLoop | None = None self._schema_reader_stopper: SchemaReaderStoppper | None = None + self.tracer = Tracer() def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None: self._schema_reader_stopper = schema_reader_stopper @@ -147,15 +149,18 @@ def init_schema_coordinator(self) -> SchemaCoordinator: return schema_coordinator def get_coordinator_status(self) -> SchemaCoordinatorStatus: - assert self._sc is not None - generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID - return SchemaCoordinatorStatus( - is_primary=self._sc.are_we_master() if self._sc is not None else None, - is_primary_eligible=self._config.master_eligibility, - primary_url=self._sc.master_url if self._sc is not None else None, - is_running=True, - group_generation_id=generation if generation is not None else -1, - ) + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.get_coordinator_status) + ): + assert self._sc is not None + generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID + return SchemaCoordinatorStatus( + is_primary=self._sc.are_we_master() if self._sc is not None else None, + is_primary_eligible=self._config.master_eligibility, + primary_url=self._sc.master_url if self._sc is not None else None, + is_running=True, + group_generation_id=generation if generation is not None else -1, + ) def get_master_info(self) -> tuple[bool | None, str | None]: """Return whether we're the master, and the actual master url that can be used if we're not""" diff --git a/src/karapace/kafka/admin.py b/src/karapace/kafka/admin.py index fef52ebf5..165dab1a1 100644 --- a/src/karapace/kafka/admin.py +++ b/src/karapace/kafka/admin.py @@ -7,7 +7,7 @@ from collections.abc import Container, Iterable from concurrent.futures import Future -from confluent_kafka import TopicPartition +from confluent_kafka import TopicCollection, TopicPartition from confluent_kafka.admin import ( AdminClient, BrokerMetadata, @@ -23,13 +23,24 @@ from karapace.constants import TOPIC_CREATION_TIMEOUT_S from karapace.kafka.common import ( _KafkaConfigMixin, + KafkaClientParams, raise_from_kafkaexception, single_futmap_result, UnknownTopicOrPartitionError, ) +from schema_registry.telemetry.tracer import Tracer +from typing_extensions import Unpack class KafkaAdminClient(_KafkaConfigMixin, AdminClient): + def __init__( + self, + bootstrap_servers: Iterable[str] | str, + **params: Unpack[KafkaClientParams], + ) -> None: + self.tracer = Tracer() + super().__init__(bootstrap_servers, **params) + def new_topic( self, name: str, @@ -175,3 +186,9 @@ def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]: except KafkaException as exc: raise_from_kafkaexception(exc) return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset} + + def describe_topics(self, topics: TopicCollection) -> dict[str, Future]: + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.describe_topics) + ): + return super().describe_topics(topics) diff --git a/src/karapace/offset_watcher.py b/src/karapace/offset_watcher.py index 6056d5f37..3baad5f11 100644 --- a/src/karapace/offset_watcher.py +++ b/src/karapace/offset_watcher.py @@ -4,6 +4,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ + +from schema_registry.telemetry.tracer import Tracer from threading import Condition @@ -19,9 +21,13 @@ def __init__(self) -> None: # be performed with this condition acquired self._condition = Condition() self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever. + self.tracer = Tracer() def greatest_offset(self) -> int: - return self._greatest_offset + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.greatest_offset) + ): + return self._greatest_offset def offset_seen(self, new_offset: int) -> None: with self._condition: diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 7ff513584..f8b5684c6 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -5,7 +5,9 @@ from karapace.container import KarapaceContainer from schema_registry.container import SchemaRegistryContainer from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan +from schema_registry.telemetry.container import TelemetryContainer +import schema_registry.controller import schema_registry.factory import schema_registry.routers.compatibility import schema_registry.routers.config @@ -15,20 +17,33 @@ import schema_registry.routers.mode import schema_registry.routers.schemas import schema_registry.routers.subjects -import schema_registry.schema_registry_apis +import schema_registry.telemetry.middleware +import schema_registry.telemetry.setup +import schema_registry.telemetry.tracer import schema_registry.user import uvicorn if __name__ == "__main__": - container = KarapaceContainer() - container.wire( + karapace_container = KarapaceContainer() + karapace_container.wire( modules=[ __name__, - schema_registry.schema_registry_apis, + schema_registry.controller, + schema_registry.telemetry.tracer, ] ) - schema_registry_container = SchemaRegistryContainer(karapace_container=container) + telemetry_container = TelemetryContainer(karapace_container=karapace_container) + telemetry_container.wire( + modules=[ + schema_registry.telemetry.setup, + schema_registry.telemetry.middleware, + ] + ) + + schema_registry_container = SchemaRegistryContainer( + karapace_container=karapace_container, telemetry_container=telemetry_container + ) schema_registry_container.wire( modules=[ __name__, @@ -45,7 +60,6 @@ ] ) - app = create_karapace_application(config=container.config(), lifespan=karapace_schema_registry_lifespan) - uvicorn.run( - app, host=container.config().host, port=container.config().port, log_level=container.config().log_level.lower() - ) + config = karapace_container.config() + app = create_karapace_application(config=config, lifespan=karapace_schema_registry_lifespan) + uvicorn.run(app, host=config.host, port=config.port, log_level=config.log_level.lower()) diff --git a/src/schema_registry/container.py b/src/schema_registry/container.py index b93bc4139..21bb53c13 100644 --- a/src/schema_registry/container.py +++ b/src/schema_registry/container.py @@ -5,14 +5,20 @@ from dependency_injector import containers, providers from karapace.container import KarapaceContainer -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry +from schema_registry.telemetry.container import TelemetryContainer class SchemaRegistryContainer(containers.DeclarativeContainer): karapace_container = providers.Container(KarapaceContainer) + telemetry_container = providers.Container(TelemetryContainer) + + schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=karapace_container.config) + schema_registry_controller = providers.Singleton( KarapaceSchemaRegistryController, config=karapace_container.config, - schema_registry=karapace_container.schema_registry, + schema_registry=schema_registry, stats=karapace_container.statsd, ) diff --git a/src/schema_registry/schema_registry_apis.py b/src/schema_registry/controller.py similarity index 99% rename from src/schema_registry/schema_registry_apis.py rename to src/schema_registry/controller.py index 56cd567b1..94c7d6540 100644 --- a/src/schema_registry/schema_registry_apis.py +++ b/src/schema_registry/controller.py @@ -33,10 +33,10 @@ from karapace.protobuf.exception import ProtobufUnresolvedDependencyException from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.statsd import StatsClient from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version from karapace.utils import JSONDecodeError +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.errors import no_primary_url_error, SchemaErrorCodes, SchemaErrorMessages from schema_registry.routers.requests import ( CompatibilityCheckResponse, diff --git a/src/schema_registry/factory.py b/src/schema_registry/factory.py index 4ef678841..b02c131c6 100644 --- a/src/schema_registry/factory.py +++ b/src/schema_registry/factory.py @@ -11,13 +11,13 @@ from karapace.config import Config from karapace.forward_client import ForwardClient from karapace.logging_setup import configure_logging, log_config_without_secrets -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.statsd import StatsClient -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from schema_registry.container import SchemaRegistryContainer from schema_registry.http_handlers import setup_exception_handlers from schema_registry.middlewares import setup_middlewares +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.setup import setup_routers +from schema_registry.telemetry.setup import setup_tracing from typing import AsyncContextManager import logging @@ -29,7 +29,7 @@ async def karapace_schema_registry_lifespan( _: FastAPI, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), stastd: StatsClient = Depends(Provide[SchemaRegistryContainer.karapace_container.statsd]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), ) -> AsyncGenerator[None, None]: try: @@ -57,10 +57,10 @@ def create_karapace_application( logging.info("Starting Karapace Schema Registry (%s)", karapace_version.__version__) app = FastAPI(lifespan=lifespan) # type: ignore[arg-type] + + setup_tracing() setup_routers(app=app) setup_exception_handlers(app=app) setup_middlewares(app=app) - FastAPIInstrumentor.instrument_app(app) - return app diff --git a/src/karapace/messaging.py b/src/schema_registry/messaging.py similarity index 100% rename from src/karapace/messaging.py rename to src/schema_registry/messaging.py diff --git a/src/schema_registry/middlewares/__init__.py b/src/schema_registry/middlewares/__init__.py index 8df42b04c..d86fbf40e 100644 --- a/src/schema_registry/middlewares/__init__.py +++ b/src/schema_registry/middlewares/__init__.py @@ -7,6 +7,7 @@ from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse from karapace.content_type import check_schema_headers +from schema_registry.telemetry.middleware import setup_telemetry_middleware def setup_middlewares(app: FastAPI) -> None: @@ -32,3 +33,5 @@ async def set_content_types(request: Request, call_next: Callable[[Request], Awa response = await call_next(request) response.headers["Content-Type"] = response_content_type return response + + setup_telemetry_middleware(app=app) diff --git a/src/karapace/schema_reader.py b/src/schema_registry/reader.py similarity index 94% rename from src/karapace/schema_reader.py rename to src/schema_registry/reader.py index bb3bea067..5ddcfa100 100644 --- a/src/karapace/schema_reader.py +++ b/src/schema_registry/reader.py @@ -45,6 +45,7 @@ from karapace.statsd import StatsClient from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError, shutdown +from schema_registry.telemetry.tracer import Tracer from threading import Event, Lock, Thread from typing import Final @@ -151,6 +152,7 @@ def __init__( self._offset_watcher = offset_watcher self.stats = StatsClient(config=config) self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config) + self.tracer = Tracer() # Thread synchronization objects # - offset is used by the REST API to wait until this thread has @@ -276,28 +278,33 @@ def run(self) -> None: LOG.warning("Unexpected exception in schema reader loop - %s", e) async def is_healthy(self) -> bool: - if ( - self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS - and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.is_healthy) ): - LOG.warning( - "Health check failed with %s consecutive errors in %s seconds", self.consecutive_unexpected_errors, duration - ) - return False - - try: - # Explicitly check if topic exists. - # This needs to be done because in case of missing topic the consumer will not repeat the error - # on conscutive consume calls and instead will return empty list. - assert self.admin_client is not None - topic = self.config.topic_name - res = self.admin_client.describe_topics(TopicCollection([topic])) - await asyncio.wrap_future(res[topic]) - except Exception as e: # pylint: disable=broad-except - LOG.warning("Health check failed with %r", e) - return False + if ( + self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS + and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS + ): + LOG.warning( + "Health check failed with %s consecutive errors in %s seconds", + self.consecutive_unexpected_errors, + duration, + ) + return False - return True + try: + # Explicitly check if topic exists. + # This needs to be done because in case of missing topic the consumer will not repeat the error + # on conscutive consume calls and instead will return empty list. + assert self.admin_client is not None + topic = self.config.topic_name + res = self.admin_client.describe_topics(TopicCollection([topic])) + await asyncio.wrap_future(res[topic]) + except Exception as e: # pylint: disable=broad-except + LOG.warning("Health check failed with %r", e) + return False + + return True def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" @@ -367,11 +374,18 @@ def _is_ready(self) -> bool: return ready def highest_offset(self) -> int: - return max(self._highest_offset, self._offset_watcher.greatest_offset()) + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.highest_offset) + ): + return max(self._highest_offset, self._offset_watcher.greatest_offset()) def ready(self) -> bool: - with self._ready_lock: - return self._ready + with self.tracer.get_tracer().start_as_current_span( + self.tracer.get_name_from_caller_with_class(self, self.ready) + ) as span: + span.add_event("Acquiring ready lock") + with self._ready_lock: + return self._ready def set_not_ready(self) -> None: with self._ready_lock: diff --git a/src/karapace/schema_registry.py b/src/schema_registry/registry.py similarity index 99% rename from src/karapace/schema_registry.py rename to src/schema_registry/registry.py index a93bfa0ce..7f1ccb0c8 100644 --- a/src/karapace/schema_registry.py +++ b/src/schema_registry/registry.py @@ -26,12 +26,13 @@ ) from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import KeyFormatter -from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner -from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version +from schema_registry.messaging import KarapaceProducer +from schema_registry.reader import KafkaSchemaReader +from schema_registry.telemetry.tracer import Tracer import asyncio import logging @@ -43,6 +44,7 @@ class KarapaceSchemaRegistry: def __init__(self, config: Config) -> None: # TODO: compatibility was previously in mutable dict, fix the runtime config to be distinct from static config. self.config = config + self.tracer = Tracer() self._key_formatter = KeyFormatter() offset_watcher = OffsetWatcher() diff --git a/src/schema_registry/routers/compatibility.py b/src/schema_registry/routers/compatibility.py index 0e91e3625..109df4e95 100644 --- a/src/schema_registry/routers/compatibility.py +++ b/src/schema_registry/routers/compatibility.py @@ -8,9 +8,9 @@ from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController from schema_registry.routers.errors import unauthorized from schema_registry.routers.requests import CompatibilityCheckResponse, SchemaRequest -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated diff --git a/src/schema_registry/routers/config.py b/src/schema_registry/routers/config.py index 1c95ac046..3d1884af6 100644 --- a/src/schema_registry/routers/config.py +++ b/src/schema_registry/routers/config.py @@ -7,12 +7,12 @@ from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.forward_client import ForwardClient -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated @@ -42,7 +42,7 @@ async def config_put( request: Request, compatibility_level_request: CompatibilityRequest, user: Annotated[User, Depends(get_current_user)], - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), @@ -82,7 +82,7 @@ async def config_set_subject( subject: Subject, compatibility_level_request: CompatibilityRequest, user: Annotated[User, Depends(get_current_user)], - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), @@ -106,7 +106,7 @@ async def config_delete_subject( request: Request, subject: Subject, user: Annotated[User, Depends(get_current_user)], - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), diff --git a/src/schema_registry/routers/health.py b/src/schema_registry/routers/health.py index b02d2f760..f41fa13cb 100644 --- a/src/schema_registry/routers/health.py +++ b/src/schema_registry/routers/health.py @@ -5,9 +5,12 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, HTTPException, status -from karapace.schema_registry import KarapaceSchemaRegistry +from opentelemetry.trace import Span +from opentelemetry.trace.status import StatusCode from pydantic import BaseModel from schema_registry.container import SchemaRegistryContainer +from schema_registry.registry import KarapaceSchemaRegistry +from schema_registry.telemetry.tracer import Tracer class HealthStatus(BaseModel): @@ -34,34 +37,57 @@ class HealthCheck(BaseModel): ) +def set_health_status_tracing_attributes(health_check_span: Span, health_status: HealthStatus) -> None: + health_check_span.add_event("Setting health status tracing attributes") + health_check_span.set_attribute("schema_registry_ready", health_status.schema_registry_ready) + health_check_span.set_attribute("schema_registry_startup_time_sec", health_status.schema_registry_startup_time_sec) + health_check_span.set_attribute( + "schema_registry_reader_current_offset", health_status.schema_registry_reader_current_offset + ) + health_check_span.set_attribute( + "schema_registry_reader_highest_offset", health_status.schema_registry_reader_highest_offset + ) + health_check_span.set_attribute("schema_registry_is_primary", getattr(health_status, "schema_registry_is_primary", "")) + + @health_router.get("") @inject async def health( - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), + tracer: Tracer = Depends(Provide[SchemaRegistryContainer.telemetry_container.tracer]), ) -> HealthCheck: - starttime = 0.0 - if schema_registry.schema_reader.ready(): - starttime = schema_registry.schema_reader.last_check - schema_registry.schema_reader.start_time - - cs = schema_registry.mc.get_coordinator_status() - - health_status = HealthStatus( - schema_registry_ready=schema_registry.schema_reader.ready(), - schema_registry_startup_time_sec=starttime, - schema_registry_reader_current_offset=schema_registry.schema_reader.offset, - schema_registry_reader_highest_offset=schema_registry.schema_reader.highest_offset(), - schema_registry_is_primary=cs.is_primary, - schema_registry_is_primary_eligible=cs.is_primary_eligible, - schema_registry_primary_url=cs.primary_url, - schema_registry_coordinator_running=cs.is_running, - schema_registry_coordinator_generation_id=cs.group_generation_id, - ) - # if self._auth is not None: - # resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified + with tracer.get_tracer().start_as_current_span("APIRouter: health_check") as health_check_span: + starttime = 0.0 - if not await schema_registry.schema_reader.is_healthy(): - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + health_check_span.add_event("Checking schema-reader is ready") + schema_reader_is_ready = schema_registry.schema_reader.ready() + if schema_reader_is_ready: + starttime = schema_registry.schema_reader.last_check - schema_registry.schema_reader.start_time + + health_check_span.add_event("Getting schema-registry master-coordinator status") + cs = schema_registry.mc.get_coordinator_status() + + health_check_span.add_event("Building health status response model") + health_status = HealthStatus( + schema_registry_ready=schema_reader_is_ready, + schema_registry_startup_time_sec=starttime, + schema_registry_reader_current_offset=schema_registry.schema_reader.offset, + schema_registry_reader_highest_offset=schema_registry.schema_reader.highest_offset(), + schema_registry_is_primary=cs.is_primary, + schema_registry_is_primary_eligible=cs.is_primary_eligible, + schema_registry_primary_url=cs.primary_url, + schema_registry_coordinator_running=cs.is_running, + schema_registry_coordinator_generation_id=cs.group_generation_id, ) + set_health_status_tracing_attributes(health_check_span=health_check_span, health_status=health_status) + + # if self._auth is not None: + # resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified + + if not await schema_registry.schema_reader.is_healthy(): + health_check_span.add_event("Erroring because schema-reader is not healthy") + health_check_span.set_status(status=StatusCode.ERROR, description="Schema reader is not healthy") + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) - return HealthCheck(status=health_status, healthy=True) + health_check_span.add_event("Returning health check response") + return HealthCheck(status=health_status, healthy=True) diff --git a/src/schema_registry/routers/master_availability.py b/src/schema_registry/routers/master_availability.py index a3783575a..02d072afd 100644 --- a/src/schema_registry/routers/master_availability.py +++ b/src/schema_registry/routers/master_availability.py @@ -7,9 +7,9 @@ from fastapi import APIRouter, Depends, Request, Response from karapace.config import Config from karapace.forward_client import ForwardClient -from karapace.schema_registry import KarapaceSchemaRegistry from pydantic import BaseModel from schema_registry.container import SchemaRegistryContainer +from schema_registry.registry import KarapaceSchemaRegistry from typing import Final import logging @@ -38,7 +38,7 @@ async def master_availability( response: Response, config: Config = Depends(Provide[SchemaRegistryContainer.karapace_container.config]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), ) -> MasterAvailabilityResponse: are_we_master, master_url = await schema_registry.get_master() LOG.info("are master %s, master url %s", are_we_master, master_url) diff --git a/src/schema_registry/routers/mode.py b/src/schema_registry/routers/mode.py index c139e8e7d..4df141d3d 100644 --- a/src/schema_registry/routers/mode.py +++ b/src/schema_registry/routers/mode.py @@ -8,9 +8,9 @@ from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController from schema_registry.routers.errors import unauthorized from schema_registry.routers.requests import ModeResponse -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated diff --git a/src/schema_registry/routers/schemas.py b/src/schema_registry/routers/schemas.py index 984c50085..63cb7dadc 100644 --- a/src/schema_registry/routers/schemas.py +++ b/src/schema_registry/routers/schemas.py @@ -7,8 +7,8 @@ from fastapi import APIRouter, Depends, Query from karapace.auth import AuthenticatorAndAuthorizer, User from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController from schema_registry.routers.requests import SchemaListingItem, SchemasResponse, SubjectVersion -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated diff --git a/src/schema_registry/routers/subjects.py b/src/schema_registry/routers/subjects.py index 4d0a9fe94..cd5352490 100644 --- a/src/schema_registry/routers/subjects.py +++ b/src/schema_registry/routers/subjects.py @@ -7,12 +7,12 @@ from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.forward_client import ForwardClient -from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController from schema_registry.user import get_current_user from typing import Annotated @@ -74,7 +74,7 @@ async def subjects_subject_delete( permanent: bool = False, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): @@ -155,7 +155,7 @@ async def subjects_subject_version_delete( permanent: bool = False, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), - schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> int: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): diff --git a/src/schema_registry/telemetry/__init__.py b/src/schema_registry/telemetry/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/src/schema_registry/telemetry/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/src/schema_registry/telemetry/container.py b/src/schema_registry/telemetry/container.py new file mode 100644 index 000000000..d9d53ea2f --- /dev/null +++ b/src/schema_registry/telemetry/container.py @@ -0,0 +1,31 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector import containers, providers +from karapace.config import Config +from karapace.container import KarapaceContainer +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.semconv.attributes import telemetry_attributes as T +from schema_registry.telemetry.tracer import Tracer + + +def create_tracing_resource(config: Config) -> Resource: + return Resource.create( + { + "service.name": config.telemetry.resource_service_name, + "service.instance.id": config.telemetry.resource_service_instance_id, + T.TELEMETRY_SDK_NAME: config.telemetry.resource_telemetry_sdk_name, + T.TELEMETRY_SDK_LANGUAGE: config.telemetry.resource_telemetry_sdk_language, + T.TELEMETRY_SDK_VERSION: config.telemetry.resource_telemetry_sdk_version, + } + ) + + +class TelemetryContainer(containers.DeclarativeContainer): + karapace_container = providers.Container(KarapaceContainer) + tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config) + tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource) + tracer = providers.Singleton(Tracer) diff --git a/src/schema_registry/telemetry/middleware.py b/src/schema_registry/telemetry/middleware.py new file mode 100644 index 000000000..c6d14bbe9 --- /dev/null +++ b/src/schema_registry/telemetry/middleware.py @@ -0,0 +1,34 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from collections.abc import Awaitable, Callable +from dependency_injector.wiring import inject, Provide +from fastapi import FastAPI, Request, Response +from opentelemetry.trace import SpanKind +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer + +import logging + +LOG = logging.getLogger(__name__) + + +@inject +async def telemetry_middleware( + request: Request, + call_next: Callable[[Request], Awaitable[Response]], + tracer: Tracer = Provide[TelemetryContainer.tracer], +) -> Response: + resource = request.url.path.split("/")[1] + with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span: + tracer.update_span_with_request(request=request, span=span) + response: Response = await call_next(request) + tracer.update_span_with_response(response=response, span=span) + return response + + +def setup_telemetry_middleware(app: FastAPI) -> None: + LOG.info("Setting OTel tracing middleware") + app.middleware("http")(telemetry_middleware) diff --git a/src/schema_registry/telemetry/setup.py b/src/schema_registry/telemetry/setup.py new file mode 100644 index 000000000..30b423902 --- /dev/null +++ b/src/schema_registry/telemetry/setup.py @@ -0,0 +1,24 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from schema_registry.telemetry.container import TelemetryContainer +from schema_registry.telemetry.tracer import Tracer + +import logging + +LOG = logging.getLogger(__name__) + + +@inject +def setup_tracing( + tracer_provider: TracerProvider = Provide[TelemetryContainer.tracer_provider], + tracer: Tracer = Provide[TelemetryContainer.tracer], +) -> None: + LOG.info("Setting OTel tracing provider") + tracer_provider.add_span_processor(tracer.get_span_processor()) + trace.set_tracer_provider(tracer_provider) diff --git a/src/schema_registry/telemetry/tracer.py b/src/schema_registry/telemetry/tracer.py new file mode 100644 index 000000000..e905e0edc --- /dev/null +++ b/src/schema_registry/telemetry/tracer.py @@ -0,0 +1,73 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from collections.abc import Callable +from dependency_injector.wiring import inject, Provide +from fastapi import Request, Response +from karapace.config import Config +from karapace.container import KarapaceContainer +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor, SpanProcessor +from opentelemetry.semconv.attributes import ( + client_attributes as C, + http_attributes as H, + server_attributes as S, + url_attributes as U, +) +from opentelemetry.trace.span import Span + +import inspect + + +class Tracer: + @staticmethod + @inject + def get_tracer(config: Config = Provide[KarapaceContainer.config]) -> trace.Tracer: + return trace.get_tracer(f"{config.tags.app}.tracer") + + @staticmethod + @inject + def get_span_processor(config: Config = Provide[KarapaceContainer.config]) -> SpanProcessor: + if config.telemetry.otel_endpoint_url: + otlp_span_exporter = OTLPSpanExporter(endpoint=config.telemetry.otel_endpoint_url) + return BatchSpanProcessor(otlp_span_exporter) + return SimpleSpanProcessor(ConsoleSpanExporter()) + + @staticmethod + def get_name_from_caller() -> str: + return inspect.stack()[1].function + + @staticmethod + def get_name_from_caller_with_class(function_class: object, function: Callable) -> str: + return f"{type(function_class).__name__}.{function.__name__}()" + + @staticmethod + def add_span_attribute(span: Span, key: str, value: str | int) -> None: + if span.is_recording(): + span.set_attribute(key, value) + + @staticmethod + def update_span_with_request(request: Request, span: Span) -> None: + if span.is_recording(): + span.set_attribute(C.CLIENT_ADDRESS, request.client.host or "" if request.client else "") + span.set_attribute(C.CLIENT_PORT, request.client.port or "" if request.client else "") + span.set_attribute(S.SERVER_ADDRESS, request.url.hostname or "") + span.set_attribute(S.SERVER_PORT, request.url.port or "") + span.set_attribute(U.URL_SCHEME, request.url.scheme) + span.set_attribute(U.URL_PATH, request.url.path) + span.set_attribute(H.HTTP_REQUEST_METHOD, request.method) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.connection", request.headers.get("connection", "")) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.user_agent", request.headers.get("user-agent", "")) + span.set_attribute(f"{H.HTTP_REQUEST_HEADER_TEMPLATE}.content_type", request.headers.get("content-type", "")) + + @staticmethod + def update_span_with_response(response: Response, span: Span) -> None: + if span.is_recording(): + span.set_attribute(H.HTTP_RESPONSE_STATUS_CODE, response.status_code) + span.set_attribute(f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_type", response.headers.get("content-type", "")) + span.set_attribute( + f"{H.HTTP_RESPONSE_HEADER_TEMPLATE}.content_length", response.headers.get("content-length", "") + ) diff --git a/tests/conftest.py b/tests/conftest.py index 8413773fa..5b65c9405 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,12 +6,17 @@ from karapace.container import KarapaceContainer from pathlib import Path from schema_registry.container import SchemaRegistryContainer +from schema_registry.telemetry.container import TelemetryContainer from tempfile import mkstemp import json import os import pytest import re +import schema_registry.controller +import schema_registry.telemetry.middleware +import schema_registry.telemetry.setup +import schema_registry.telemetry.tracer pytest_plugins = "aiohttp.pytest_plugin" KAFKA_BOOTSTRAP_SERVERS_OPT = "--kafka-bootstrap-servers" @@ -182,11 +187,32 @@ def fixture_tmp_file(): path.unlink() -@pytest.fixture(name="karapace_container", scope="session") +@pytest.fixture(name="karapace_container", scope="session", autouse=True) def fixture_karapace_container() -> KarapaceContainer: - return KarapaceContainer() + karapace_container = KarapaceContainer() + karapace_container.wire( + modules=[ + schema_registry.controller, + schema_registry.telemetry.tracer, + ] + ) + return karapace_container + + +@pytest.fixture(name="telemetry_container", scope="session", autouse=True) +def fixture_telemetry_container() -> TelemetryContainer: + telemetry_container = TelemetryContainer() + telemetry_container.wire( + modules=[ + schema_registry.telemetry.setup, + schema_registry.telemetry.middleware, + ] + ) + return telemetry_container -@pytest.fixture -def schema_registry_container(karapace_container: KarapaceContainer) -> SchemaRegistryContainer: - return SchemaRegistryContainer(karapace_container=karapace_container) +@pytest.fixture(name="schema_registry_container", scope="session", autouse=True) +def fixture_schema_registry_container( + karapace_container: KarapaceContainer, telemetry_container: TelemetryContainer +) -> SchemaRegistryContainer: + return SchemaRegistryContainer(karapace_container=karapace_container, telemetry_container=telemetry_container) diff --git a/tests/e2e/schema_registry/test_jsonschema.py b/tests/e2e/schema_registry/test_jsonschema.py index 803b72223..01267f6f0 100644 --- a/tests/e2e/schema_registry/test_jsonschema.py +++ b/tests/e2e/schema_registry/test_jsonschema.py @@ -5,8 +5,8 @@ from jsonschema import Draft7Validator from karapace.client import Client from karapace.compatibility import CompatibilityModes -from karapace.schema_reader import SchemaType from karapace.typing import SchemaMetadata, SchemaRuleSet +from schema_registry.reader import SchemaType from tests.schemas.json_schemas import ( A_DINT_B_DINT_OBJECT_SCHEMA, A_DINT_B_INT_OBJECT_SCHEMA, diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 2f6a7097a..1a7937eca 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -11,7 +11,7 @@ from karapace.rapu import is_success from karapace.schema_type import SchemaType from karapace.utils import json_encode -from schema_registry.schema_registry_apis import SchemaErrorMessages +from schema_registry.controller import SchemaErrorMessages from tests.base_testcase import BaseTestCase from tests.integration.utils.cluster import RegistryDescription from tests.integration.utils.kafka_server import KafkaServers diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 46643c9ce..0b5ae9f2c 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -12,8 +12,8 @@ from karapace.kafka.producer import KafkaProducer from karapace.key_format import KeyFormatter, KeyMode from karapace.offset_watcher import OffsetWatcher -from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode +from schema_registry.reader import KafkaSchemaReader from tests.base_testcase import BaseTestCase from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers diff --git a/tests/unit/schema_registry/__init__.py b/tests/unit/schema_registry/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/tests/unit/schema_registry/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/tests/unit/schema_registry/telemetry/__init__.py b/tests/unit/schema_registry/telemetry/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/tests/unit/schema_registry/telemetry/test_middleware.py b/tests/unit/schema_registry/telemetry/test_middleware.py new file mode 100644 index 000000000..ecbe79307 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/test_middleware.py @@ -0,0 +1,56 @@ +""" +schema_registry - telemetry middleware tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from _pytest.logging import LogCaptureFixture +from fastapi import FastAPI, Request, Response +from opentelemetry.trace import SpanKind +from schema_registry.telemetry.middleware import setup_telemetry_middleware, telemetry_middleware +from schema_registry.telemetry.tracer import Tracer +from unittest.mock import AsyncMock, MagicMock + +import logging + + +def test_setup_telemetry_middleware(caplog: LogCaptureFixture) -> None: + app = AsyncMock(spec=FastAPI) + with caplog.at_level(logging.INFO, logger="schema_registry.telemetry.middleware"): + setup_telemetry_middleware(app=app) + + for log in caplog.records: + assert log.name == "schema_registry.telemetry.middleware" + assert log.levelname == "INFO" + assert log.message == "Setting OTel tracing middleware" + + app.middleware.assert_called_once_with("http") + app.middleware.return_value.assert_called_once_with(telemetry_middleware) + + +async def test_telemetry_middleware() -> None: + tracer = MagicMock(spec=Tracer) + + request_mock = AsyncMock(spec=Request) + request_mock.method = "GET" + request_mock.url.path = "/test" + + response_mock = AsyncMock(spec=Response) + response_mock.status_code = 200 + + call_next = AsyncMock() + call_next.return_value = response_mock + + response = await telemetry_middleware(request=request_mock, call_next=call_next, tracer=tracer) + span = tracer.get_tracer.return_value.start_as_current_span.return_value.__enter__.return_value + + tracer.get_tracer.assert_called_once() + tracer.get_tracer.return_value.start_as_current_span.assert_called_once_with(name="GET: /test", kind=SpanKind.SERVER) + tracer.update_span_with_request.assert_called_once_with(request=request_mock, span=span) + tracer.update_span_with_response.assert_called_once_with(response=response_mock, span=span) + + # Check that the request handler is called + call_next.assert_awaited_once_with(request_mock) + + assert response == response_mock diff --git a/tests/unit/schema_registry/telemetry/test_setup.py b/tests/unit/schema_registry/telemetry/test_setup.py new file mode 100644 index 000000000..e3dc90690 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/test_setup.py @@ -0,0 +1,32 @@ +""" +schema_registry - telemetry setup tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from _pytest.logging import LogCaptureFixture +from opentelemetry.sdk.trace import TracerProvider +from schema_registry.telemetry.setup import setup_tracing +from schema_registry.telemetry.tracer import Tracer +from unittest.mock import MagicMock, patch + +import logging + + +def test_setup_telemetry(caplog: LogCaptureFixture): + tracer_provider = MagicMock(spec=TracerProvider) + tracer = MagicMock(spec=Tracer) + with ( + caplog.at_level(logging.INFO, logger="schema_registry.telemetry.setup"), + patch("schema_registry.telemetry.setup.trace") as mock_trace, + ): + tracer.get_span_processor.return_value = "span_processor" + setup_tracing(tracer_provider=tracer_provider, tracer=tracer) + + tracer_provider.add_span_processor.assert_called_once_with("span_processor") + mock_trace.set_tracer_provider.assert_called_once_with(tracer_provider) + for log in caplog.records: + assert log.name == "schema_registry.telemetry.setup" + assert log.levelname == "INFO" + assert log.message == "Setting OTel tracing provider" diff --git a/tests/unit/schema_registry/telemetry/test_tracer.py b/tests/unit/schema_registry/telemetry/test_tracer.py new file mode 100644 index 000000000..f1edabde2 --- /dev/null +++ b/tests/unit/schema_registry/telemetry/test_tracer.py @@ -0,0 +1,121 @@ +""" +schema_registry - telemetry middleware tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from fastapi import Request, Response +from karapace.config import KarapaceTelemetry +from karapace.container import KarapaceContainer +from opentelemetry.sdk.trace.export import SpanProcessor +from opentelemetry.trace.span import Span +from schema_registry.telemetry.tracer import Tracer +from unittest.mock import call, MagicMock, patch + + +def test_tracer(karapace_container: KarapaceContainer): + with patch("schema_registry.telemetry.tracer.trace") as mock_trace: + Tracer.get_tracer(config=karapace_container.config()) + mock_trace.get_tracer.assert_called_once_with("Karapace.tracer") + + +def test_get_name_from_caller(): + def test_function(): + return Tracer.get_name_from_caller() + + assert test_function() == "test_function" + + +def test_get_name_from_caller_with_class(): + class Test: + def test_function(self): + return Tracer.get_name_from_caller_with_class(self, self.test_function) + + assert Test().test_function() == "Test.test_function()" + + +def test_get_span_processor_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url="http://otel:4317")} + ) + with ( + patch("schema_registry.telemetry.tracer.OTLPSpanExporter") as mock_otlp_exporter, + patch("schema_registry.telemetry.tracer.BatchSpanProcessor") as mock_batch_span_processor, + ): + processor: SpanProcessor = Tracer.get_span_processor(config=config) + mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") + mock_batch_span_processor.assert_called_once_with(mock_otlp_exporter.return_value) + assert processor is mock_batch_span_processor.return_value + + +def test_get_span_processor_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} + ) + with ( + patch("schema_registry.telemetry.tracer.ConsoleSpanExporter") as mock_console_exporter, + patch("schema_registry.telemetry.tracer.SimpleSpanProcessor") as mock_simple_span_processor, + ): + processor: SpanProcessor = Tracer.get_span_processor(config=config) + mock_simple_span_processor.assert_called_once_with(mock_console_exporter.return_value) + assert processor is mock_simple_span_processor.return_value + + +def test_add_span_attribute(): + span = MagicMock(spec=Span) + + # Test when span is not recording + span.is_recording.return_value = False + Tracer.add_span_attribute(span=span, key="key", value="value") + assert not span.set_attribute.called + + # Test when span is recording + span.is_recording.return_value = True + Tracer.add_span_attribute(span=span, key="key", value="value") + span.set_attribute.assert_called_once_with("key", "value") + + +def test_update_span_with_request(): + span = MagicMock(spec=Span) + span.is_recording.return_value = True + + request = MagicMock(spec=Request) + request.headers = {"content-type": "application/json", "connection": "keep-alive", "user-agent": "pytest"} + request.method = "GET" + request.url = MagicMock(port=8081, scheme="http", path="/test", hostname="server") + request.client = MagicMock(host="client", port=8080) + + Tracer.update_span_with_request(request=request, span=span) + span.set_attribute.assert_has_calls( + [ + call("client.address", "client"), + call("client.port", 8080), + call("server.address", "server"), + call("server.port", 8081), + call("url.scheme", "http"), + call("url.path", "/test"), + call("http.request.method", "GET"), + call("http.request.header.connection", "keep-alive"), + call("http.request.header.user_agent", "pytest"), + call("http.request.header.content_type", "application/json"), + ] + ) + + +def test_update_span_with_response(): + span = MagicMock(spec=Span) + + response = MagicMock(spec=Response) + response.status_code = 200 + response.headers = {"content-type": "application/json", "content-length": 8} + + span.is_recording.return_value = True + Tracer.update_span_with_response(response=response, span=span) + span.set_attribute.assert_has_calls( + [ + call("http.response.status_code", 200), + call("http.response.header.content_type", "application/json"), + call("http.response.header.content_length", 8), + ] + ) diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/schema_registry/test_controller.py similarity index 87% rename from tests/unit/test_schema_registry_api.py rename to tests/unit/schema_registry/test_controller.py index f21f47097..4c4b0b16d 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/schema_registry/test_controller.py @@ -5,8 +5,8 @@ from fastapi.exceptions import HTTPException from karapace.rapu import HTTPResponse from karapace.schema_models import SchemaType, ValidatedTypedSchema -from karapace.schema_reader import KafkaSchemaReader from schema_registry.container import SchemaRegistryContainer +from schema_registry.reader import KafkaSchemaReader from unittest.mock import Mock, patch, PropertyMock import asyncio @@ -45,7 +45,7 @@ async def test_validate_schema_request_body(schema_registry_container: SchemaReg async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryContainer) -> None: - with patch("karapace.container.KarapaceSchemaRegistry") as schema_registry_class: + with patch("schema_registry.container.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) type(schema_reader_mock).ready = ready_property_mock @@ -60,7 +60,6 @@ async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryC close_func.return_value = close_future_result schema_registry_class.close = close_func - schema_registry_container.karapace_container().schema_registry = schema_registry_class controller = schema_registry_container.schema_registry_controller() controller.schema_registry = schema_registry_class @@ -78,3 +77,7 @@ async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryC user=None, authorizer=None, ) + with pytest.raises(HTTPResponse): + # prevent `future exception was never retrieved` warning logs + # future: + await mock_forward_func_future diff --git a/tests/unit/test_schema_reader.py b/tests/unit/schema_registry/test_reader.py similarity index 98% rename from tests/unit/test_schema_reader.py rename to tests/unit/schema_registry/test_reader.py index a245d026f..e1e9e0db5 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/schema_registry/test_reader.py @@ -16,7 +16,10 @@ from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import KeyFormatter from karapace.offset_watcher import OffsetWatcher -from karapace.schema_reader import ( +from karapace.schema_type import SchemaType +from karapace.typing import SchemaId, Version +from pytest import MonkeyPatch +from schema_registry.reader import ( KafkaSchemaReader, MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP, MAX_MESSAGES_TO_CONSUME_ON_STARTUP, @@ -24,9 +27,6 @@ OFFSET_EMPTY, OFFSET_UNINITIALIZED, ) -from karapace.schema_type import SchemaType -from karapace.typing import SchemaId, Version -from pytest import MonkeyPatch from tests.base_testcase import BaseTestCase from tests.utils import schema_protobuf_invalid_because_corrupted, schema_protobuf_with_invalid_ref from unittest.mock import Mock @@ -318,12 +318,12 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture, karapace_cont database=database_mock, ) - with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): + with caplog.at_level(logging.WARNING, logger="schema_registry.reader"): schema_reader._handle_msg_schema_hard_delete( # pylint: disable=protected-access key={"subject": "test-subject", "version": 2} ) for log in caplog.records: - assert log.name == "karapace.schema_reader" + assert log.name == "schema_registry.reader" assert log.levelname == "WARNING" assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have" @@ -598,14 +598,14 @@ def test_message_error_handling( consumer_messages = ([message],) schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages) - with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): + with caplog.at_level(logging.WARNING, logger="schema_registry.reader"): with pytest.raises(test_case.expected_error): schema_reader.handle_messages() assert schema_reader.offset == 1 assert not schema_reader.ready() for log in caplog.records: - assert log.name == "karapace.schema_reader" + assert log.name == "schema_registry.reader" assert log.levelname == "WARNING" assert log.message == test_case.expected_log_message @@ -636,7 +636,7 @@ def test_message_error_handling_with_invalid_reference_schema_protobuf( ) message_using_ref = message_factory(key=key_using_ref, value=value_using_ref) - with caplog.at_level(logging.WARN, logger="karapace.schema_reader"): + with caplog.at_level(logging.WARN, logger="schema_registry.reader"): # When handling the corrupted schema schema_reader = schema_reader_with_consumer_messages_factory(([message_ref],)) @@ -662,8 +662,8 @@ def test_message_error_handling_with_invalid_reference_schema_protobuf( assert len(warn_records) == 2 # Check that different warnings are logged for each schema - assert warn_records[0].name == "karapace.schema_reader" + assert warn_records[0].name == "schema_registry.reader" assert warn_records[0].message == "Schema is not valid ProtoBuf definition" - assert warn_records[1].name == "karapace.schema_reader" + assert warn_records[1].name == "schema_registry.reader" assert warn_records[1].message == "Invalid Protobuf references" diff --git a/tests/unit/test_in_memory_database.py b/tests/unit/test_in_memory_database.py index 2a0156567..9dc8edf88 100644 --- a/tests/unit/test_in_memory_database.py +++ b/tests/unit/test_in_memory_database.py @@ -14,10 +14,10 @@ from karapace.key_format import KeyFormatter from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import SchemaVersion, TypedSchema -from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import Reference, Referents from karapace.typing import SchemaId, Version from pathlib import Path +from schema_registry.reader import KafkaSchemaReader from typing import Final TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/")