diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py index 7c7697065..e6ccbc699 100644 --- a/tests/integration/test_schema_coordinator.py +++ b/tests/integration/test_schema_coordinator.py @@ -21,8 +21,10 @@ from karapace.coordinator.schema_coordinator import Assignment, SchemaCoordinator, SchemaCoordinatorGroupRebalance from karapace.utils import json_encode from karapace.version import __version__ +from tenacity import retry, stop_after_delay, TryAgain, wait_fixed from tests.integration.utils.kafka_server import KafkaServers -from typing import AsyncGenerator, Iterator +from tests.utils import new_random_name +from typing import AsyncGenerator, Final, Iterator from unittest import mock import aiokafka.errors as Errors @@ -35,6 +37,9 @@ LOG = logging.getLogger(__name__) +RETRY_TIME: Final = 20 +RETRY_WAIT_SECONDS: Final = 0.5 + @pytest.fixture(scope="function", name="mocked_client") def fixture_mocked_aiokafka_client() -> Iterator[AIOKafkaClient]: @@ -82,12 +87,26 @@ async def get_client( await client.close() +@retry(stop=stop_after_delay(RETRY_TIME), wait=wait_fixed(RETRY_WAIT_SECONDS)) +async def wait_for_ready(coordinator: SchemaCoordinator) -> None: + if not coordinator.ready and coordinator.coordinator_id is None: + raise TryAgain() + + +@retry(stop=stop_after_delay(RETRY_TIME), wait=wait_fixed(RETRY_WAIT_SECONDS)) +async def wait_for_primary_state(coordinator: SchemaCoordinator) -> None: + if not coordinator.are_we_master: + raise TryAgain() + await asyncio.sleep(0.1) + + @pytest.mark.parametrize("primary_selection_strategy", ["highest", "lowest"]) async def test_coordinator_workflow( primary_selection_strategy: str, client: AIOKafkaClient, kafka_servers: KafkaServers, ) -> None: + group_name = new_random_name("tg-") # Check if 2 coordinators will coordinate rebalances correctly # Check if the initial group join is performed correctly with minimal # setup @@ -98,21 +117,16 @@ async def test_coordinator_workflow( "https", True, primary_selection_strategy, - "test-group", + group_name, session_timeout_ms=10000, heartbeat_interval_ms=500, retry_backoff_ms=100, ) coordinator.start() assert coordinator.coordinator_id is None - while not coordinator.ready(): - await asyncio.sleep(0.5) - assert coordinator.coordinator_id is not None - + await wait_for_ready(coordinator) await coordinator.ensure_coordinator_known() - assert coordinator.coordinator_id is not None - - assert coordinator.are_we_master + await wait_for_primary_state(coordinator) # Check if adding an additional coordinator will rebalance correctly client2 = await _get_client(kafka_servers=kafka_servers) @@ -123,7 +137,7 @@ async def test_coordinator_workflow( "https", True, primary_selection_strategy, - "test-group", + group_name, session_timeout_ms=10000, heartbeat_interval_ms=500, retry_backoff_ms=100, @@ -131,12 +145,8 @@ async def test_coordinator_workflow( coordinator2.start() assert coordinator2.coordinator_id is None - while not coordinator2.ready(): - await asyncio.sleep(0.5) - assert coordinator2.coordinator_id is not None - + await wait_for_ready(coordinator2) await coordinator2.ensure_coordinator_known() - assert coordinator2.coordinator_id is not None # Helper variables to distinguish the expected primary and secondary primary = coordinator2 if primary_selection_strategy == "highest" else coordinator @@ -144,16 +154,14 @@ async def test_coordinator_workflow( secondary = coordinator if primary_selection_strategy == "highest" else coordinator2 secondary_client = client if primary_selection_strategy == "highest" else client2 - assert primary.are_we_master + await wait_for_primary_state(primary) assert not secondary.are_we_master # Check is closing the primary coordinator will rebalance the secondary to change to primary await primary.close() await primary_client.close() - while not secondary.are_we_master: - await asyncio.sleep(0.5) - assert secondary.are_we_master + await wait_for_primary_state(secondary) await secondary.close() await secondary_client.close()