Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: master coordinator with aiokafka #880

Merged
merged 2 commits into the base branch from the feature branch
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion karapace/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Final

SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1
API_VERSION_AUTO_TIMEOUT_MS: Final = 30000
TOPIC_CREATION_TIMEOUT_S: Final = 20
DEFAULT_SCHEMA_TOPIC: Final = "_schemas"
DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576
Expand Down
Empty file.
127 changes: 127 additions & 0 deletions karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
karapace - master coordinator

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from aiokafka import AIOKafkaClient
from aiokafka.errors import KafkaConnectionError
from aiokafka.helpers import create_ssl_context
from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from typing import Final

import asyncio
import logging

__all__ = ("MasterCoordinator",)

LOG = logging.getLogger(__name__)


class MasterCoordinator:
    """Handles primary election.

    Owns the ``AIOKafkaClient`` used for group membership and the
    ``SchemaCoordinator`` that performs the actual leader election.
    Both are created lazily by :meth:`start`.
    """

    def __init__(self, config: Config) -> None:
        super().__init__()
        self._config: Final = config
        # Created in start(); None until then.
        self._kafka_client: AIOKafkaClient | None = None
        self._running = True
        # Created in start() once the Kafka client has bootstrapped.
        self._sc: SchemaCoordinator | None = None

    @property
    def schema_coordinator(self) -> SchemaCoordinator | None:
        """The schema coordinator, or ``None`` until :meth:`start` creates it."""
        return self._sc

    @property
    def config(self) -> Config:
        """The configuration this coordinator was constructed with."""
        return self._config

    async def start(self) -> None:
        """Bootstrap the Kafka client and start the schema coordinator.

        Retries bootstrap indefinitely on connection errors, then polls
        (0.5 s intervals) until cluster metadata is available and the
        schema coordinator reports ready.
        """
        self._kafka_client = self.init_kafka_client()
        # Wait until schema coordinator is ready.
        # This probably needs better synchronization than plain waits.
        while True:
            try:
                await self._kafka_client.bootstrap()
                break
            except KafkaConnectionError:
                LOG.exception("Kafka client bootstrap failed.")
                await asyncio.sleep(0.5)

        while not self._kafka_client.cluster.brokers():
            LOG.info(
                "Waiting cluster metadata update after Kafka client bootstrap: %s.", self._kafka_client.cluster.brokers()
            )
            # NOTE(review): force_metadata_update() is not awaited here; we
            # rely on the sleep-and-recheck loop to observe the refreshed
            # metadata — confirm this is intentional.
            self._kafka_client.force_metadata_update()
            await asyncio.sleep(0.5)

        self._sc = self.init_schema_coordinator()
        while True:
            if self._sc.ready():
                return
            await asyncio.sleep(0.5)

    def init_kafka_client(self) -> AIOKafkaClient:
        """Build the Kafka client from configuration (does not connect)."""
        ssl_context = create_ssl_context(
            cafile=self._config["ssl_cafile"],
            certfile=self._config["ssl_certfile"],
            keyfile=self._config["ssl_keyfile"],
        )

        return AIOKafkaClient(
            bootstrap_servers=self._config["bootstrap_uri"],
            client_id=self._config["client_id"],
            metadata_max_age_ms=self._config["metadata_max_age_ms"],
            request_timeout_ms=DEFAULT_REQUEST_TIMEOUT_MS,
            # Set default "PLAIN" if not configured, aiokafka expects
            # security protocol for SASL but requires a non-null value
            # for sasl mechanism.
            sasl_mechanism=self._config["sasl_mechanism"] or "PLAIN",
            sasl_plain_username=self._config["sasl_plain_username"],
            sasl_plain_password=self._config["sasl_plain_password"],
            security_protocol=self._config["security_protocol"],
            ssl_context=ssl_context,
        )

    def init_schema_coordinator(self) -> SchemaCoordinator:
        """Create and start the schema coordinator.

        Must only be called after the Kafka client has been created
        (i.e. from within :meth:`start`).
        """
        assert self._kafka_client is not None
        schema_coordinator = SchemaCoordinator(
            client=self._kafka_client,
            election_strategy=self._config.get("master_election_strategy", "lowest"),
            group_id=self._config["group_id"],
            hostname=self._config["advertised_hostname"],
            master_eligibility=self._config["master_eligibility"],
            port=self._config["advertised_port"],
            scheme=self._config["advertised_protocol"],
            session_timeout_ms=self._config["session_timeout_ms"],
        )
        schema_coordinator.start()
        return schema_coordinator

    def get_coordinator_status(self) -> SchemaCoordinatorStatus:
        """Return a snapshot of the election state for status endpoints.

        Safe to call before :meth:`start` has created the coordinator;
        the primary fields are then ``None`` and the generation id is the
        commit-protocol default (reported as -1).
        """
        # The previous implementation asserted self._sc was not None and then
        # guarded every access with "if self._sc is not None" anyway; the
        # None-tolerant branches are the evident intent, so the contradictory
        # assert is dropped.
        generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID
        return SchemaCoordinatorStatus(
            is_primary=self._sc.are_we_master if self._sc is not None else None,
            is_primary_eligible=self._config["master_eligibility"],
            primary_url=self._sc.master_url if self._sc is not None else None,
            is_running=True,
            group_generation_id=generation if generation is not None else -1,
        )

    def get_master_info(self) -> tuple[bool | None, str | None]:
        """Return whether we're the master, and the actual master url that can be used if we're not"""
        assert self._sc is not None
        return self._sc.are_we_master, self._sc.master_url

    async def close(self) -> None:
        """Shut down the schema coordinator and the Kafka client, if created."""
        LOG.info("Closing master_coordinator")
        if self._sc:
            await self._sc.close()
        if self._kafka_client:
            await self._kafka_client.close()
Loading
Loading