diff --git a/karapace/constants.py b/karapace/constants.py index c2214dc77..e18a6e574 100644 --- a/karapace/constants.py +++ b/karapace/constants.py @@ -7,7 +7,6 @@ from typing import Final SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1 -API_VERSION_AUTO_TIMEOUT_MS: Final = 30000 TOPIC_CREATION_TIMEOUT_S: Final = 20 DEFAULT_SCHEMA_TOPIC: Final = "_schemas" DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576 diff --git a/karapace/coordinator/__init__.py b/karapace/coordinator/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/karapace/coordinator/master_coordinator.py b/karapace/coordinator/master_coordinator.py new file mode 100644 index 000000000..b9fdb3a12 --- /dev/null +++ b/karapace/coordinator/master_coordinator.py @@ -0,0 +1,127 @@ +""" +karapace - master coordinator + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from aiokafka import AIOKafkaClient +from aiokafka.errors import KafkaConnectionError +from aiokafka.helpers import create_ssl_context +from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest +from karapace.config import Config +from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus +from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS +from typing import Final + +import asyncio +import logging + +__all__ = ("MasterCoordinator",) + +LOG = logging.getLogger(__name__) + + +class MasterCoordinator: + """Handles primary election""" + + def __init__(self, config: Config) -> None: + super().__init__() + self._config: Final = config + self._kafka_client: AIOKafkaClient | None = None + self._running = True + self._sc: SchemaCoordinator | None = None + + @property + def schema_coordinator(self) -> SchemaCoordinator | None: + return self._sc + + @property + def config(self) -> Config: + return self._config + + async def start(self) -> None: + self._kafka_client = self.init_kafka_client() + # Wait until schema coordinator is ready. + # This probably needs better synchronization than plain waits. + while True: + try: + await self._kafka_client.bootstrap() + break + except KafkaConnectionError: + LOG.exception("Kafka client bootstrap failed.") + await asyncio.sleep(0.5) + + while not self._kafka_client.cluster.brokers(): + LOG.info( + "Waiting cluster metadata update after Kafka client bootstrap: %s.", self._kafka_client.cluster.brokers() + ) + self._kafka_client.force_metadata_update() + await asyncio.sleep(0.5) + + self._sc = self.init_schema_coordinator() + while True: + if self._sc.ready(): + return + await asyncio.sleep(0.5) + + def init_kafka_client(self) -> AIOKafkaClient: + ssl_context = create_ssl_context( + cafile=self._config["ssl_cafile"], + certfile=self._config["ssl_certfile"], + keyfile=self._config["ssl_keyfile"], + ) + + return AIOKafkaClient( + bootstrap_servers=self._config["bootstrap_uri"], + client_id=self._config["client_id"], + metadata_max_age_ms=self._config["metadata_max_age_ms"], + request_timeout_ms=DEFAULT_REQUEST_TIMEOUT_MS, + # Set default "PLAIN" if not configured, aiokafka expects + # security protocol for SASL but requires a non-null value + # for sasl mechanism. + sasl_mechanism=self._config["sasl_mechanism"] or "PLAIN", + sasl_plain_username=self._config["sasl_plain_username"], + sasl_plain_password=self._config["sasl_plain_password"], + security_protocol=self._config["security_protocol"], + ssl_context=ssl_context, + ) + + def init_schema_coordinator(self) -> SchemaCoordinator: + assert self._kafka_client is not None + schema_coordinator = SchemaCoordinator( + client=self._kafka_client, + election_strategy=self._config.get("master_election_strategy", "lowest"), + group_id=self._config["group_id"], + hostname=self._config["advertised_hostname"], + master_eligibility=self._config["master_eligibility"], + port=self._config["advertised_port"], + scheme=self._config["advertised_protocol"], + session_timeout_ms=self._config["session_timeout_ms"], + ) + schema_coordinator.start() + return schema_coordinator + + def get_coordinator_status(self) -> SchemaCoordinatorStatus: + assert self._sc is not None + generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID + return SchemaCoordinatorStatus( + is_primary=self._sc.are_we_master if self._sc is not None else None, + is_primary_eligible=self._config["master_eligibility"], + primary_url=self._sc.master_url if self._sc is not None else None, + is_running=True, + group_generation_id=generation if generation is not None else -1, + ) + + def get_master_info(self) -> tuple[bool | None, str | None]: + """Return whether we're the master, and the actual master url that can be used if we're not""" + assert self._sc is not None + return self._sc.are_we_master, self._sc.master_url + + async def close(self) -> None: + LOG.info("Closing master_coordinator") + if self._sc: + await self._sc.close() + if self._kafka_client: + await self._kafka_client.close() diff --git a/karapace/coordinator/schema_coordinator.py b/karapace/coordinator/schema_coordinator.py new file mode 100644 index 000000000..2b64d1187 --- /dev/null +++ b/karapace/coordinator/schema_coordinator.py @@ -0,0 +1,960 @@ +""" +karapace - schema coordinator + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from aiokafka.client import AIOKafkaClient, ConnectionGroup +from aiokafka.cluster import ClusterMetadata +from aiokafka.consumer.group_coordinator import CoordinationType +from aiokafka.protocol.api import Request +from aiokafka.protocol.commit import OffsetCommitRequest_v2 as OffsetCommitRequest +from aiokafka.protocol.group import ( + HeartbeatRequest, + JoinGroupRequest, + JoinGroupResponse_v0, + JoinGroupResponse_v1, + JoinGroupResponse_v2, + JoinGroupResponse_v5, + LeaveGroupRequest, + SyncGroupRequest, + SyncGroupRequest_v0, + SyncGroupRequest_v1, + SyncGroupRequest_v3, +) +from aiokafka.util import create_future, create_task +from kafka.protocol.api import Response +from karapace.dataclasses import default_dataclass +from karapace.typing import JsonData +from karapace.utils import json_decode, json_encode +from karapace.version import __version__ +from typing import Any, Coroutine, Final, Sequence +from typing_extensions import TypedDict + +import aiokafka.errors as Errors +import asyncio +import copy +import logging +import time + +__all__ = ("SchemaCoordinator",) + +# SR group errors +NO_ERROR: Final = 0 + +LOG = logging.getLogger(__name__) + + +class MemberIdentity(TypedDict): + host: str + port: int + scheme: str + master_eligibility: bool + + +class MemberAssignment(TypedDict): + master: str + master_identity: MemberIdentity + + +@default_dataclass +class JoinGroupMemberData: + member_id: str + member_data: bytes + + +@default_dataclass +class JoinGroupResponseData: + leader_id: str + protocol: str + members: list[JoinGroupMemberData] + + +@default_dataclass +class Assignment: + member_id: str + metadata: bytes + + def to_tuple(self) -> tuple[str, bytes]: + return (self.member_id, self.metadata) + + +def get_member_url(scheme: str, host: str, port: int) -> str: + return f"{scheme}://{host}:{port}" + + +def get_member_configuration(*, host: str, port: int, scheme: str, master_eligibility: bool) -> JsonData: + return { + "version": 2, + "karapace_version": __version__, + "host": host, + "port": port, + "scheme": scheme, + "master_eligibility": master_eligibility, + } + + +@default_dataclass +class SchemaCoordinatorStatus: + is_primary: bool | None + is_primary_eligible: bool + primary_url: str | None + is_running: bool + group_generation_id: int + + +SCHEMA_COORDINATOR_PROTOCOL: Final = "sr" + + +class SchemaCoordinator: + """Schema registry specific group coordinator. + + Consumer group management is used to select primary Karapace + from the Karapace cluster. + + This class is derived from aiokafka.consumer.group_coordinator.GroupCoordinator. + Contains original comments and also Schema Registry specific comments. + """ + + are_we_master: bool | None = None + master_url: str | None = None + + def __init__( + self, + client: AIOKafkaClient, + hostname: str, + port: int, + scheme: str, + master_eligibility: bool, + election_strategy: str, + group_id: str, + heartbeat_interval_ms: int = 3000, + max_poll_interval_ms: int = 300000, + rebalance_timeout_ms: int = 30000, + retry_backoff_ms: int = 100, + session_timeout_ms: int = 10000, + ) -> None: + # Coordination flags and futures + self._client: Final = client + self._cluster: Final = client.cluster + self._ready = False + + self.election_strategy: Final = election_strategy + self.hostname: Final = hostname + self.port: Final = port + self.scheme: Final = scheme + self.master_eligibility: Final = master_eligibility + self.master_url: str | None = None + self.are_we_master = False + + self.rejoin_needed_fut: asyncio.Future[None] | None = None + self._coordinator_dead_fut: asyncio.Future[None] | None = None + + self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID + self.group_id: Final = group_id + self.coordinator_id: int | None = None + self.group_instance_id: str | None = None + + self._max_poll_interval: Final = max_poll_interval_ms / 1000 + self._heartbeat_interval_ms: Final = heartbeat_interval_ms + self._rebalance_timeout_ms: Final = rebalance_timeout_ms + self._retry_backoff_ms: Final = retry_backoff_ms + self._session_timeout_ms: Final = session_timeout_ms + + self._coordinator_lookup_lock: Final = asyncio.Lock() + self._coordination_task: asyncio.Future[None] | None = None + + # Will be started/stopped by coordination task + self._heartbeat_task: asyncio.Task | None = None + self._commit_refresh_task: asyncio.Task | None = None + + # Those are mostly unrecoverable exceptions, but user may perform an + # action to handle those (for example add permission for this group). + # Thus we set exception and pause coordination until user consumes it. + self._pending_exception: BaseException | None = None + self._error_consumed_fut: asyncio.Future | None = None + + # Will be set on close + self._closing: asyncio.Future[None] | None = None + + self._metadata_snapshot: list[Assignment] = [] + + def start(self) -> None: + """Must be called after creating SchemaCoordinator object to initialize + futures and start the coordination task. + """ + self.rejoin_needed_fut = create_future() + self._coordinator_dead_fut = create_future() + self._closing = create_future() + self._coordination_task = create_task(self._coordination_routine()) + + # update initial subscription state using currently known metadata + self._handle_metadata_update(self._cluster) + self._cluster.add_listener(self._handle_metadata_update) + + def ready(self) -> bool: + return self._ready + + async def send_req(self, request: Request) -> Response: + """Send request to coordinator node. In case the coordinator is not + ready a respective error will be raised. + """ + node_id = self.coordinator_id + if node_id is None: + raise Errors.GroupCoordinatorNotAvailableError() + try: + resp = await self._client.send(node_id, request, group=ConnectionGroup.COORDINATION) + except Errors.KafkaError as err: + LOG.error( + "Error sending %s to node %s [%s] -- marking coordinator dead", request.__class__.__name__, node_id, err + ) + self.coordinator_dead() + raise err + return resp + + def check_errors(self) -> None: + """Check if coordinator is well and no authorization or unrecoverable + errors occurred + """ + assert self._coordination_task is not None + if self._coordination_task.done(): + self._coordination_task.result() + if self._error_consumed_fut is not None: + self._error_consumed_fut.set_result(None) + self._error_consumed_fut = None + if self._pending_exception is not None: + exc = self._pending_exception + self._pending_exception = None + raise exc + + async def _push_error_to_user(self, exc: BaseException) -> Coroutine[Any, Any, tuple[set[Any], set[Any]]]: + """Most critical errors are not something we can continue execution + without user action. Well right now we just drop the Consumer, but + java client would certainly be ok if we just poll another time, maybe + it will need to rejoin, but not fail with GroupAuthorizationFailedError + till the end of days... + XXX: Research if we can't have the same error several times. For + example if user gets GroupAuthorizationFailedError and adds + permission for the group, would Consumer work right away or would + still raise exception a few times? + """ + exc = copy.copy(exc) + self._pending_exception = exc + self._error_consumed_fut = create_future() + return asyncio.wait( + [self._error_consumed_fut, self._closing], + return_when=asyncio.FIRST_COMPLETED, + ) + + async def close(self) -> None: + """Close the coordinator, leave the current group + and reset local generation/memberId.""" + if self._closing is None or self._closing.done(): + return + + assert self._coordination_task is not None + self._closing.set_result(None) + # We must let the coordination task properly finish all pending work + if not self._coordination_task.done(): + await self._coordination_task + await self._stop_heartbeat_task() + await self._maybe_leave_group() + + def maybe_leave_group(self) -> asyncio.Task: + task = create_task(self._maybe_leave_group()) + return task + + async def _maybe_leave_group(self) -> None: + if self.generation > 0 and self.group_instance_id is None: + # this is a minimal effort attempt to leave the group. we do not + # attempt any resending if the request fails or times out. + # Note: do not send this leave request if we are running in static + # partition assignment mode (when group_instance_id has been set). + version = 0 if self._client.api_version < (0, 11, 0) else 1 + request = LeaveGroupRequest[version](self.group_id, self.member_id) + try: + await self.send_req(request) + except Errors.KafkaError as err: + LOG.error("LeaveGroup request failed: %s", err) + else: + LOG.info("LeaveGroup request succeeded") + self.reset_generation() + + def _handle_metadata_update(self, _: ClusterMetadata) -> None: + """Schema registry metadata update handler. + + Originally the metadata handler was defined in the + aiokafka.consumer.group_coordinator.BaseCoordinator. + """ + metadata_snapshot = self.get_metadata_snapshot() + if self._metadata_snapshot != metadata_snapshot: + LOG.info("Metadata for topic has changed from %s to %s. ", self._metadata_snapshot, metadata_snapshot) + self._metadata_snapshot = metadata_snapshot + self._on_metadata_change() + + def _on_metadata_change(self) -> None: + """Schema registry specific behavior on metadata change is to request group rejoin.""" + self.request_rejoin() + + def get_metadata_snapshot(self) -> list[Assignment]: + """Get schema registry specific metadata.""" + assert self.scheme is not None + return [ + Assignment( + member_id="v0", + metadata=json_encode( + get_member_configuration( + host=self.hostname, + port=self.port, + scheme=self.scheme, + master_eligibility=self.master_eligibility, + ), + binary=True, + compact=True, + ), + ) + ] + + def _unpack_join_group_response( + self, + response: JoinGroupResponse_v0 | JoinGroupResponse_v1 | JoinGroupResponse_v2 | JoinGroupResponse_v5, + ) -> JoinGroupResponseData: + """Helper function to unpack the group join response data. + + The response versions are fixed to 0, 1, 2 and 5. + See Kafka Protocol guide. + """ + return JoinGroupResponseData( + leader_id=response.leader_id, + protocol=response.group_protocol, + members=[JoinGroupMemberData(member_id=record[0], member_data=record[2]) for record in response.members], + ) + + async def perform_assignment( + self, + response: JoinGroupResponse_v0 | JoinGroupResponse_v1 | JoinGroupResponse_v2 | JoinGroupResponse_v5, + ) -> Sequence[Assignment]: + """Schema registry specific assignment handler. + + Selects the primary Karapace instance. + This logic is run only on group leader instance. + """ + response_data = self._unpack_join_group_response(response=response) + LOG.info( + "Creating assignment: %r, protocol: %r, members: %r", + response_data.leader_id, + response_data.protocol, + response_data.members, + ) + self.are_we_master = None + error = NO_ERROR + urls = {} + fallback_urls = {} + for member in response_data.members: + member_identity = json_decode(member.member_data, MemberIdentity) + if member_identity["master_eligibility"] is True: + urls[get_member_url(member_identity["scheme"], member_identity["host"], member_identity["port"])] = ( + member.member_id, + member.member_data, + ) + else: + fallback_urls[ + get_member_url(member_identity["scheme"], member_identity["host"], member_identity["port"]) + ] = (member.member_id, member.member_data) + if len(urls) > 0: + chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0] + schema_master_id, member_data = urls[chosen_url] + else: + # Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be + chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0] + schema_master_id, member_data = fallback_urls[chosen_url] + member_identity = json_decode(member_data, MemberIdentity) + identity = get_member_configuration( + host=member_identity["host"], + port=member_identity["port"], + scheme=member_identity["scheme"], + master_eligibility=member_identity["master_eligibility"], + ) + LOG.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url) + + assignments: list[Assignment] = [] + for member in response_data.members: + member_data = json_encode( + {"master": schema_master_id, "master_identity": identity, "error": error}, binary=True, compact=True + ) + assignments.append(Assignment(member_id=member.member_id, metadata=member_data)) + return assignments + + async def _on_join_complete( + self, + generation: int, + member_id: str, + assignment: Assignment, + ) -> None: + """Schema registry specific handling of join complete. + + Sets the primary url and primary flag based on the assignment. + Marks the SchemaCoordinator ready. + """ + LOG.info( + "Join complete, generation %r, member_id: %r, protocol: %r, member_assignment_bytes: %r", + generation, + member_id, + assignment.member_id, + assignment.metadata, + ) + member_assignment = json_decode(assignment.metadata, MemberAssignment) + member_identity = member_assignment["master_identity"] + + master_url = get_member_url( + scheme=member_identity["scheme"], + host=member_identity["host"], + port=member_identity["port"], + ) + # On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real + if member_assignment["master"] == member_id and member_identity["master_eligibility"]: + self.master_url = master_url + self.are_we_master = True + elif not member_identity["master_eligibility"]: + self.master_url = None + self.are_we_master = False + else: + self.master_url = master_url + self.are_we_master = False + self._ready = True + return None + + def coordinator_dead(self) -> None: + """Mark the current coordinator as dead. + NOTE: this will not force a group rejoin. If new coordinator is able to + recognize this member we will just continue with current generation. + """ + if self._coordinator_dead_fut is not None and self.coordinator_id is not None: + LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id) + self.coordinator_id = None + self._coordinator_dead_fut.set_result(None) + + def reset_generation(self) -> None: + """Coordinator did not recognize either generation or member_id. Will + need to re-join the group. + """ + self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID + self.request_rejoin() + + def request_rejoin(self) -> None: + assert self.rejoin_needed_fut is not None + if not self.rejoin_needed_fut.done(): + self.rejoin_needed_fut.set_result(None) + + def need_rejoin(self) -> bool: + """Check whether the group should be rejoined + + Returns: + bool: True if consumer should rejoin group, False otherwise + """ + return self.rejoin_needed_fut is None or self.rejoin_needed_fut.done() + + async def ensure_coordinator_known(self) -> None: + """Block until the coordinator for this group is known.""" + if self.coordinator_id is not None: + return + + assert self._closing is not None + async with self._coordinator_lookup_lock: + retry_backoff = self._retry_backoff_ms / 1000 + while self.coordinator_id is None and not self._closing.done(): + try: + coordinator_id = await self._client.coordinator_lookup(CoordinationType.GROUP, self.group_id) + except Errors.GroupAuthorizationFailedError as exc: + err = Errors.GroupAuthorizationFailedError(self.group_id) + raise err from exc + except Errors.KafkaError as err: + LOG.error("Group Coordinator Request failed: %s", err) + if err.retriable: + await self._client.force_metadata_update() + await asyncio.sleep(retry_backoff) + continue + raise + + # Try to connect to confirm that the connection can be + # established. + ready = await self._client.ready(coordinator_id, group=ConnectionGroup.COORDINATION) + if not ready: + await asyncio.sleep(retry_backoff) + continue + + self.coordinator_id = coordinator_id + self._coordinator_dead_fut = create_future() + LOG.info("Discovered coordinator %s for group %s", self.coordinator_id, self.group_id) + + async def _coordination_routine(self) -> None: + try: + await self.__coordination_routine() + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception as exc: + LOG.error("Unexpected error in coordinator routine", exc_info=True) + kafka_exc = Errors.KafkaError(f"Unexpected error during coordination {exc!r}") + raise kafka_exc from exc + + async def __coordination_routine(self) -> None: + """Main background task, that keeps track of changes in group + coordination. This task will spawn/stop heartbeat task and perform + autocommit in times it's safe to do so. + """ + assert self._closing is not None + assert self._coordinator_dead_fut is not None + assert self.rejoin_needed_fut is not None + while not self._closing.done(): + # Ensure active group + try: + await self.ensure_coordinator_known() + if self.need_rejoin(): + new_assignment = await self.ensure_active_group() + if not new_assignment: + continue + except Errors.KafkaError as exc: + # The ignore of returned coroutines need to be checked. + # aiokafka also discards the return value + await self._push_error_to_user(exc) # type: ignore[unused-coroutine] + continue + + futures = [ + self._closing, # Will exit fast if close() called + self._coordinator_dead_fut, + ] + # No assignments. + # We don't want a heavy loop here. + # NOTE: metadata changes are for partition count and pattern + # subscription, which is irrelevant in case of user assignment. + futures.append(self.rejoin_needed_fut) + + # We should always watch for other task raising critical or + # unexpected errors, so we attach those as futures too. We will + # check them right after wait. + if self._heartbeat_task: + futures.append(self._heartbeat_task) + if self._commit_refresh_task: + futures.append(self._commit_refresh_task) + + _, _ = await asyncio.wait( + futures, + return_when=asyncio.FIRST_COMPLETED, + ) + + # Handle exceptions in other background tasks + for task in [self._heartbeat_task, self._commit_refresh_task]: + if task and task.done(): + task_exception = task.exception() + if task_exception: + # The ignore of returned coroutines need to be checked. + # aiokafka also discards the return value + await self._push_error_to_user(task_exception) # type: ignore[unused-coroutine] + + async def ensure_active_group(self) -> bool: + # due to a race condition between the initial metadata + # fetch and the initial rebalance, we need to ensure that + # the metadata is fresh before joining initially. This + # ensures that we have matched the pattern against the + # cluster's topics at least once before joining. + # Also the rebalance can be issued by another node, that + # discovered a new topic, which is still unknown to this + # one. + await self._client.force_metadata_update() + + # NOTE: we did not stop heartbeat task before to keep the + # member alive during the callback, as it can commit offsets. + # See the ``RebalanceInProgressError`` case in heartbeat + # handling. + await self._stop_heartbeat_task() + + # We will only try to perform the rejoin once. If it fails, + # we will spin this loop another time, checking for coordinator + # and subscription changes. + # NOTE: We do re-join in sync. The group rebalance will fail on + # subscription change and coordinator failure by itself and + # this way we don't need to worry about racing or cancellation + # issues that could occur if re-join were to be a task. + success = await self._do_rejoin_group() + if success: + self._start_heartbeat_task() + return True + return False + + def _start_heartbeat_task(self) -> None: + if self._heartbeat_task is None: + self._heartbeat_task = create_task(self._heartbeat_routine()) + + async def _stop_heartbeat_task(self) -> None: + if self._heartbeat_task is not None: + if not self._heartbeat_task.done(): + self._heartbeat_task.cancel() + await self._heartbeat_task + self._heartbeat_task = None + + async def _heartbeat_routine(self) -> None: + last_ok_heartbeat = time.monotonic() + hb_interval = self._heartbeat_interval_ms / 1000 + session_timeout = self._session_timeout_ms / 1000 + retry_backoff = self._retry_backoff_ms / 1000 + sleep_time = hb_interval + + # There is no point to heartbeat after Broker stopped recognizing + # this consumer, so we stop after resetting generation. + while self.member_id != JoinGroupRequest[0].UNKNOWN_MEMBER_ID: + try: + await asyncio.sleep(sleep_time) + await self.ensure_coordinator_known() + + t0 = time.monotonic() + success = await self._do_heartbeat() + except asyncio.CancelledError: + break + + # NOTE: We let all other errors propagate up to coordination + # routine + + if success: + last_ok_heartbeat = time.monotonic() + sleep_time = max((0, hb_interval - last_ok_heartbeat + t0)) + else: + sleep_time = retry_backoff + + session_time = time.monotonic() - last_ok_heartbeat + if session_time > session_timeout: + # the session timeout has expired without seeing a successful + # heartbeat, so we should probably make sure the coordinator + # is still healthy. + LOG.error("Heartbeat session expired - marking coordinator dead") + self.coordinator_dead() + + LOG.debug("Stopping heartbeat task") + + async def _do_heartbeat(self) -> bool: + version = 0 if self._client.api_version < (0, 11, 0) else 1 + request = HeartbeatRequest[version](self.group_id, self.generation, self.member_id) + LOG.debug("Heartbeat: %s[%s] %s", self.group_id, self.generation, self.member_id) + + # _send_req may fail with error like `RequestTimedOutError` + # we need to catch it so coordinator_routine won't fail + try: + resp = await self.send_req(request) + except Errors.KafkaError as err: + LOG.error("Heartbeat send request failed: %s. Will retry.", err) + return False + error_type = Errors.for_code(resp.error_code) + if error_type is Errors.NoError: + LOG.debug("Received successful heartbeat response for group %s", self.group_id) + return True + if error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): + LOG.warning( + "Heartbeat failed for group %s: coordinator (node %s) is either not started or not valid", + self.group_id, + self.coordinator_id, + ) + self.coordinator_dead() + elif error_type is Errors.RebalanceInProgressError: + LOG.warning("Heartbeat failed for group %s because it is rebalancing", self.group_id) + self.request_rejoin() + # it is valid to continue heartbeating while the group is + # rebalancing. This ensures that the coordinator keeps the + # member in the group for as long as the duration of the + # rebalance timeout. If we stop sending heartbeats, + # however, then the session timeout may expire before we + # can rejoin. + return True + elif error_type is Errors.IllegalGenerationError: + LOG.warning("Heartbeat failed for group %s: generation id is not current.", self.group_id) + self.reset_generation() + elif error_type is Errors.UnknownMemberIdError: + LOG.warning("Heartbeat failed: local member_id was not recognized; resetting and re-joining group") + self.reset_generation() + elif error_type is Errors.GroupAuthorizationFailedError: + raise error_type(self.group_id) + else: + kafka_error = Errors.KafkaError(f"Unexpected exception in heartbeat task: {error_type()!r}") + LOG.error("Heartbeat failed: %r", kafka_error) + raise kafka_error + return False + + async def _do_rejoin_group(self) -> bool: + rebalance = SchemaCoordinatorGroupRebalance( + self, + self.group_id, + self.coordinator_id, + self._session_timeout_ms, + self._retry_backoff_ms, + ) + assignment = await rebalance.perform_group_join() + + if assignment is None: + # wait backoff and try again + await asyncio.sleep(self._retry_backoff_ms / 1000) + return False + + await self._on_join_complete(generation=self.generation, member_id=self.member_id, assignment=assignment) + return True + + +class SchemaCoordinatorGroupRebalance: + """ An adapter, that encapsulates rebalance logic + On how to handle cases read in https://cwiki.apache.org/confluence/\ + display/KAFKA/Kafka+Client-side+Assignment+Proposal + """ + + def __init__( + self, + coordinator: SchemaCoordinator, + group_id: str, + coordinator_id: int | None, + session_timeout_ms: int, + retry_backoff_ms: int, + ) -> None: + self._coordinator: Final = coordinator + self.group_id: Final = group_id + self.coordinator_id: Final = coordinator_id + self._session_timeout_ms: Final = session_timeout_ms + self._retry_backoff_ms: Final = retry_backoff_ms + self._api_version: Final = self._coordinator._client.api_version + self._rebalance_timeout_ms: Final = self._coordinator._rebalance_timeout_ms + + async def perform_group_join(self) -> Assignment | None: + """Join the group and return the assignment for the next generation. + + This function handles both JoinGroup and SyncGroup, delegating to + perform_assignment() if elected as leader by the coordinator node. + + Returns encoded-bytes assignment returned from the group leader + """ + # send a join group request to the coordinator + LOG.info("(Re-)joining group %s", self.group_id) + + metadata_list = [assignment.to_tuple() for assignment in self._coordinator.get_metadata_snapshot()] + # for KIP-394 we may have to send a second join request + try_join = True + while try_join: + try_join = False + + if self._api_version < (0, 10, 1): + request = JoinGroupRequest[0]( + self.group_id, + self._session_timeout_ms, + self._coordinator.member_id, + SCHEMA_COORDINATOR_PROTOCOL, + metadata_list, + ) + elif self._api_version < (0, 11, 0): + request = JoinGroupRequest[1]( + self.group_id, + self._session_timeout_ms, + self._rebalance_timeout_ms, + self._coordinator.member_id, + SCHEMA_COORDINATOR_PROTOCOL, + metadata_list, + ) + elif self._api_version < (2, 3, 0): + request = JoinGroupRequest[2]( + self.group_id, + self._session_timeout_ms, + self._rebalance_timeout_ms, + self._coordinator.member_id, + SCHEMA_COORDINATOR_PROTOCOL, + metadata_list, + ) + else: + request = JoinGroupRequest[3]( + self.group_id, + self._session_timeout_ms, + self._rebalance_timeout_ms, + self._coordinator.member_id, + self._coordinator.group_instance_id, + SCHEMA_COORDINATOR_PROTOCOL, + metadata_list, + ) + + # create the request for the coordinator + LOG.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) + try: + response = await self._coordinator.send_req(request) + except Errors.KafkaError: + # Return right away. It's a connection error, so backoff will be + # handled by coordinator lookup + return None + + error_type = Errors.for_code(response.error_code) + + if error_type is Errors.MemberIdRequired: + self._coordinator.member_id = response.member_id + try_join = True + + if error_type is Errors.NoError: + LOG.debug("Join group response %s", response) + self._coordinator.member_id = response.member_id + self._coordinator.generation = response.generation_id + protocol = response.group_protocol + LOG.info( + "Joined group '%s' (generation %s) with member_id %s", + self.group_id, + response.generation_id, + response.member_id, + ) + + if response.leader_id == response.member_id: + LOG.info("Elected group leader -- performing partition assignments using %s", protocol) + assignment_bytes = await self._on_join_leader(response) + else: + assignment_bytes = await self._on_join_follower() + + if assignment_bytes is None: + return None + return Assignment(member_id=protocol, metadata=assignment_bytes) + if error_type is Errors.GroupLoadInProgressError: + # Backoff and retry + LOG.debug( + "Attempt to join group %s rejected since coordinator %s is loading the group.", + self.group_id, + self.coordinator_id, + ) + await asyncio.sleep(self._retry_backoff_ms / 1000) + elif error_type is Errors.UnknownMemberIdError: + # reset the member id and retry immediately + self._coordinator.reset_generation() + LOG.debug("Attempt to join group %s failed due to unknown member id", self.group_id) + elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): + # Coordinator changed we should be able to find it immediately + err = error_type() + self._coordinator.coordinator_dead() + LOG.debug("Attempt to join group %s failed due to obsolete coordinator information: %s", self.group_id, err) + elif error_type in ( + Errors.InconsistentGroupProtocolError, + Errors.InvalidSessionTimeoutError, + Errors.InvalidGroupIdError, + ): + err = error_type() + LOG.error("Attempt to join group failed due to fatal error: %s", err) + raise err + elif error_type is Errors.GroupAuthorizationFailedError: + raise error_type(self.group_id) + else: + err = error_type() + LOG.error("Unexpected error in join group '%s' response: %s", self.group_id, err) + raise Errors.KafkaError(repr(err)) + return None + + async def _on_join_follower(self) -> bytes | None: + # send follower's sync group with an empty assignment + LOG.info("Joined as follower.") + if self._api_version < (2, 3, 0): + version = 0 if self._api_version < (0, 11, 0) else 1 + request = SyncGroupRequest[version]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + [], + ) + else: + request = SyncGroupRequest[2]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + self._coordinator.group_instance_id, + [], + ) + LOG.debug( + "Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request + ) + return await self._send_sync_group_request(request) + + async def _on_join_leader(self, response: JoinGroupResponse_v0) -> bytes | None: + """ + Perform leader synchronization and send back the assignment + for the group via SyncGroupRequest + + Arguments: + response (JoinResponse): broker response to parse + + Returns: + Future: resolves to member assignment encoded-bytes + """ + try: + group_assignment = await self._coordinator.perform_assignment(response) + except Exception as e: + raise Errors.KafkaError(repr(e)) + + assignment_req = [] + for assignment in group_assignment: + assignment_req.append((assignment.member_id, assignment.metadata)) + + if self._api_version < (2, 3, 0): + version = 0 if self._api_version < (0, 11, 0) else 1 + request = SyncGroupRequest[version]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + assignment_req, + ) + else: + request = SyncGroupRequest[2]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + self._coordinator.group_instance_id, + assignment_req, + ) + + LOG.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) + return await self._send_sync_group_request(request) + + async def _send_sync_group_request( + self, + request: SyncGroupRequest_v0 | SyncGroupRequest_v1 | SyncGroupRequest_v3, + ) -> bytes | None: + # We need to reset the rejoin future right after the assignment to + # capture metadata changes after join group was performed. We do not + # set it directly after JoinGroup to avoid a false rejoin in case + # ``perform_assignment()`` does a metadata update. + self._coordinator.rejoin_needed_fut = create_future() + req_generation = self._coordinator.generation + req_member_id = self._coordinator.member_id + + try: + response = await self._coordinator.send_req(request) + except Errors.KafkaError: + # We lost connection to coordinator. No need to try and finish this + # group join, just rejoin again. + self._coordinator.request_rejoin() + return None + + error_type = Errors.for_code(response.error_code) + if error_type is Errors.NoError: + LOG.info("Successfully synced group %s with generation %s", self.group_id, self._coordinator.generation) + # make sure the right member_id/generation is set in case they changed + # while the rejoin was taking place + self._coordinator.generation = req_generation + self._coordinator.member_id = req_member_id + return response.member_assignment + + # Error case + self._coordinator.request_rejoin() + if error_type is Errors.RebalanceInProgressError: + LOG.debug("SyncGroup for group %s failed due to group rebalance", self.group_id) + elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): + err = error_type() + LOG.debug("SyncGroup for group %s failed due to %s,", self.group_id, err) + self._coordinator.reset_generation() + elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): + err = error_type() + LOG.debug("SyncGroup for group %s failed due to %s", self.group_id, err) + self._coordinator.coordinator_dead() + elif error_type is Errors.GroupAuthorizationFailedError: + raise error_type(self.group_id) + else: + err = error_type() + LOG.error("Unexpected error from SyncGroup: %s", err) + raise Errors.KafkaError(repr(err)) + + return None diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py deleted file mode 100644 index 9ff823290..000000000 --- a/karapace/master_coordinator.py +++ /dev/null @@ -1,297 +0,0 @@ -""" -karapace - master coordinator - -Copyright (c) 2023 Aiven Ltd -See LICENSE for details -""" -from dataclasses import dataclass -from kafka.coordinator.base import BaseCoordinator -from kafka.errors import NoBrokersAvailable, NodeNotReadyError -from kafka.metrics import MetricConfig, Metrics -from karapace import constants -from karapace.config import Config -from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS -from karapace.typing import JsonData, JsonObject -from karapace.utils import json_decode, json_encode, KarapaceKafkaClient -from karapace.version import __version__ -from threading import Event, Thread -from typing import Any, Final, List, Optional, Sequence, Tuple -from typing_extensions import TypedDict - -import logging -import time - -__all__ = ("MasterCoordinator",) - -# SR group errors -NO_ERROR: Final = 0 -DUPLICATE_URLS: Final = 1 -LOG = logging.getLogger(__name__) - - -class MemberIdentity(TypedDict): - host: str - port: int - scheme: str - master_eligibility: bool - - -class MemberAssignment(TypedDict): - master: str - master_identity: MemberIdentity - - -def get_member_url(scheme: str, host: str, port: int) -> str: - return f"{scheme}://{host}:{port}" - - -def get_member_configuration(*, host: str, port: int, scheme: str, master_eligibility: bool) -> JsonData: - return { - "version": 2, - "karapace_version": __version__, - "host": host, - "port": port, - "scheme": scheme, - "master_eligibility": master_eligibility, - } - - -@dataclass -class SchemaCoordinatorStatus: - is_primary: Optional[bool] - is_primary_eligible: bool - primary_url: Optional[str] - is_running: bool - group_generation_id: int - - -class SchemaCoordinator(BaseCoordinator): - are_we_master: Optional[bool] = None - master_url: Optional[str] = None - - def __init__( - self, - client: KarapaceKafkaClient, - metrics: Metrics, - hostname: str, - port: int, - scheme: str, - master_eligibility: bool, - election_strategy: str, - **configs: Any, - ) -> None: - super().__init__(client=client, metrics=metrics, **configs) - self.election_strategy = election_strategy - self.hostname = hostname - self.port = port - self.scheme = scheme - self.master_eligibility = master_eligibility - - def protocol_type(self) -> str: - return "sr" - - def group_protocols(self) -> List[Tuple[str, str]]: - assert self.scheme is not None - return [ - ( - "v0", - json_encode( - get_member_configuration( - host=self.hostname, - port=self.port, - scheme=self.scheme, - master_eligibility=self.master_eligibility, - ), - compact=True, - ), - ) - ] - - def _perform_assignment( - self, - leader_id: str, - protocol: str, - members: Sequence[Tuple[str, str]], - ) -> JsonObject: - LOG.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members) - self.are_we_master = None - error = NO_ERROR - urls = {} - fallback_urls = {} - for member_id, member_data in members: - member_identity = json_decode(member_data, MemberIdentity) - if member_identity["master_eligibility"] is True: - urls[get_member_url(member_identity["scheme"], member_identity["host"], member_identity["port"])] = ( - member_id, - member_data, - ) - else: - fallback_urls[ - get_member_url(member_identity["scheme"], member_identity["host"], member_identity["port"]) - ] = (member_id, member_data) - if len(urls) > 0: - chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0] - schema_master_id, member_data = urls[chosen_url] - else: - # Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be - chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0] - schema_master_id, member_data = fallback_urls[chosen_url] - member_identity = json_decode(member_data, MemberIdentity) - identity = get_member_configuration( - host=member_identity["host"], - port=member_identity["port"], - scheme=member_identity["scheme"], - master_eligibility=member_identity["master_eligibility"], - ) - LOG.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url) - - assignments: JsonObject = {} - for member_id, member_data in members: - assignments[member_id] = json_encode( - {"master": schema_master_id, "master_identity": identity, "error": error}, compact=True - ) - return assignments - - def _on_join_prepare(self, generation: str, member_id: str) -> None: - """Invoked prior to each group join or rejoin.""" - # needs to be implemented in our class for pylint to be satisfied - - def _on_join_complete(self, generation: str, member_id: str, protocol: str, member_assignment_bytes: bytes) -> None: - LOG.info( - "Join complete, generation %r, member_id: %r, protocol: %r, member_assignment_bytes: %r", - generation, - member_id, - protocol, - member_assignment_bytes, - ) - member_assignment = json_decode(member_assignment_bytes, MemberAssignment) - member_identity = member_assignment["master_identity"] - - master_url = get_member_url( - scheme=member_identity["scheme"], - host=member_identity["host"], - port=member_identity["port"], - ) - # On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real - if member_assignment["master"] == member_id and member_identity["master_eligibility"]: - self.master_url = master_url - self.are_we_master = True - elif not member_identity["master_eligibility"]: - self.master_url = None - self.are_we_master = False - else: - self.master_url = master_url - self.are_we_master = False - return super()._on_join_complete(generation, member_id, protocol, member_assignment_bytes) - - def _on_join_follower(self) -> None: - LOG.info("We are a follower, not a master") - return super()._on_join_follower() - - -class MasterCoordinator(Thread): - """Handles schema topic creation and master election""" - - def __init__(self, config: Config) -> None: - super().__init__() - self.config = config - self.timeout_ms = 10000 - self.kafka_client: Optional[KarapaceKafkaClient] = None - self.running = True - self.sc: Optional[SchemaCoordinator] = None - metrics_tags = {"client-id": self.config["client_id"]} - metric_config = MetricConfig(samples=2, time_window_ms=30000, tags=metrics_tags) - self._metrics = Metrics(metric_config, reporters=[]) - self.schema_coordinator_ready = Event() - - def init_kafka_client(self) -> bool: - try: - self.kafka_client = KarapaceKafkaClient( - api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS, - bootstrap_servers=self.config["bootstrap_uri"], - client_id=self.config["client_id"], - security_protocol=self.config["security_protocol"], - ssl_cafile=self.config["ssl_cafile"], - ssl_certfile=self.config["ssl_certfile"], - ssl_keyfile=self.config["ssl_keyfile"], - sasl_mechanism=self.config["sasl_mechanism"], - sasl_plain_username=self.config["sasl_plain_username"], - sasl_plain_password=self.config["sasl_plain_password"], - metadata_max_age_ms=self.config["metadata_max_age_ms"], - ) - return True - except (NodeNotReadyError, NoBrokersAvailable): - LOG.warning("No Brokers available yet, retrying init_kafka_client()") - time.sleep(2.0) - return False - - def init_schema_coordinator(self) -> None: - session_timeout_ms = self.config["session_timeout_ms"] - assert self.kafka_client is not None - self.sc = SchemaCoordinator( - client=self.kafka_client, - metrics=self._metrics, - hostname=self.config["advertised_hostname"], - port=self.config["advertised_port"], - scheme=self.config["advertised_protocol"], - master_eligibility=self.config["master_eligibility"], - election_strategy=self.config.get("master_election_strategy", "lowest"), - group_id=self.config["group_id"], - session_timeout_ms=session_timeout_ms, - request_timeout_ms=max(session_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS), - ) - self.schema_coordinator_ready.set() - - def get_coordinator_status(self) -> SchemaCoordinatorStatus: - generation = self.sc.generation() if self.sc is not None else None - return SchemaCoordinatorStatus( - is_primary=self.sc.are_we_master if self.sc is not None else None, - is_primary_eligible=self.config["master_eligibility"], - primary_url=self.sc.master_url if self.sc is not None else None, - is_running=self.is_alive(), - group_generation_id=generation.generation_id if generation is not None else -1, - ) - - def get_master_info(self) -> Tuple[Optional[bool], Optional[str]]: - """Return whether we're the master, and the actual master url that can be used if we're not""" - self.schema_coordinator_ready.wait() - assert self.sc is not None - return self.sc.are_we_master, self.sc.master_url - - def close(self) -> None: - LOG.info("Closing master_coordinator") - self.running = False - - def run(self) -> None: - _hb_interval = 3.0 - while self.running: - try: - if not self.kafka_client: - if self.init_kafka_client() is False: - # If Kafka client is not initialized sleep a bit - time.sleep(0.5) - continue - if not self.sc: - self.init_schema_coordinator() - assert self.sc is not None - _hb_interval = self.sc.config["heartbeat_interval_ms"] / 1000 - - self.sc.ensure_active_group() - self.sc.poll_heartbeat() - LOG.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url) - # In cases when heartbeat is missed the sleep min sleep time would be 0 - # from `time_to_next_heartbeat`. In that case halve the heartbeat interval for - # some sane sleep instead of running the loop without sleep for a while. - sleep_time = min(_hb_interval, self.sc.time_to_next_heartbeat()) - if not sleep_time: - sleep_time = _hb_interval / 2 - time.sleep(sleep_time) - except: # pylint: disable=bare-except - LOG.exception("Exception in master_coordinator") - time.sleep(1.0) - - if self.sc: - self.sc.close() - - if self.kafka_client: - self.kafka_client.close() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 88600f70c..9bcd69260 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -26,6 +26,7 @@ ) from karapace import constants from karapace.config import Config +from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase @@ -33,7 +34,6 @@ from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode -from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index aa6f1dabc..ab651f076 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -8,6 +8,7 @@ from karapace.compatibility import check_compatibility, CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible from karapace.config import Config +from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency from karapace.errors import ( IncompatibleSchema, @@ -23,7 +24,6 @@ ) from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import KeyFormatter -from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema @@ -97,14 +97,14 @@ def get_schemas(self, subject: Subject, *, include_deleted: bool = False) -> lis schema_versions = self.database.find_subject_schemas(subject=subject, include_deleted=include_deleted) return list(schema_versions.values()) - def start(self) -> None: - self.mc.start() + async def start(self) -> None: + await self.mc.start() self.schema_reader.start() self.producer.initialize_karapace_producer() async def close(self) -> None: async with AsyncExitStack() as stack: - stack.enter_context(closing(self.mc)) + stack.push_async_callback(self.mc.close) stack.enter_context(closing(self.schema_reader)) stack.enter_context(closing(self.producer)) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 0216b5e5e..bbb972d16 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -92,9 +92,9 @@ def __init__(self, config: Config) -> None: self.schema_registry = KarapaceSchemaRegistry(config) self._add_schema_registry_routes() - self.schema_registry.start() self._forward_client = None + self.app.on_startup.append(self._start_schema_registry) self.app.on_startup.append(self._create_forward_client) self.health_hooks.append(self.schema_registry_health) @@ -117,6 +117,10 @@ async def schema_registry_health(self) -> JsonObject: resp["schema_registry_coordinator_generation_id"] = cs.group_generation_id return resp + async def _start_schema_registry(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument + """Callback for aiohttp.Application.on_startup""" + await self.schema_registry.start() + async def _create_forward_client(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument """Callback for aiohttp.Application.on_startup""" self._forward_client = aiohttp.ClientSession(headers={"User-Agent": SERVER_NAME}) diff --git a/mypy.ini b/mypy.ini index 2797672ef..15ab9042f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -77,6 +77,9 @@ ignore_errors = True # dependencies. # - Write your own stubs. You don't need to write stubs for the whole library, # only the parts that Karapace is interacting with. +[mypy-aiokafka.*] +ignore_missing_imports = True + [mypy-kafka.*] ignore_missing_imports = True diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index df08e41d3..2079fcddc 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -8,6 +8,8 @@ accept-types==0.4.1 # via -r requirements.txt aiohttp==3.9.5 # via -r requirements.txt +aiokafka==0.10.0 + # via -r requirements.txt aiosignal==1.3.1 # via # -r requirements.txt @@ -20,6 +22,7 @@ async-timeout==4.0.3 # via # -r requirements.txt # aiohttp + # aiokafka attrs==23.2.0 # via # -r requirements.txt @@ -87,7 +90,7 @@ geventhttpclient==2.0.12 # via locust greenlet==3.0.3 # via gevent -hypothesis==6.101.0 +hypothesis==6.102.4 # via -r requirements-dev.in idna==3.7 # via @@ -144,7 +147,10 @@ multidict==6.0.5 networkx==3.1 # via -r requirements.txt packaging==24.0 - # via pytest + # via + # -r requirements.txt + # aiokafka + # pytest pdbpp==0.10.3 # via -r requirements-dev.in pkgutil-resolve-name==1.3.10 @@ -169,7 +175,7 @@ pyjwt==2.8.0 # via -r requirements.txt pyrepl==0.9.0 # via fancycompleter -pytest==8.2.0 +pytest==8.2.1 # via # -r requirements-dev.in # pytest-random-order @@ -205,7 +211,7 @@ rpds-py==0.18.1 # -r requirements.txt # jsonschema # referencing -sentry-sdk==2.1.1 +sentry-sdk==2.2.0 # via -r requirements-dev.in six==1.16.0 # via @@ -230,7 +236,7 @@ typing-extensions==4.11.0 # -r requirements.txt # anyio # rich -ujson==5.9.0 +ujson==5.10.0 # via -r requirements.txt urllib3==2.2.1 # via @@ -251,14 +257,14 @@ yarl==1.9.4 # via # -r requirements.txt # aiohttp -zipp==3.18.1 +zipp==3.18.2 # via # -r requirements.txt # importlib-metadata # importlib-resources zope-event==5.0 # via gevent -zope-interface==6.3 +zope-interface==6.4 # via gevent zstandard==0.22.0 # via -r requirements.txt diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 6d6932002..6fd26a934 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -27,7 +27,7 @@ rpds-py==0.18.1 # -c requirements-dev.txt # -c requirements.txt # referencing -sentry-sdk==2.1.1 +sentry-sdk==2.2.0 # via # -c requirements-dev.txt # -r requirements-typing.in diff --git a/requirements/requirements.in b/requirements/requirements.in index 8cddb1c38..d2ef94613 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -1,6 +1,7 @@ # PyPI dependencies accept-types<1 aiohttp<4 +aiokafka==0.10.0 confluent-kafka==2.3.0 isodate<1 jsonschema<5 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 2cac0123b..e74b9e1e7 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -8,12 +8,16 @@ accept-types==0.4.1 # via -r requirements.in aiohttp==3.9.5 # via -r requirements.in +aiokafka==0.10.0 + # via -r requirements.in aiosignal==1.3.1 # via aiohttp anyio==4.3.0 # via watchfiles async-timeout==4.0.3 - # via aiohttp + # via + # aiohttp + # aiokafka attrs==23.2.0 # via # aiohttp @@ -61,6 +65,8 @@ multidict==6.0.5 # yarl networkx==3.1 # via -r requirements.in +packaging==24.0 + # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema protobuf==3.20.3 @@ -96,7 +102,7 @@ typing-extensions==4.11.0 # -r requirements.in # anyio # rich -ujson==5.9.0 +ujson==5.10.0 # via -r requirements.in watchfiles==0.21.0 # via -r requirements.in @@ -104,7 +110,7 @@ xxhash==3.4.1 # via -r requirements.in yarl==1.9.4 # via aiohttp -zipp==3.18.1 +zipp==3.18.2 # via importlib-resources zstandard==0.22.0 # via -r requirements.in diff --git a/tests/integration/test_karapace.py b/tests/integration/test_karapace.py index 2928d7c06..65d99f128 100644 --- a/tests/integration/test_karapace.py +++ b/tests/integration/test_karapace.py @@ -5,9 +5,11 @@ from contextlib import ExitStack from karapace.config import set_config_defaults from pathlib import Path +from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process from tests.utils import popen_karapace_all +from typing import Iterator import json import socket @@ -16,6 +18,7 @@ def test_regression_server_must_exit_on_exception( port_range: PortRangeInclusive, tmp_path: Path, + kafka_servers: Iterator[KafkaServers], ) -> None: """Regression test for Karapace properly exiting. @@ -29,6 +32,7 @@ def test_regression_server_must_exit_on_exception( config = set_config_defaults( { + "bootstrap_uri": kafka_servers.bootstrap_servers, "karapace_registry": True, "port": port, } diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index bd803c1fd..225539f8d 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -4,9 +4,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from contextlib import closing from karapace.config import set_config_defaults -from karapace.master_coordinator import MasterCoordinator +from karapace.coordinator.master_coordinator import MasterCoordinator from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import PortRangeInclusive from tests.utils import new_random_name @@ -15,12 +14,11 @@ import json import pytest import requests -import time -def init_admin(config): +async def init_admin(config): mc = MasterCoordinator(config=config) - mc.start() + await mc.start() return mc @@ -30,17 +28,17 @@ def is_master(mc: MasterCoordinator) -> bool: This takes care of a race condition were the flag `master` is set but `master_url` is not yet set. """ - return bool(mc.sc and mc.sc.are_we_master and mc.sc.master_url) + return bool(mc.schema_coordinator and mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url) def has_master(mc: MasterCoordinator) -> bool: """True if `mc` has a master.""" - return bool(mc.sc and not mc.sc.are_we_master and mc.sc.master_url) + return bool(mc.schema_coordinator and not mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url) @pytest.mark.timeout(60) # Github workflows need a bit of extra time @pytest.mark.parametrize("strategy", ["lowest", "highest"]) -def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaServers, strategy: str) -> None: +async def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaServers, strategy: str) -> None: # Use random port to allow for parallel runs. with port_range.allocate_port() as port1, port_range.allocate_port() as port2: port_aa, port_bb = sorted((port1, port2)) @@ -69,7 +67,9 @@ def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaSe } ) - with closing(init_admin(config_aa)) as mc_aa, closing(init_admin(config_bb)) as mc_bb: + mc_aa = await init_admin(config_aa) + mc_bb = await init_admin(config_bb) + try: if strategy == "lowest": master = mc_aa slave = mc_bb @@ -79,20 +79,27 @@ def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaSe # Wait for the election to happen while not is_master(master): - time.sleep(0.3) + await asyncio.sleep(0.5) while not has_master(slave): - time.sleep(0.3) + await asyncio.sleep(0.5) # Make sure the end configuration is as expected master_url = f'http://{master.config["host"]}:{master.config["port"]}' - assert master.sc.election_strategy == strategy - assert slave.sc.election_strategy == strategy - assert master.sc.master_url == master_url - assert slave.sc.master_url == master_url - - -def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None: + assert master.schema_coordinator is not None + assert slave.schema_coordinator is not None + assert master.schema_coordinator.election_strategy == strategy + assert slave.schema_coordinator.election_strategy == strategy + assert master.schema_coordinator.master_url == master_url + assert slave.schema_coordinator.master_url == master_url + finally: + print(f"expected: {master_url}") + print(slave.schema_coordinator.master_url) + await mc_aa.close() + await mc_bb.close() + + +async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None: """Test that primary selection works when mixed set of roles is configured for Karapace instances. The Kafka group coordinator leader can be any node, it has no relation to Karapace primary role eligibility. @@ -134,27 +141,32 @@ def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, port_ra } ) - with closing(init_admin(config_non_primary_1)) as non_primary_1, closing( - init_admin(config_non_primary_2) - ) as non_primary_2, closing(init_admin(config_primary)) as primary: + non_primary_1 = await init_admin(config_non_primary_1) + non_primary_2 = await init_admin(config_non_primary_2) + primary = await init_admin(config_primary) + try: # Wait for the election to happen while not is_master(primary): - time.sleep(0.3) + await asyncio.sleep(0.5) while not has_master(non_primary_1): - time.sleep(0.3) + await asyncio.sleep(0.5) while not has_master(non_primary_2): - time.sleep(0.3) + await asyncio.sleep(0.5) # Make sure the end configuration is as expected primary_url = f'http://{primary.config["host"]}:{primary.config["port"]}' - assert primary.sc.master_url == primary_url - assert non_primary_1.sc.master_url == primary_url - assert non_primary_2.sc.master_url == primary_url + assert primary.schema_coordinator.master_url == primary_url + assert non_primary_1.schema_coordinator.master_url == primary_url + assert non_primary_2.schema_coordinator.master_url == primary_url + finally: + await non_primary_1.close() + await non_primary_2.close() + await primary.close() -def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None: +async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None: client_id = new_random_name("master_selection_") group_id = new_random_name("group_id") @@ -170,14 +182,17 @@ def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortRangeIn } ) - with closing(init_admin(config_aa)) as mc: + mc = await init_admin(config_aa) + try: # Wait for the election to happen, ie. flag is not None - while not mc.sc or mc.sc.are_we_master is None: - time.sleep(0.3) + while not mc.schema_coordinator or mc.schema_coordinator.are_we_master is None: + await asyncio.sleep(0.5) # Make sure the end configuration is as expected - assert mc.sc.are_we_master is False - assert mc.sc.master_url is None + assert mc.schema_coordinator.are_we_master is False + assert mc.schema_coordinator.master_url is None + finally: + await mc.close() async def test_schema_request_forwarding(registry_async_pair): diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py new file mode 100644 index 000000000..7c7697065 --- /dev/null +++ b/tests/integration/test_schema_coordinator.py @@ -0,0 +1,789 @@ +""" +karapace - schema coordinator + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details + +Tests are adapted from aiokafka.tests.test_coordinator +""" +from __future__ import annotations + +from aiokafka.client import AIOKafkaClient, ConnectionGroup, CoordinationType +from aiokafka.cluster import ClusterMetadata +from aiokafka.protocol.api import Response +from aiokafka.protocol.group import ( + HeartbeatRequest_v0 as HeartbeatRequest, + JoinGroupRequest_v0 as JoinGroupRequest, + LeaveGroupRequest_v0 as LeaveGroupRequest, + SyncGroupResponse_v0 as SyncGroupResponse, +) +from aiokafka.util import create_future, create_task +from karapace.coordinator.schema_coordinator import Assignment, SchemaCoordinator, SchemaCoordinatorGroupRebalance +from karapace.utils import json_encode +from karapace.version import __version__ +from tests.integration.utils.kafka_server import KafkaServers +from typing import AsyncGenerator, Iterator +from unittest import mock + +import aiokafka.errors as Errors +import asyncio +import contextlib +import logging +import pytest + +UNKNOWN_MEMBER_ID = JoinGroupRequest.UNKNOWN_MEMBER_ID + +LOG = logging.getLogger(__name__) + + +@pytest.fixture(scope="function", name="mocked_client") +def fixture_mocked_aiokafka_client() -> Iterator[AIOKafkaClient]: + mocked_client = mock.MagicMock(spec=AIOKafkaClient) + mocked_client.cluster = ClusterMetadata() + yield mocked_client + + +@pytest.fixture(scope="function", name="coordinator") +async def fixture_admin( + loop: asyncio.AbstractEventLoop, mocked_client: AIOKafkaClient # pylint: disable=unused-argument +) -> AsyncGenerator: + coordinator = SchemaCoordinator( + mocked_client, + "test-host", + 10101, + "https", + True, + "highest", + "test-group", + retry_backoff_ms=10, + ) + yield coordinator + await coordinator.close() + + +async def _get_client(kafka_servers: KafkaServers) -> AIOKafkaClient: + while True: + try: + client = AIOKafkaClient(bootstrap_servers=",".join(kafka_servers.bootstrap_servers)) + await client.bootstrap() + break + except: # pylint: disable=bare-except + LOG.exception("Kafka client bootstrap failed.") + await asyncio.sleep(0.5) + return client + + +@pytest.fixture(scope="function", name="client") +async def get_client( + loop: asyncio.AbstractEventLoop, kafka_servers: KafkaServers # pylint: disable=unused-argument +) -> AsyncGenerator: + client = await _get_client(kafka_servers) + yield client + await client.close() + + +@pytest.mark.parametrize("primary_selection_strategy", ["highest", "lowest"]) +async def test_coordinator_workflow( + primary_selection_strategy: str, + client: AIOKafkaClient, + kafka_servers: KafkaServers, +) -> None: + # Check if 2 coordinators will coordinate rebalances correctly + # Check if the initial group join is performed correctly with minimal + # setup + coordinator = SchemaCoordinator( + client, + "test-host-1", + 10101, + "https", + True, + primary_selection_strategy, + "test-group", + session_timeout_ms=10000, + heartbeat_interval_ms=500, + retry_backoff_ms=100, + ) + coordinator.start() + assert coordinator.coordinator_id is None + while not coordinator.ready(): + await asyncio.sleep(0.5) + assert coordinator.coordinator_id is not None + + await coordinator.ensure_coordinator_known() + assert coordinator.coordinator_id is not None + + assert coordinator.are_we_master + + # Check if adding an additional coordinator will rebalance correctly + client2 = await _get_client(kafka_servers=kafka_servers) + coordinator2 = SchemaCoordinator( + client2, + "test-host-2", + 10100, + "https", + True, + primary_selection_strategy, + "test-group", + session_timeout_ms=10000, + heartbeat_interval_ms=500, + retry_backoff_ms=100, + ) + coordinator2.start() + assert coordinator2.coordinator_id is None + + while not coordinator2.ready(): + await asyncio.sleep(0.5) + assert coordinator2.coordinator_id is not None + + await coordinator2.ensure_coordinator_known() + assert coordinator2.coordinator_id is not None + + # Helper variables to distinguish the expected primary and secondary + primary = coordinator2 if primary_selection_strategy == "highest" else coordinator + primary_client = client2 if primary_selection_strategy == "highest" else client + secondary = coordinator if primary_selection_strategy == "highest" else coordinator2 + secondary_client = client if primary_selection_strategy == "highest" else client2 + + assert primary.are_we_master + assert not secondary.are_we_master + + # Check is closing the primary coordinator will rebalance the secondary to change to primary + await primary.close() + await primary_client.close() + + while not secondary.are_we_master: + await asyncio.sleep(0.5) + assert secondary.are_we_master + await secondary.close() + await secondary_client.close() + + +async def test_failed_group_join(mocked_client: AIOKafkaClient, coordinator: SchemaCoordinator) -> None: + coordinator.start() + assert coordinator._coordination_task is not None # pylint: disable=protected-access + assert coordinator._client is not None # pylint: disable=protected-access + + # disable for test + coordinator._coordination_task.cancel() # pylint: disable=protected-access + with contextlib.suppress(asyncio.CancelledError): + await coordinator._coordination_task # pylint: disable=protected-access + coordinator._coordination_task = create_task(asyncio.sleep(0.1)) # pylint: disable=protected-access + coordinator.coordinator_id = 15 + + async def _on_join_leader_test(_: Response) -> bytes | None: + return b"123" + + _on_join_leader_mock = mock.Mock() + _on_join_leader_mock.side_effect = _on_join_leader_test + + async def do_rebalance() -> Assignment | None: + rebalance = SchemaCoordinatorGroupRebalance( + coordinator, + coordinator.group_id, + coordinator.coordinator_id, + coordinator._session_timeout_ms, # pylint: disable=protected-access + coordinator._retry_backoff_ms, # pylint: disable=protected-access + ) + rebalance._on_join_leader = _on_join_leader_mock # pylint: disable=protected-access + return await rebalance.perform_group_join() + + coordinator._client.api_version = (0, 10, 1) # pylint: disable=protected-access + error_type = Errors.NoError + + async def send(*_, **__) -> JoinGroupRequest: + resp = JoinGroupRequest.RESPONSE_TYPE( + error_code=error_type.errno, + generation_id=-1, # generation_id + group_protocol="sr", + leader_id="111", # leader_id + member_id="111", # member_id + members=[], + ) + return resp + + mocked_client.send.side_effect = send + + # Success case, joined successfully + resp = await do_rebalance() + assert resp == Assignment(member_id="sr", metadata=b"123") + assert _on_join_leader_mock.call_count == 1 + + # no exception expected, just wait + error_type = Errors.GroupLoadInProgressError + resp = await do_rebalance() + assert resp is None + assert coordinator.need_rejoin() + + error_type = Errors.InvalidGroupIdError + with pytest.raises(Errors.InvalidGroupIdError): + await do_rebalance() + assert coordinator.need_rejoin() + + # no exception expected, member_id should be reset + coordinator.member_id = "some_invalid_member_id" + error_type = Errors.UnknownMemberIdError + resp = await do_rebalance() + assert resp is None + assert coordinator.need_rejoin() + assert coordinator.member_id == JoinGroupRequest.UNKNOWN_MEMBER_ID + + error_type = Errors.UnknownError() + with pytest.raises(Errors.KafkaError): # Masked as unknown error + await do_rebalance() + + # no exception expected, coordinator_id should be reset + error_type = Errors.GroupCoordinatorNotAvailableError + resp = await do_rebalance() + assert resp is None + assert coordinator.need_rejoin() + assert coordinator.coordinator_id is None + coordinator.coordinator_id = 15 + coordinator._coordinator_dead_fut = create_future() # pylint: disable=protected-access + + async def _on_join_leader(_) -> bytes | None: + return None + + # Sync group fails case + error_type = Errors.NoError + _on_join_leader_mock.side_effect = _on_join_leader + resp = await do_rebalance() + assert coordinator.coordinator_id == 15 + assert resp is None + assert _on_join_leader_mock.call_count == 2 + + # `_send_req` itself raises an error + mocked_client.send.side_effect = Errors.GroupCoordinatorNotAvailableError() + resp = await do_rebalance() + assert resp is None + assert coordinator.need_rejoin() + assert coordinator.coordinator_id is None + + +async def test_failed_sync_group(mocked_client: AIOKafkaClient, coordinator: SchemaCoordinator) -> None: + coordinator.start() + assert coordinator._coordination_task is not None # pylint: disable=protected-access + assert coordinator._client is not None # pylint: disable=protected-access + + # disable for test + coordinator._coordination_task.cancel() # pylint: disable=protected-access + with contextlib.suppress(asyncio.CancelledError): + await coordinator._coordination_task # pylint: disable=protected-access + coordinator._coordination_task = create_task(asyncio.sleep(0.1)) # pylint: disable=protected-access + coordinator.coordinator_id = 15 + + async def do_sync_group() -> bytes | None: + rebalance = SchemaCoordinatorGroupRebalance( + coordinator, + coordinator.group_id, + coordinator.coordinator_id, + coordinator._session_timeout_ms, # pylint: disable=protected-access + coordinator._retry_backoff_ms, # pylint: disable=protected-access + ) + await rebalance._on_join_follower() # pylint: disable=protected-access + + coordinator._client.api_version = (0, 10, 1) # pylint: disable=protected-access + error_type = None + + async def send(*_, **__) -> SyncGroupResponse: + resp = SyncGroupResponse(error_code=error_type.errno, member_assignment=b"123") + return resp + + mocked_client.send.side_effect = send + + coordinator.member_id = "some_invalid_member_id" + + error_type = Errors.RebalanceInProgressError + await do_sync_group() + assert coordinator.member_id == "some_invalid_member_id" + assert coordinator.need_rejoin() + + error_type = Errors.UnknownMemberIdError + await do_sync_group() + assert coordinator.member_id == UNKNOWN_MEMBER_ID + assert coordinator.need_rejoin() + + error_type = Errors.NotCoordinatorForGroupError + await do_sync_group() + assert coordinator.coordinator_id is None + assert coordinator.need_rejoin() + + coordinator.coordinator_id = 15 + coordinator._coordinator_dead_fut = create_future() # pylint: disable=protected-access + + error_type = Errors.UnknownError() + with pytest.raises(Errors.KafkaError): # Masked as some KafkaError + await do_sync_group() + assert coordinator.need_rejoin() + + error_type = Errors.GroupAuthorizationFailedError() + with pytest.raises(Errors.GroupAuthorizationFailedError) as exception_info: + await do_sync_group() + assert coordinator.need_rejoin() + assert exception_info.value.args[0] == coordinator.group_id + + # If ``send()`` itself raises an error + mocked_client.send.side_effect = Errors.GroupCoordinatorNotAvailableError() + await do_sync_group() + assert coordinator.need_rejoin() + assert coordinator.coordinator_id is None + + +async def test_generation_change_during_rejoin_sync() -> None: + client = mock.MagicMock(spec=AIOKafkaClient) + coordinator = mock.MagicMock(spec=SchemaCoordinator) + member_assignment = mock.Mock(spec=Assignment) + + coordinator._client = client # pylint: disable=protected-access + coordinator._rebalance_timeout_ms = 1000 # pylint: disable=protected-access + coordinator._send_req = mock.MagicMock() # pylint: disable=protected-access + + rebalance = SchemaCoordinatorGroupRebalance( + coordinator, + "group_id", + 1, + 1000, + 1000, + ) + + async def send_req(_) -> Response: + await asyncio.sleep(0.1) + resp = mock.MagicMock(spec=Response) + resp.member_assignment = member_assignment + resp.error_code = 0 + return resp + + coordinator.send_req.side_effect = send_req + + request = mock.MagicMock() + coordinator.generation = 1 + coordinator.member_id = "member_id" + sync_req = asyncio.ensure_future(rebalance._send_sync_group_request(request)) # pylint: disable=protected-access + await asyncio.sleep(0.05) + + coordinator.generation = -1 + coordinator.member_id = "member_id-changed" + + assert await sync_req == member_assignment + + # make sure values are set correctly + assert coordinator.generation == 1 + assert coordinator.member_id == "member_id" + + +async def test_coordinator_metadata_update(client: AIOKafkaClient) -> None: + # Race condition where client.set_topics start MetadataUpdate, but it + # fails to arrive before leader performed assignment + try: + coordinator = SchemaCoordinator( + client, + "test-host", + 10101, + "https", + True, + "highest", + "test-group", + retry_backoff_ms=10, + ) + coordinator.start() + + _metadata_update = client._metadata_update # pylint: disable=protected-access + with mock.patch.object(client, "_metadata_update") as mocked: + + async def _new(*args, **kw) -> bool: + # Just make metadata updates a bit more slow for test + # robustness + await asyncio.sleep(0.5) + res = await _metadata_update(*args, **kw) + return res + + mocked.side_effect = _new + assignment_metadata = json_encode( + { + "version": 2, + "karapace_version": __version__, + "host": "test-host", + "port": 10101, + "scheme": "https", + "master_eligibility": True, + }, + binary=True, + compact=True, + ) + assert coordinator.get_metadata_snapshot() == [Assignment(member_id="v0", metadata=assignment_metadata)] + finally: + await coordinator.close() + + +async def test_coordinator__send_req(client: AIOKafkaClient) -> None: + try: + coordinator = SchemaCoordinator( + client, + "test-host", + 10101, + "https", + True, + "highest", + "test-group", + session_timeout_ms=6000, + heartbeat_interval_ms=1000, + ) + coordinator.start() + # Any request will do + # We did not call ensure_coordinator_known yet + request = LeaveGroupRequest() + with pytest.raises(Errors.GroupCoordinatorNotAvailableError): + await coordinator.send_req(request) + + await coordinator.ensure_coordinator_known() + assert coordinator.coordinator_id is not None + + with mock.patch.object(client, "send") as mocked: + + async def mock_send(*_, **__) -> None: + raise Errors.KafkaError("Some unexpected error") + + mocked.side_effect = mock_send + + # _send_req should mark coordinator dead on errors + with pytest.raises(Errors.KafkaError): + await coordinator.send_req(request) + assert coordinator.coordinator_id is None + finally: + await coordinator.close() + + +async def test_coordinator_ensure_coordinator_known(client: AIOKafkaClient) -> None: + try: + coordinator = SchemaCoordinator( + client, + "test-host", + 10101, + "https", + True, + "highest", + "test-group", + heartbeat_interval_ms=20000, + ) + coordinator.start() + assert coordinator._coordination_task is not None # pylint: disable=protected-access + # disable for test + coordinator._coordination_task.cancel() # pylint: disable=protected-access + + with contextlib.suppress(asyncio.CancelledError): + await coordinator._coordination_task # pylint: disable=protected-access + coordinator._coordination_task = create_task(asyncio.sleep(0.1)) # pylint: disable=protected-access + + def force_metadata_update() -> asyncio.Future: + fut = create_future() + fut.set_result(True) + return fut + + client.ready = mock.Mock() + client.force_metadata_update = mock.Mock() + client.force_metadata_update.side_effect = force_metadata_update + + async def ready(node_id: int, group: ConnectionGroup) -> bool: # pylint: disable=unused-argument + if node_id == 0: + return True + return False + + client.ready.side_effect = ready + client.coordinator_lookup = mock.Mock() + + coordinator_lookup: list | None = None + + async def _do_coordinator_lookup(_: CoordinationType, __: str) -> int: + assert coordinator_lookup is not None + node_id = coordinator_lookup.pop() + if isinstance(node_id, Exception): + raise node_id + return node_id + + client.coordinator_lookup.side_effect = _do_coordinator_lookup + + # CASE: the lookup returns a broken node, that can't be connected + # to. Ensure should wait until coordinator lookup finds the correct + # node. + coordinator.coordinator_dead() + coordinator_lookup = [0, 1, 1] + await coordinator.ensure_coordinator_known() + assert coordinator.coordinator_id == 0 + assert client.force_metadata_update.call_count == 0 + + # CASE: lookup fails with error first time. We update metadata and try + # again + coordinator.coordinator_dead() + coordinator_lookup = [0, Errors.UnknownTopicOrPartitionError()] + await coordinator.ensure_coordinator_known() + assert client.force_metadata_update.call_count == 1 + + # CASE: Special case for group authorization + coordinator.coordinator_dead() + coordinator_lookup = [0, Errors.GroupAuthorizationFailedError()] + with pytest.raises(Errors.GroupAuthorizationFailedError) as exception_info: + await coordinator.ensure_coordinator_known() + assert exception_info.value.args[0] == coordinator.group_id + + # CASE: unretriable errors should be reraised to higher level + coordinator.coordinator_dead() + coordinator_lookup = [0, Errors.UnknownError()] + with pytest.raises(Errors.UnknownError): + await coordinator.ensure_coordinator_known() + finally: + await coordinator.close() + + +async def test_coordinator__do_heartbeat(client: AIOKafkaClient) -> None: + try: + coordinator = SchemaCoordinator( + client, + "test-host", + 10101, + "https", + True, + "highest", + "test-group", + heartbeat_interval_ms=20000, + ) + coordinator.start() + assert coordinator._coordination_task is not None # pylint: disable=protected-access + # disable for test + coordinator._coordination_task.cancel() # pylint: disable=protected-access + with contextlib.suppress(asyncio.CancelledError): + await coordinator._coordination_task # pylint: disable=protected-access + coordinator._coordination_task = create_task(asyncio.sleep(0.1)) # pylint: disable=protected-access + + _orig_send_req = coordinator.send_req + coordinator.send_req = mocked = mock.Mock() + heartbeat_error = None + send_req_error = None + + async def mock_send_req(request): + if send_req_error is not None: + raise send_req_error + if request.API_KEY == HeartbeatRequest.API_KEY: + if isinstance(heartbeat_error, list): + error_code = heartbeat_error.pop(0).errno + else: + error_code = heartbeat_error.errno + return HeartbeatRequest.RESPONSE_TYPE(error_code) + return await _orig_send_req(request) + + mocked.side_effect = mock_send_req + + coordinator.coordinator_id = 15 + heartbeat_error = Errors.GroupCoordinatorNotAvailableError() + success = await coordinator._do_heartbeat() # pylint: disable=protected-access + assert not success + assert coordinator.coordinator_id is None + + coordinator.rejoin_needed_fut = create_future() + heartbeat_error = Errors.RebalanceInProgressError() + success = await coordinator._do_heartbeat() # pylint: disable=protected-access + assert success + assert coordinator.rejoin_needed_fut.done() + + coordinator.member_id = "some_member" + coordinator.rejoin_needed_fut = create_future() + heartbeat_error = Errors.IllegalGenerationError() + success = await coordinator._do_heartbeat() # pylint: disable=protected-access + assert not success + assert coordinator.rejoin_needed_fut.done() + assert coordinator.member_id == UNKNOWN_MEMBER_ID + + coordinator.member_id = "some_member" + coordinator.rejoin_needed_fut = create_future() + heartbeat_error = Errors.UnknownMemberIdError() + success = await coordinator._do_heartbeat() # pylint: disable=protected-access + assert not success + assert coordinator.rejoin_needed_fut.done() + assert coordinator.member_id == UNKNOWN_MEMBER_ID + + heartbeat_error = Errors.GroupAuthorizationFailedError() + with pytest.raises(Errors.GroupAuthorizationFailedError) as exception_info: + await coordinator._do_heartbeat() # pylint: disable=protected-access + assert exception_info.value.args[0] == coordinator.group_id + + heartbeat_error = Errors.UnknownError() + with pytest.raises(Errors.KafkaError): + await coordinator._do_heartbeat() # pylint: disable=protected-access + + heartbeat_error = None + send_req_error = Errors.RequestTimedOutError() + success = await coordinator._do_heartbeat() # pylint: disable=protected-access + assert not success + + heartbeat_error = Errors.NoError() + send_req_error = None + success = await coordinator._do_heartbeat() # pylint: disable=protected-access + assert success + finally: + await coordinator.close() + + +async def test_coordinator__heartbeat_routine(client: AIOKafkaClient) -> None: + try: + coordinator = SchemaCoordinator( + client, + "test-host", + 10101, + "https", + True, + "highest", + "test-group", + heartbeat_interval_ms=100, + session_timeout_ms=300, + retry_backoff_ms=50, + ) + coordinator.start() + assert coordinator._coordination_task is not None # pylint: disable=protected-access + # disable for test + coordinator._coordination_task.cancel() # pylint: disable=protected-access + with contextlib.suppress(asyncio.CancelledError): + await coordinator._coordination_task # pylint: disable=protected-access + coordinator._coordination_task = create_task(asyncio.sleep(0.1)) # pylint: disable=protected-access + + mocked = mock.Mock() + coordinator._do_heartbeat = mocked # pylint: disable=protected-access + coordinator.coordinator_id = 15 + coordinator.member_id = 17 + coordinator.generation = 0 + success = False + + async def _do_heartbeat() -> bool: + if isinstance(success, list): + return success.pop(0) + return success + + mocked.side_effect = _do_heartbeat + + async def ensure_coordinator_known() -> None: + return None + + coordinator.ensure_coordinator_known = mock.Mock() + coordinator.ensure_coordinator_known.side_effect = ensure_coordinator_known + + routine = create_task(coordinator._heartbeat_routine()) # pylint: disable=protected-access + + # CASE: simple heartbeat + success = True + await asyncio.sleep(0.13) + assert not routine.done() + assert mocked.call_count == 1 + + # CASE: 2 heartbeat fail + success = False + await asyncio.sleep(0.15) + assert not routine.done() + # We did 2 heartbeats as we waited only retry_backoff_ms between them + assert mocked.call_count == 3 + + # CASE: session_timeout_ms elapsed without heartbeat + await asyncio.sleep(0.10) + assert mocked.call_count == 5 + assert coordinator.coordinator_id == 15 + + # last heartbeat try + await asyncio.sleep(0.05) + assert mocked.call_count == 6 + assert coordinator.coordinator_id is None + finally: + routine.cancel() + await coordinator.close() + + +async def test_coordinator__coordination_routine(client: AIOKafkaClient) -> None: + try: + coordinator = SchemaCoordinator( + client, + "test-host", + 10101, + "https", + True, + "highest", + "test-group", + heartbeat_interval_ms=20000, + retry_backoff_ms=50, + ) + + def start_coordination(): + if coordinator._coordination_task: # pylint: disable=protected-access + coordinator._coordination_task.cancel() # pylint: disable=protected-access + coordinator._coordination_task = task = create_task( # pylint: disable=protected-access + coordinator._coordination_routine() # pylint: disable=protected-access + ) + return task + + async def stop_coordination(): + if coordinator._coordination_task is not None: # pylint: disable=protected-access + # disable for test + coordinator._coordination_task.cancel() # pylint: disable=protected-access + with contextlib.suppress(asyncio.CancelledError): + await coordinator._coordination_task # pylint: disable=protected-access + coordinator._coordination_task = create_task(asyncio.sleep(0.1)) # pylint: disable=protected-access + + await stop_coordination() + + async def ensure_coordinator_known(): + return None + + coordinator.ensure_coordinator_known = coord_mock = mock.Mock() + coord_mock.side_effect = ensure_coordinator_known + + coordinator._do_rejoin_group = rejoin_mock = mock.Mock() # pylint: disable=protected-access + rejoin_ok = True + + async def do_rejoin(): + if rejoin_ok: + coordinator.rejoin_needed_fut = create_future() + return True + await asyncio.sleep(0.1) + return False + + rejoin_mock.side_effect = do_rejoin + + coordinator._start_heartbeat_task = mock.Mock() # pylint: disable=protected-access + client.force_metadata_update = metadata_mock = mock.Mock() + done_fut = create_future() + done_fut.set_result(None) + metadata_mock.side_effect = lambda: done_fut + + coordinator.rejoin_needed_fut = create_future() + coordinator._closing = create_future() # pylint: disable=protected-access + coordinator._coordinator_dead_fut = create_future() # pylint: disable=protected-access + + # CASE: coordination should coordinate and task get done + # present + task = start_coordination() + await asyncio.sleep(0.01) + assert not task.done() + assert coord_mock.call_count == 1 + + # CASE: with no assignment changes routine should not react to request_rejoin + coordinator.request_rejoin() + await asyncio.sleep(0.01) + assert not task.done() + assert coord_mock.call_count == 2 + assert rejoin_mock.call_count == 1 + + # CASE: rejoin fail + rejoin_ok = False + coordinator.request_rejoin() + await asyncio.sleep(0.01) + assert not task.done() + assert coord_mock.call_count == 3 + assert rejoin_mock.call_count == 2 + + # CASE: After rejoin fail, retry + rejoin_ok = True + coordinator.request_rejoin() + await asyncio.sleep(0.5) + assert not task.done() + assert coord_mock.call_count == 4 + assert rejoin_mock.call_count == 3 + + # Close + await coordinator.close() + assert task.done() + finally: + await coordinator.close() diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index ea39e663a..738f76498 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -6,11 +6,11 @@ from dataclasses import dataclass from karapace.config import set_config_defaults from karapace.constants import DEFAULT_SCHEMA_TOPIC +from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer from karapace.key_format import KeyFormatter, KeyMode -from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode @@ -20,11 +20,11 @@ from tests.utils import create_group_name_factory, create_subject_name_factory, new_random_name, new_topic from typing import List, Tuple +import asyncio import pytest -import time -def _wait_until_reader_is_ready_and_master( +async def _wait_until_reader_is_ready_and_master( master_coordinator: MasterCoordinator, reader: KafkaSchemaReader, ) -> None: @@ -35,16 +35,16 @@ def _wait_until_reader_is_ready_and_master( """ # Caught up with the topic while not reader.ready: - time.sleep(0.1) + await asyncio.sleep(0.1) # Won master election are_we_master = False while not are_we_master: are_we_master, _ = master_coordinator.get_master_info() - time.sleep(0.1) + await asyncio.sleep(0.1) -def test_regression_soft_delete_schemas_should_be_registered( +async def test_regression_soft_delete_schemas_should_be_registered( kafka_servers: KafkaServers, producer: KafkaProducer, ) -> None: @@ -71,80 +71,83 @@ def test_regression_soft_delete_schemas_should_be_registered( } ) master_coordinator = MasterCoordinator(config=config) - master_coordinator.start() - database = InMemoryDatabase() - offset_watcher = OffsetWatcher() - schema_reader = KafkaSchemaReader( - config=config, - offset_watcher=offset_watcher, - key_formatter=KeyFormatter(), - master_coordinator=master_coordinator, - database=database, - ) - schema_reader.start() + try: + await master_coordinator.start() + database = InMemoryDatabase() + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=config, + offset_watcher=offset_watcher, + key_formatter=KeyFormatter(), + master_coordinator=master_coordinator, + database=database, + ) + schema_reader.start() - with closing(master_coordinator), closing(schema_reader): - _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader) + with closing(schema_reader): + await _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader) - # Send an initial schema to initialize the subject in the reader, this is preparing the state - key = { - "subject": subject, - "version": 1, - "magic": 1, - "keytype": "SCHEMA", - } - value = { - "deleted": False, - "id": 1, - "subject": subject, - "version": 1, - "schema": json_encode(TRUE_SCHEMA.schema), - } - future = producer.send( - topic_name, - key=json_encode(key, binary=True), - value=json_encode(value, binary=True), - ) - producer.flush() - msg = future.result() + # Send an initial schema to initialize the subject in the reader, this is preparing the state + key = { + "subject": subject, + "version": 1, + "magic": 1, + "keytype": "SCHEMA", + } + value = { + "deleted": False, + "id": 1, + "subject": subject, + "version": 1, + "schema": json_encode(TRUE_SCHEMA.schema), + } + future = producer.send( + topic_name, + key=json_encode(key, binary=True), + value=json_encode(value, binary=True), + ) + producer.flush() + msg = future.result() - schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access + schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access - schemas = database.find_subject_schemas(subject=subject, include_deleted=True) - assert len(schemas) == 1, "Deleted schemas must have been registered" + schemas = database.find_subject_schemas(subject=subject, include_deleted=True) + assert len(schemas) == 1, "Deleted schemas must have been registered" - # Produce a soft deleted schema, this is the regression test - key = { - "subject": subject, - "version": 2, - "magic": 1, - "keytype": "SCHEMA", - } - test_global_schema_id = 2 - value = { - "deleted": True, - "id": test_global_schema_id, - "subject": subject, - "version": 2, - "schema": json_encode(FALSE_SCHEMA.schema), - } - future = producer.send( - topic_name, - key=json_encode(key, binary=True), - value=json_encode(value, binary=True), - ) - producer.flush() - msg = future.result() + # Produce a soft deleted schema, this is the regression test + key = { + "subject": subject, + "version": 2, + "magic": 1, + "keytype": "SCHEMA", + } + test_global_schema_id = 2 + value = { + "deleted": True, + "id": test_global_schema_id, + "subject": subject, + "version": 2, + "schema": json_encode(FALSE_SCHEMA.schema), + } + future = producer.send( + topic_name, + key=json_encode(key, binary=True), + value=json_encode(value, binary=True), + ) + producer.flush() + msg = future.result() - seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access - assert seen is True - assert database.global_schema_id == test_global_schema_id + seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access + assert seen is True + assert database.global_schema_id == test_global_schema_id - schemas = database.find_subject_schemas(subject=subject, include_deleted=True) - assert len(schemas) == 2, "Deleted schemas must have been registered" + schemas = database.find_subject_schemas(subject=subject, include_deleted=True) + assert len(schemas) == 2, "Deleted schemas must have been registered" + finally: + await master_coordinator.close() -def test_regression_config_for_inexisting_object_should_not_throw( +async def test_regression_config_for_inexisting_object_should_not_throw( kafka_servers: KafkaServers, producer: KafkaProducer, ) -> None: @@ -160,40 +163,43 @@ def test_regression_config_for_inexisting_object_should_not_throw( } ) master_coordinator = MasterCoordinator(config=config) - master_coordinator.start() - database = InMemoryDatabase() - offset_watcher = OffsetWatcher() - schema_reader = KafkaSchemaReader( - config=config, - offset_watcher=offset_watcher, - key_formatter=KeyFormatter(), - master_coordinator=master_coordinator, - database=database, - ) - schema_reader.start() + try: + await master_coordinator.start() + database = InMemoryDatabase() + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=config, + offset_watcher=offset_watcher, + key_formatter=KeyFormatter(), + master_coordinator=master_coordinator, + database=database, + ) + schema_reader.start() - with closing(master_coordinator), closing(schema_reader): - _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader) + with closing(schema_reader): + await _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader) - # Send an initial schema to initialize the subject in the reader, this is preparing the state - key = { - "subject": subject, - "magic": 0, - "keytype": "CONFIG", - } - value = "" # Delete the config + # Send an initial schema to initialize the subject in the reader, this is preparing the state + key = { + "subject": subject, + "magic": 0, + "keytype": "CONFIG", + } + value = "" # Delete the config - future = producer.send( - DEFAULT_SCHEMA_TOPIC, - key=json_encode(key, binary=True), - value=json_encode(value, binary=True), - ) - producer.flush() - msg = future.result() + future = producer.send( + DEFAULT_SCHEMA_TOPIC, + key=json_encode(key, binary=True), + value=json_encode(value, binary=True), + ) + producer.flush() + msg = future.result() - seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access - assert seen is True - assert database.find_subject(subject=subject) is not None, "The above message should be handled gracefully" + seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access + assert seen is True + assert database.find_subject(subject=subject) is not None, "The above message should be handled gracefully" + finally: + await master_coordinator.close() @dataclass @@ -235,7 +241,7 @@ class DetectKeyFormatCase(BaseTestCase): ), ], ) -def test_key_format_detection( +async def test_key_format_detection( testcase: DetectKeyFormatCase, kafka_servers: KafkaServers, producer: KafkaProducer, @@ -261,20 +267,23 @@ def test_key_format_detection( } ) master_coordinator = MasterCoordinator(config=config) - master_coordinator.start() - key_formatter = KeyFormatter() - database = InMemoryDatabase() - offset_watcher = OffsetWatcher() - schema_reader = KafkaSchemaReader( - config=config, - offset_watcher=offset_watcher, - key_formatter=key_formatter, - master_coordinator=master_coordinator, - database=database, - ) - schema_reader.start() + try: + await master_coordinator.start() + key_formatter = KeyFormatter() + database = InMemoryDatabase() + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=config, + offset_watcher=offset_watcher, + key_formatter=key_formatter, + master_coordinator=master_coordinator, + database=database, + ) + schema_reader.start() - with closing(master_coordinator), closing(schema_reader): - _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader) + with closing(schema_reader): + await _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader) - assert key_formatter.get_keymode() == testcase.expected + assert key_formatter.get_keymode() == testcase.expected + finally: + await master_coordinator.close() diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 392662710..5b8e11c45 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -5,24 +5,24 @@ from aiohttp.test_utils import TestClient, TestServer from karapace.config import DEFAULTS, set_config_defaults from karapace.rapu import HTTPResponse +from karapace.schema_reader import KafkaSchemaReader +from karapace.schema_registry import KarapaceSchemaRegistry from karapace.schema_registry_apis import KarapaceSchemaRegistryController -from unittest.mock import ANY, Mock, patch, PropertyMock +from unittest.mock import ANY, AsyncMock, Mock, patch, PropertyMock import asyncio async def test_forward_when_not_ready(): with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: - schema_reader_mock = Mock() + schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) - schema_registry = Mock() + schema_registry = AsyncMock(spec=KarapaceSchemaRegistry) type(schema_reader_mock).ready = ready_property_mock schema_registry.schema_reader = schema_reader_mock schema_registry_class.return_value = schema_registry - get_master_future = asyncio.Future() - get_master_future.set_result((False, "http://primary-url")) - schema_registry.get_master.return_value = get_master_future + schema_registry.get_master.return_value = (False, "http://primary-url") close_future_result = asyncio.Future() close_future_result.set_result(True)