Skip to content

Commit

Permalink
merge with main
Browse files Browse the repository at this point in the history
  • Loading branch information
libretto committed Jun 13, 2024
2 parents 44cd5b8 + 7a30144 commit 13abf60
Show file tree
Hide file tree
Showing 59 changed files with 3,441 additions and 836 deletions.
4 changes: 4 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[run]
branch = True
relative_files = True
source = karapace
46 changes: 45 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,55 @@ jobs:

- run: make install version
- run: make unit-tests
- run: make integration-tests PYTEST_ARGS="--random-order"
env:
COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
PYTEST_ARGS: "--cov=karapace --cov-append"
- run: make integration-tests
env:
COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
PYTEST_ARGS: "--cov=karapace --cov-append --random-order"

- name: Archive logs
uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
name: karapace-integration-test-logs-${{ matrix.python-version }}
path: /tmp/ci-logs
- name: Archive coverage file
uses: actions/upload-artifact@v4
with:
name: "coverage-${{ matrix.python-version }}"
path: ".coverage.${{ matrix.python-version }}"

coverage:
name: Coverage report
runs-on: ubuntu-latest
needs: tests
permissions:
pull-requests: write
contents: write
steps:
- uses: actions/checkout@v4

- name: Download coverage
id: download_coverage
uses: actions/download-artifact@v4
with:
pattern: coverage-*
merge-multiple: true

- run: make karapace/version.py

- name: Post coverage comment
id: post_coverage_comment
uses: py-cov-action/python-coverage-comment-action@v3
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
MERGE_COVERAGE_FILES: true

- name: Store PR comment to be posted
uses: actions/upload-artifact@v4
if: steps.post_coverage_comment.outputs.COMMENT_FILE_WRITTEN == 'true'
with:
name: python-coverage-comment-action
path: python-coverage-comment-action.txt
10 changes: 8 additions & 2 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
)
from .poll_timeout import PollTimeout
from .topic_configurations import ConfigSource, get_topic_configurations
from aiokafka.errors import KafkaError, TopicAlreadyExistsError
from concurrent.futures import Future
from confluent_kafka import Message, TopicPartition
from enum import Enum
from functools import partial
from kafka.errors import KafkaError, TopicAlreadyExistsError
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
Expand Down Expand Up @@ -299,7 +299,13 @@ def _consume_records(
if start_offset >= end_offset:
raise EmptyPartition

end_offset -= 1 # high watermark to actual end offset
# confluent-kafka-python returns end offset + 1, i.e. the value that will
# be assigned to the next record to be produced. To get the highest offset
# already produced we need to subtract 1. Note that this has little to do
# with high watermark, which takes into account the highest record offset
# to be fully in sync across ISRs, and therefore can be an arbitrary
# number less than or equal to the LEO.
end_offset -= 1

while True:
record: Message | None = consumer.poll(timeout=poll_timeout.seconds)
Expand Down
2 changes: 1 addition & 1 deletion karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import api
from .errors import BackupDataRestorationError, StaleConsumerError
from .poll_timeout import PollTimeout
from kafka.errors import BrokerResponseError
from aiokafka.errors import BrokerResponseError
from karapace.backup.api import VerifyLevel
from karapace.config import Config, read_config
from typing import Iterator
Expand Down
1 change: 0 additions & 1 deletion karapace/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Final

SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1
API_VERSION_AUTO_TIMEOUT_MS: Final = 30000
TOPIC_CREATION_TIMEOUT_S: Final = 20
DEFAULT_SCHEMA_TOPIC: Final = "_schemas"
DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576
Expand Down
Empty file.
127 changes: 127 additions & 0 deletions karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
karapace - master coordinator
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from aiokafka import AIOKafkaClient
from aiokafka.errors import KafkaConnectionError
from aiokafka.helpers import create_ssl_context
from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from typing import Final

import asyncio
import logging

# Names exported by ``from ... import *``: only the coordinator class.
__all__ = ("MasterCoordinator",)

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


class MasterCoordinator:
    """Handles primary election.

    Owns the Kafka client and the ``SchemaCoordinator`` built from the
    configuration. Nothing is connected until :meth:`start` is awaited;
    :meth:`close` releases both objects.
    """

    def __init__(self, config: Config) -> None:
        """Store ``config``; the client and coordinator are created in ``start``."""
        super().__init__()
        self._config: Final = config
        self._kafka_client: AIOKafkaClient | None = None
        self._running = True
        self._sc: SchemaCoordinator | None = None

    @property
    def schema_coordinator(self) -> SchemaCoordinator | None:
        """The schema coordinator, or ``None`` until ``start`` has created it."""
        return self._sc

    @property
    def config(self) -> Config:
        """The configuration this coordinator was constructed with."""
        return self._config

    async def start(self) -> None:
        """Bootstrap the Kafka client, then start the schema coordinator.

        Returns once the schema coordinator reports ready. Each phase polls
        every 0.5 seconds and retries without an upper bound.
        """
        self._kafka_client = self.init_kafka_client()
        # Wait until schema coordinator is ready.
        # This probably needs better synchronization than plain waits.
        while True:
            try:
                await self._kafka_client.bootstrap()
                break
            except KafkaConnectionError:
                LOG.exception("Kafka client bootstrap failed.")
                await asyncio.sleep(0.5)

        # Bootstrap succeeding does not imply broker metadata is populated
        # yet, so keep requesting a refresh until at least one broker is known.
        while not self._kafka_client.cluster.brokers():
            LOG.info(
                "Waiting cluster metadata update after Kafka client bootstrap: %s.",
                self._kafka_client.cluster.brokers(),
            )
            self._kafka_client.force_metadata_update()
            await asyncio.sleep(0.5)

        self._sc = self.init_schema_coordinator()
        while True:
            if self._sc.ready():
                return
            await asyncio.sleep(0.5)

    def init_kafka_client(self) -> AIOKafkaClient:
        """Build (but do not connect) an ``AIOKafkaClient`` from the configuration."""
        ssl_context = create_ssl_context(
            cafile=self._config["ssl_cafile"],
            certfile=self._config["ssl_certfile"],
            keyfile=self._config["ssl_keyfile"],
        )

        return AIOKafkaClient(
            bootstrap_servers=self._config["bootstrap_uri"],
            client_id=self._config["client_id"],
            metadata_max_age_ms=self._config["metadata_max_age_ms"],
            request_timeout_ms=DEFAULT_REQUEST_TIMEOUT_MS,
            # aiokafka requires a non-null SASL mechanism even when none is
            # configured, so fall back to "PLAIN" in that case.
            sasl_mechanism=self._config["sasl_mechanism"] or "PLAIN",
            sasl_plain_username=self._config["sasl_plain_username"],
            sasl_plain_password=self._config["sasl_plain_password"],
            security_protocol=self._config["security_protocol"],
            ssl_context=ssl_context,
        )

    def init_schema_coordinator(self) -> SchemaCoordinator:
        """Create and start a ``SchemaCoordinator`` on the existing Kafka client.

        Must be called after ``init_kafka_client`` has populated the client.
        """
        assert self._kafka_client is not None
        schema_coordinator = SchemaCoordinator(
            client=self._kafka_client,
            election_strategy=self._config.get("master_election_strategy", "lowest"),
            group_id=self._config["group_id"],
            hostname=self._config["advertised_hostname"],
            master_eligibility=self._config["master_eligibility"],
            port=self._config["advertised_port"],
            scheme=self._config["advertised_protocol"],
            session_timeout_ms=self._config["session_timeout_ms"],
        )
        schema_coordinator.start()
        return schema_coordinator

    def get_coordinator_status(self) -> SchemaCoordinatorStatus:
        """Return a snapshot of the election state.

        Safe to call before ``start`` has created the schema coordinator: in
        that case the primary flag and URL are ``None`` and the generation id
        is the protocol default. A leading ``assert`` used to make that
        fallback unreachable and has been removed.
        """
        coordinator = self._sc
        if coordinator is None:
            is_primary = None
            primary_url = None
            generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
        else:
            is_primary = coordinator.are_we_master
            primary_url = coordinator.master_url
            generation = coordinator.generation

        return SchemaCoordinatorStatus(
            is_primary=is_primary,
            is_primary_eligible=self._config["master_eligibility"],
            primary_url=primary_url,
            is_running=True,
            # The coordinator may report no generation; expose that as -1.
            group_generation_id=generation if generation is not None else -1,
        )

    def get_master_info(self) -> tuple[bool | None, str | None]:
        """Return whether we're the master, and the actual master url that can be used if we're not"""
        assert self._sc is not None
        return self._sc.are_we_master, self._sc.master_url

    async def close(self) -> None:
        """Close the schema coordinator and then the Kafka client.

        The client is closed even if closing the coordinator raises, so a
        failing coordinator shutdown cannot leak the client.
        """
        LOG.info("Closing master_coordinator")
        try:
            if self._sc:
                await self._sc.close()
        finally:
            if self._kafka_client:
                await self._kafka_client.close()
Loading

0 comments on commit 13abf60

Please sign in to comment.