diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 5daf09885..a150778ee 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -451,10 +451,10 @@ def __init__( self.config = config self.kafka_timeout = kafka_timeout self.serializer = serializer - self._cluster_metadata = None + self._cluster_metadata: _ClusterMetadata = self._empty_cluster_metadata_cache() self._cluster_metadata_complete = False # birth of all the metadata (when the request was requiring all the metadata available in the cluster) - self._global_metadata_birth: float | None = None + self._global_metadata_birth: float = 0.0 # this value is meant to always require a refresh at the first call. self._cluster_metadata_topic_birth: dict[str, float] = {} self.metadata_max_age = self.config["admin_metadata_max_age"] self.admin_client = None @@ -634,25 +634,19 @@ async def get_topic_config(self, topic: str) -> dict: return self.admin_client.get_topic_config(topic) def is_global_metadata_old(self) -> bool: - return ( - self._global_metadata_birth is None or (time.monotonic() - self._global_metadata_birth) > self.metadata_max_age - ) + return (time.monotonic() - self._global_metadata_birth) > self.metadata_max_age def is_metadata_of_topics_old(self, topics: list[str]) -> bool: # Return from metadata only if all queried topics have cached metadata - - if self._cluster_metadata_topic_birth is None: - return True - are_all_topic_queried_at_least_once = all(topic in self._cluster_metadata_topic_birth for topic in topics) if not are_all_topic_queried_at_least_once: return True - oldest_requested_topic_udpate_timestamp = min(self._cluster_metadata_topic_birth[topic] for topic in topics) + oldest_requested_topic_update_timestamp = min(self._cluster_metadata_topic_birth[topic] for topic in topics) return ( are_all_topic_queried_at_least_once - and (time.monotonic() - oldest_requested_topic_udpate_timestamp) > self.metadata_max_age + 
and (time.monotonic() - oldest_requested_topic_update_timestamp) > self.metadata_max_age ) def _update_all_metadata(self) -> _ClusterMetadata: @@ -685,17 +679,43 @@ def _update_metadata_for_topics(self, topics: list[str]) -> _ClusterMetadata: if self._cluster_metadata is None: self._cluster_metadata = self._empty_cluster_metadata_cache() + # we need to refresh if at least 1 broker isn't present in the current metadata + need_refresh = not all(broker in self._cluster_metadata["brokers"] for broker in metadata["brokers"]) + for topic in metadata["topics"]: + # or if there is a new topic + need_refresh = ( + need_refresh + or (topic not in self._cluster_metadata["topics"]) + # or if a topic has new/different data. + # nb: equality is valid since the _ClusterMetadata object is structurally + # composed only of primitives, lists and dicts + or (self._cluster_metadata["topics"][topic] != metadata["topics"][topic]) + ) self._cluster_metadata_topic_birth[topic] = metadata_birth self._cluster_metadata["topics"][topic] = metadata["topics"][topic] - self._cluster_metadata_complete = False + if need_refresh: + # we don't need to reason about expiration time since at each request + # for the global metadata it's checked before performing the request, + # so we need to guard only for new missing pieces of info + self._cluster_metadata_complete = False + else: + # for malicious actors we may also cache that a certain topic (that does not exist) has been queried + # and for a while the reply isn't present. Not implementing this now since it's an additional complexity + # that may not be needed. Leaving a comment and a warning here: if it's often present in the logs the feature + # may be needed. + log.warning( + "Requested metadata for topics %s but the reply didn't triggered a cache invalidation. 
" + "Data not present on server side", + topics, + ) return metadata async def cluster_metadata(self, topics: list[str] | None = None) -> _ClusterMetadata: async with self.admin_lock: try: - if topics is None: + if topics is None or len(topics) == 0: metadata = self._update_all_metadata() else: metadata = self._update_metadata_for_topics(topics) diff --git a/tests/unit/kafka_rest_apis/test_rest_proxy_cache.py b/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py similarity index 64% rename from tests/unit/kafka_rest_apis/test_rest_proxy_cache.py rename to tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py index f49cfbf63..b47fb5e02 100644 --- a/tests/unit/kafka_rest_apis/test_rest_proxy_cache.py +++ b/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py @@ -28,15 +28,70 @@ def user_rest_proxy(max_age_metadata: int = 5) -> UserRestProxy: "partitions": [ { "partition": 0, - "leader": 10, + "leader": 69, + "replicas": [ + {"broker": 69, "leader": True, "in_sync": True}, + {"broker": 67, "leader": False, "in_sync": True}, + ], + } + ] + } + }, + "brokers": [69, 67], +} + +TOPIC_REQUEST_WITH_CHANGED_REPLICA = { + "topics": { + "topic_a": { + "partitions": [ + { + "partition": 0, + "leader": 69, "replicas": [ - {"broker": 10, "leader": True, "in_sync": True}, + {"broker": 69, "leader": True, "in_sync": True}, + {"broker": 68, "leader": False, "in_sync": True}, ], } ] } }, - "brokers": [10], + "brokers": [69, 68], +} + + +TOPIC_REQUEST_WITH_NEW_BROKER = { + "topics": { + "topic_a": { + "partitions": [ + { + "partition": 0, + "leader": 69, + "replicas": [ + {"broker": 69, "leader": True, "in_sync": True}, + {"broker": 67, "leader": False, "in_sync": True}, + ], + } + ] + } + }, + "brokers": [69, 67, 101300], +} + +TOPIC_REQUEST_WITH_NEW_TOPIC = { + "topics": { + "mistery_topic": { + "partitions": [ + { + "partition": 0, + "leader": 68, + "replicas": [ + {"broker": 68, "leader": True, "in_sync": True}, + ], + } + ] + } + }, 
+ "brokers": [68], } ALL_TOPIC_REQUEST = { @@ -112,6 +167,15 @@ async def test_cache_is_evicted_after_expiration_global_initially() -> None: mocked_cluster_metadata.assert_called_once_with(None) # "initially the metadata are always old" +async def test_no_topic_means_all_metadata() -> None: + proxy = user_rest_proxy() + with patch( + "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY + ) as mocked_cluster_metadata: + await proxy.cluster_metadata([]) + mocked_cluster_metadata.assert_called_once_with(None) + + async def test_cache_is_evicted_after_expiration_global() -> None: proxy = user_rest_proxy(max_age_metadata=10) proxy._global_metadata_birth = 0 @@ -251,3 +315,74 @@ async def test_update_topic_cache_do_not_evict_all_the_global_cache() -> None: assert ( mocked_cluster_metadata.call_count == 1 ), "we should call the server since the previous time of caching for the topic_a was 0" + + +async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_data() -> None: + proxy = user_rest_proxy(max_age_metadata=10) + proxy._global_metadata_birth = 0 + proxy._cluster_metadata_complete = True + proxy._cluster_metadata = ALL_TOPIC_REQUEST + proxy._cluster_metadata_topic_birth = {"topic_a": 0, "topic_b": 200, "__consumer_offsets": 200} + + with patch( + "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST + ) as mocked_cluster_metadata: + with patch("time.monotonic", return_value=208): + res = await proxy.cluster_metadata(["topic_a"]) + + assert res == TOPIC_REQUEST + + assert proxy._cluster_metadata_topic_birth == {"topic_a": 208, "topic_b": 200, "__consumer_offsets": 200} + + expected_metadata = copy.deepcopy(ALL_TOPIC_REQUEST) + expected_metadata["topics"]["topic_a"] = TOPIC_REQUEST["topics"]["topic_a"] + assert proxy._cluster_metadata == expected_metadata + assert ( + proxy._cluster_metadata_complete + ), "since wasn't containing new brokers and no new topics the metadata its completed" + 
+ assert ( + mocked_cluster_metadata.call_count == 1 + ), "we should call the server since the previous time of caching for the topic_a was 0" + + +async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_replica_data() -> None: + proxy = user_rest_proxy(max_age_metadata=10) + proxy._global_metadata_birth = 0 + proxy._cluster_metadata_complete = True + proxy._cluster_metadata = ALL_TOPIC_REQUEST + proxy._cluster_metadata_topic_birth = {"topic_a": 200, "topic_b": 200, "__consumer_offsets": 200} + + with patch("karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST_WITH_CHANGED_REPLICA): + with patch("time.monotonic", return_value=500): + await proxy.cluster_metadata(["topic_a"]) + + assert not proxy._cluster_metadata_complete, "new replica data incoming, should update the global metadata next!" + + +async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_data() -> None: + proxy = user_rest_proxy(max_age_metadata=10) + proxy._global_metadata_birth = 0 + proxy._cluster_metadata_complete = True + proxy._cluster_metadata = ALL_TOPIC_REQUEST + proxy._cluster_metadata_topic_birth = {"topic_a": 200, "topic_b": 200, "__consumer_offsets": 200} + + with patch("karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST_WITH_NEW_TOPIC): + with patch("time.monotonic", return_value=200): + await proxy.cluster_metadata(["mistery_topic"]) + + assert not proxy._cluster_metadata_complete, "new topic data incoming, should update the global metadata next!" 
+ + +async def test_update_local_cache_not_evict_all_the_global_cache_if_new_broker_data() -> None: + proxy = user_rest_proxy(max_age_metadata=10) + proxy._global_metadata_birth = 0 + proxy._cluster_metadata_complete = True + proxy._cluster_metadata = ALL_TOPIC_REQUEST + proxy._cluster_metadata_topic_birth = {"topic_a": 200, "topic_b": 200, "__consumer_offsets": 200} + + with patch("karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST_WITH_NEW_BROKER): + with patch("time.monotonic", return_value=500): + await proxy.cluster_metadata(["topic_a"]) + + assert not proxy._cluster_metadata_complete, "new broker data incoming, should update the global metadata next!"