From 791358ac83cb1fdb5f4913637362681474f7a9c3 Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Tue, 4 Jun 2024 13:06:09 +0200
Subject: [PATCH] fix cache: every refresh was replacing the whole cached
 metadata object even when the request covered only a subset of topics; this
 PR addresses that by updating only the entries for the requested topics

---
 karapace/kafka_rest_apis/__init__.py | 69 ++++++++++++++++++++--------
 1 file changed, 49 insertions(+), 20 deletions(-)

diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py
index 329b5361d..30814aab3 100644
--- a/karapace/kafka_rest_apis/__init__.py
+++ b/karapace/kafka_rest_apis/__init__.py
@@ -429,7 +429,9 @@ def __init__(
         self.serializer = serializer
         self._cluster_metadata = None
         self._cluster_metadata_complete = False
-        self._metadata_birth = None
+        # birth of the global metadata (set when a request asked for all the metadata available in the cluster)
+        self._global_metadata_birth = None
+        self._cluster_metadata_topic_birth: dict[str, float] = {}
         self.metadata_max_age = self.config["admin_metadata_max_age"]
         self.admin_client = None
         self.admin_lock = asyncio.Lock()
@@ -607,28 +609,55 @@ async def get_topic_config(self, topic: str) -> dict:
         async with self.admin_lock:
             return self.admin_client.get_topic_config(topic)
 
-    async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict:
-        async with self.admin_lock:
-            if self._metadata_birth is None or time.monotonic() - self._metadata_birth > self.metadata_max_age:
-                self._cluster_metadata = None
-            if self._cluster_metadata:
-                # Return from metadata only if all queried topics have cached metadata
-                if topics is None:
-                    if self._cluster_metadata_complete:
-                        return self._cluster_metadata
-                elif all(topic in self._cluster_metadata["topics"] for topic in topics):
-                    return {
-                        **self._cluster_metadata,
-                        "topics": {topic: self._cluster_metadata["topics"][topic] for topic in topics},
-                    }
+    def is_global_metadata_old(self) -> bool:
+        return self._global_metadata_birth is None or (time.monotonic() - self._global_metadata_birth) > self.metadata_max_age
+
+    def is_metadata_of_topics_old(self, topics: list[str]) -> bool:
+        # The cached metadata can only be reused if every queried topic has been cached at least once
+        are_all_topics_queried_at_least_once = all(topic in self._cluster_metadata_topic_birth for topic in topics)
+        if not are_all_topics_queried_at_least_once:
+            return True
+
+        oldest_requested_topic_update_timestamp = min(self._cluster_metadata_topic_birth[topic] for topic in topics)
+        return (time.monotonic() - oldest_requested_topic_update_timestamp) > self.metadata_max_age
+
+    async def _update_all_metadata(self) -> dict:
+        if not self.is_global_metadata_old() and self._cluster_metadata_complete:
+            return self._cluster_metadata
+
+        metadata_birth = time.monotonic()
+        metadata = self.admin_client.cluster_metadata(None)
+
+        # Rebuild the per-topic birth index so topics deleted from the cluster do not linger in the cache
+        self._cluster_metadata_topic_birth = {topic: metadata_birth for topic in metadata["topics"]}
+
+        self._global_metadata_birth = metadata_birth
+        self._cluster_metadata = metadata
+        self._cluster_metadata_complete = True
+        return metadata
+
+    async def _update_metadata_for_topics(self, topics: list[str]) -> dict:
+        if self._cluster_metadata is not None and not self.is_metadata_of_topics_old(topics):
+            return {
+                **self._cluster_metadata,
+                "topics": {topic: self._cluster_metadata["topics"][topic] for topic in topics},
+            }
+
+        metadata_birth = time.monotonic()
+        metadata = self.admin_client.cluster_metadata(topics)
+
+        for topic in metadata["topics"]:
+            self._cluster_metadata_topic_birth[topic] = metadata_birth
+
+        if self._cluster_metadata is None:
+            # First request ever: seed the cache with the partial metadata
+            self._cluster_metadata = metadata
+        else:
+            # Merge only the queried topics, keeping the rest of the cached metadata intact
+            for topic in metadata["topics"]:
+                self._cluster_metadata["topics"][topic] = metadata["topics"][topic]
+
+        self._cluster_metadata_complete = False
+        return metadata
+
+    async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict:
+        async with self.admin_lock:
             try:
-                metadata_birth = time.monotonic()
-                metadata = self.admin_client.cluster_metadata(topics)
-                self._metadata_birth = metadata_birth
-                self._cluster_metadata = metadata
-                self._cluster_metadata_complete = topics is None
+                if topics is None:
+                    metadata = await self._update_all_metadata()
+                else:
+                    metadata = await self._update_metadata_for_topics(topics)
             except KafkaException:
                 log.warning("Could not refresh cluster metadata")
                 KafkaRest.r(
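For illustration, the following is a minimal, self-contained sketch (not karapace code) of the caching behaviour the patch aims for: per-topic birth timestamps plus a partial merge into the cached metadata dict. The names `FakeAdminClient`, `MetadataCache`, `metadata_for_topics` and `METADATA_MAX_AGE`, and the simplified metadata shape, are assumptions made only for this example.

```python
import time

METADATA_MAX_AGE = 5.0  # seconds; illustrative value, karapace reads it from config


class FakeAdminClient:
    """Stand-in for the real admin client; returns a tiny, fixed metadata dict."""

    def cluster_metadata(self, topics=None):
        all_topics = {"orders": {"partitions": 3}, "users": {"partitions": 1}}
        if topics is None:
            return {"brokers": [1], "topics": all_topics}
        return {"brokers": [1], "topics": {t: all_topics[t] for t in topics if t in all_topics}}


class MetadataCache:
    def __init__(self, admin_client):
        self.admin_client = admin_client
        self._cluster_metadata = None
        self._cluster_metadata_topic_birth = {}

    def metadata_for_topics(self, topics):
        now = time.monotonic()
        # The cache is usable only if every queried topic was cached recently enough.
        fresh = self._cluster_metadata is not None and all(
            topic in self._cluster_metadata_topic_birth
            and now - self._cluster_metadata_topic_birth[topic] <= METADATA_MAX_AGE
            for topic in topics
        )
        if fresh:
            return {
                **self._cluster_metadata,
                "topics": {t: self._cluster_metadata["topics"][t] for t in topics},
            }

        metadata = self.admin_client.cluster_metadata(topics)
        birth = time.monotonic()
        if self._cluster_metadata is None:
            self._cluster_metadata = metadata
        else:
            # Merge only the queried topics; previously cached topics stay untouched.
            for topic, topic_metadata in metadata["topics"].items():
                self._cluster_metadata["topics"][topic] = topic_metadata
        for topic in metadata["topics"]:
            self._cluster_metadata_topic_birth[topic] = birth
        return metadata


cache = MetadataCache(FakeAdminClient())
cache.metadata_for_topics(["orders"])
cache.metadata_for_topics(["users"])
# Before the fix, the second call overwrote the whole cache; with partial merging
# both topics remain cached.
print(sorted(cache._cluster_metadata["topics"]))  # ['orders', 'users']
```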