Skip to content

Commit

Permalink
feature: master coordinator with aiokafka
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed May 17, 2024
1 parent d1bba10 commit 7c0f8e9
Show file tree
Hide file tree
Showing 15 changed files with 1,217 additions and 462 deletions.
3 changes: 3 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
!README.rst
!container/start.sh
!container/healthcheck.py
!venv/lib/python3.11/site-packages/kafka/conn.py
!venv/lib/python3.11/site-packages/kafka/client_async.py
!venv/lib/python3.11/site-packages/kafka/coordinator/base.py

# Ignore some files in source directories.
**/.DS_Store
Expand Down
3 changes: 3 additions & 0 deletions container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ RUN chmod 500 /opt/karapace/start.sh \
&& chown karapace:karapace /opt/karapace/start.sh

COPY ./container/healthcheck.py /opt/karapace
COPY ./venv/lib/python3.11/site-packages/kafka/conn.py /venv/lib/python3.10/site-packages/kafka/conn.py
COPY ./venv/lib/python3.11/site-packages/kafka/client_async.py /venv/lib/python3.10/site-packages/kafka/client_async.py
COPY ./venv/lib/python3.11/site-packages/kafka/coordinator/base.py /venv/lib/python3.10/site-packages/kafka/coordinator/base.py

WORKDIR /opt/karapace
USER karapace
Expand Down
1 change: 0 additions & 1 deletion karapace/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Final

SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1
API_VERSION_AUTO_TIMEOUT_MS: Final = 30000
TOPIC_CREATION_TIMEOUT_S: Final = 20
DEFAULT_SCHEMA_TOPIC: Final = "_schemas"
DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576
Expand Down
Empty file.
105 changes: 105 additions & 0 deletions karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
karapace - master coordinator
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from aiokafka import AIOKafkaClient
from aiokafka.helpers import create_ssl_context
from asyncio import Event
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from typing import Final, Optional, Tuple

import logging

__all__ = ("MasterCoordinator",)

LOG = logging.getLogger(__name__)


class MasterCoordinator:
"""Handles primary election"""

def __init__(self, config: Config) -> None:
super().__init__()
self._config: Final = config
self._kafka_client: Optional[AIOKafkaClient] = None
self._running = True
self._sc: Optional[SchemaCoordinator] = None
self._schema_coordinator_ready: Final = Event()

@property
def schema_coordinator(self) -> Optional[SchemaCoordinator]:
return self._sc

@property
def config(self) -> Config:
return self._config

async def start(self) -> None:
self._kafka_client = self.init_kafka_client()
await self._kafka_client.bootstrap()
self._sc = self.init_schema_coordinator()
# Wait until schema coordinator is ready.
await self._schema_coordinator_ready.wait()

def init_kafka_client(self) -> AIOKafkaClient:
ssl_context = create_ssl_context(
cafile=self._config["ssl_cafile"],
certfile=self._config["ssl_certfile"],
keyfile=self._config["ssl_keyfile"],
)

return AIOKafkaClient(
bootstrap_servers=self._config["bootstrap_uri"],
client_id=self._config["client_id"],
metadata_max_age_ms=self._config["metadata_max_age_ms"],
request_timeout_ms=DEFAULT_REQUEST_TIMEOUT_MS,
# Set default "PLAIN" if not configured, aiokafka expects
# security protocol for SASL but requires a non-null value
# for sasl mechanism.
sasl_mechanism=self._config["sasl_mechanism"] or "PLAIN",
sasl_plain_username=self._config["sasl_plain_username"],
sasl_plain_password=self._config["sasl_plain_password"],
security_protocol=self._config["security_protocol"],
ssl_context=ssl_context,
)

def init_schema_coordinator(self) -> SchemaCoordinator:
assert self._kafka_client is not None
return SchemaCoordinator(
client=self._kafka_client,
election_strategy=self._config.get("master_election_strategy", "lowest"),
group_id=self._config["group_id"],
hostname=self._config["advertised_hostname"],
master_eligibility=self._config["master_eligibility"],
port=self._config["advertised_port"],
scheme=self._config["advertised_protocol"],
session_timeout_ms=self._config["session_timeout_ms"],
ready_event=self._schema_coordinator_ready,
)

def get_coordinator_status(self) -> SchemaCoordinatorStatus:
assert self._sc is not None
generation = self._sc.generation if self._sc is not None else None
return SchemaCoordinatorStatus(
is_primary=self._sc.are_we_master if self._sc is not None else None,
is_primary_eligible=self._config["master_eligibility"],
primary_url=self._sc.master_url if self._sc is not None else None,
is_running=True,
group_generation_id=generation if generation is not None else -1,
)

def get_master_info(self) -> Tuple[Optional[bool], Optional[str]]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
assert self._sc is not None
return self._sc.are_we_master, self._sc.master_url

async def close(self) -> None:
LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._kafka_client:
await self._kafka_client.close()
Loading

0 comments on commit 7c0f8e9

Please sign in to comment.