Skip to content

Commit

Permalink
fix cache: the cache each time was replacing the original metadata ev…
Browse files Browse the repository at this point in the history
…en if the metadata request was just for a subset of topics, this pr tries to address this by updating only partially the metadata object
  • Loading branch information
eliax1996 committed Jun 4, 2024
1 parent 3f899ac commit 791358a
Showing 1 changed file with 49 additions and 20 deletions.
69 changes: 49 additions & 20 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,9 @@ def __init__(
self.serializer = serializer
self._cluster_metadata = None
self._cluster_metadata_complete = False
self._metadata_birth = None
# birth of all the metadata (when the request was requiring all the metadata available in the cluster)
self._global_metadata_birth = None
self._cluster_metadata_topic_birth: dict[str, float] = {}
self.metadata_max_age = self.config["admin_metadata_max_age"]
self.admin_client = None
self.admin_lock = asyncio.Lock()
Expand Down Expand Up @@ -607,28 +609,55 @@ async def get_topic_config(self, topic: str) -> dict:
async with self.admin_lock:
return self.admin_client.get_topic_config(topic)

async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict:
async with self.admin_lock:
if self._metadata_birth is None or time.monotonic() - self._metadata_birth > self.metadata_max_age:
self._cluster_metadata = None

if self._cluster_metadata:
# Return from metadata only if all queried topics have cached metadata
if topics is None:
if self._cluster_metadata_complete:
return self._cluster_metadata
elif all(topic in self._cluster_metadata["topics"] for topic in topics):
return {
**self._cluster_metadata,
"topics": {topic: self._cluster_metadata["topics"][topic] for topic in topics},
}
def is_global_metadata_old(self) -> bool:
return self._global_metadata_birth is None or (time.monotonic() - self._global_metadata_birth) > self.metadata_max_age

def is_metadata_of_topics_old(self, topics: list[str]) -> bool:
# Return from metadata only if all queried topics have cached metadata
are_all_topic_queried_at_least_once = all(topic in self._cluster_metadata_topic_birth for topic in topics)
oldest_requested_topic_udpate_timestamp = min(self._cluster_metadata_topic_birth[topic] for topic in topics)
return are_all_topic_queried_at_least_once and (time.monotonic() - oldest_requested_topic_udpate_timestamp) > self.metadata_max_age

async def _update_all_metadata(self) -> dict:
if not self.is_global_metadata_old() and self._cluster_metadata_complete:
return self._cluster_metadata

metadata_birth = time.monotonic()
metadata = self.admin_client.cluster_metadata(None)
for topic in metadata["topics"]:
self._cluster_metadata_topic_birth[topic] = metadata_birth

self._global_metadata_birth = metadata_birth
self._cluster_metadata = metadata
self._cluster_metadata_complete = True

async def _update_metadata_for_topics(self, topics: list[str]) -> dict:
if self.is_metadata_of_topics_old(topics):
return {
**self._cluster_metadata,
"topics": {topic: self._cluster_metadata["topics"][topic] for topic in topics},
}

metadata_birth = time.monotonic()
metadata = self.admin_client.cluster_metadata(topics)
for topic in metadata["topics"]:
self._cluster_metadata_topic_birth[topic] = metadata_birth

for topic in metadata["topics"]:
self._cluster_metadata[topic] = metadata["topics"][topic]

self._cluster_metadata = metadata
self._cluster_metadata_complete = False


async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict:
async with self.admin_lock:
try:
metadata_birth = time.monotonic()
metadata = self.admin_client.cluster_metadata(topics)
self._metadata_birth = metadata_birth
self._cluster_metadata = metadata
self._cluster_metadata_complete = topics is None
if topics is None:
metadata = await self._update_all_metadata()
else:
metadata = await self._update_metadata_for_topics(topics)
except KafkaException:
log.warning("Could not refresh cluster metadata")
KafkaRest.r(
Expand Down

0 comments on commit 791358a

Please sign in to comment.