From 98a9b1aa69f488e2d784b9f9814c303c6d999ac5 Mon Sep 17 00:00:00 2001
From: arik
Date: Fri, 31 May 2024 13:42:06 +0300
Subject: [PATCH] solve race condition between job and pod creation by adding
 retries (#1446)

* solve race condition between job and pod creation by adding retries

* pull default config from env vars
---
 src/robusta/core/model/env_vars.py               |  7 +++++--
 .../core/sinks/robusta/dal/supabase_dal.py       |  2 +-
 .../integrations/kubernetes/custom_models.py     | 20 +++++++++++++++----
 3 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/src/robusta/core/model/env_vars.py b/src/robusta/core/model/env_vars.py
index 8b957a58d..c44100bf1 100644
--- a/src/robusta/core/model/env_vars.py
+++ b/src/robusta/core/model/env_vars.py
@@ -115,7 +115,10 @@ def load_bool(env_var, default: bool):
 # lowered case k8s kinds in a json array string. "[\"configmap\", \"secret\"]"
 RESOURCE_YAML_BLOCK_LIST = json.loads(os.environ.get("RESOURCE_YAML_BLOCK_LIST", "[]"))
 
-NAMESPACE_DATA_TTL = int(os.environ.get("NAMESPACE_DATA_TTL", 30*60))  # in seconds
+NAMESPACE_DATA_TTL = int(os.environ.get("NAMESPACE_DATA_TTL", 30 * 60))  # in seconds
 
-PROCESSED_ALERTS_CACHE_TTL = int(os.environ.get("PROCESSED_ALERT_CACHE_TTL", 2*3600))
+PROCESSED_ALERTS_CACHE_TTL = int(os.environ.get("PROCESSED_ALERT_CACHE_TTL", 2 * 3600))
 PROCESSED_ALERTS_CACHE_MAX_SIZE = int(os.environ.get("PROCESSED_ALERTS_CACHE_MAX_SIZE", 100_000))
+
+POD_WAIT_RETRIES = int(os.environ.get("POD_WAIT_RETRIES", 10))
+POD_WAIT_RETRIES_SECONDS = int(os.environ.get("POD_WAIT_RETRIES_SECONDS", 5))
diff --git a/src/robusta/core/sinks/robusta/dal/supabase_dal.py b/src/robusta/core/sinks/robusta/dal/supabase_dal.py
index f8ea6af0b..654e63504 100644
--- a/src/robusta/core/sinks/robusta/dal/supabase_dal.py
+++ b/src/robusta/core/sinks/robusta/dal/supabase_dal.py
@@ -566,7 +566,7 @@ def publish_cluster_nodes(self, node_count: int, pod_count: int):
 
         try:
             self.client.rpc(UPDATE_CLUSTER_NODE_COUNT, data).execute()
         except Exception as e:
-            logging.error(f"Failed to publish node count {data} error: {e}")
+            logging.exception(f"Failed to publish node count {data} error: {e}")
 
         logging.debug(f"cluster nodes: {UPDATE_CLUSTER_NODE_COUNT} => {data}")
diff --git a/src/robusta/integrations/kubernetes/custom_models.py b/src/robusta/integrations/kubernetes/custom_models.py
index a01ffee63..e65c46e5e 100644
--- a/src/robusta/integrations/kubernetes/custom_models.py
+++ b/src/robusta/integrations/kubernetes/custom_models.py
@@ -13,7 +13,13 @@
 from kubernetes.client import ApiException
 from pydantic import BaseModel
 
-from robusta.core.model.env_vars import IMAGE_REGISTRY, INSTALLATION_NAMESPACE, RUNNER_SERVICE_ACCOUNT
+from robusta.core.model.env_vars import (
+    IMAGE_REGISTRY,
+    INSTALLATION_NAMESPACE,
+    POD_WAIT_RETRIES,
+    POD_WAIT_RETRIES_SECONDS,
+    RUNNER_SERVICE_ACCOUNT,
+)
 from robusta.integrations.kubernetes.api_client_utils import (
     SUCCEEDED_STATE,
     exec_shell_command,
@@ -451,13 +457,19 @@ def get_pods(self) -> List[RobustaPod]:
         # we serialize and then deserialize to work around https://github.com/haxsaw/hikaru/issues/15
         return [hikaru.from_dict(pod.to_dict(), cls=RobustaPod) for pod in pods.items]
 
-    def get_single_pod(self) -> RobustaPod:
+    def get_single_pod(self, retries: int = POD_WAIT_RETRIES, wait: int = POD_WAIT_RETRIES_SECONDS) -> RobustaPod:
         """
         like get_pods() but verifies that only one pod is associated with the job and returns that pod
+        if no pods are found, retry `retries` times with `wait` seconds between attempts
         """
         pods = self.get_pods()
+        while retries > 0 and len(pods) == 0:
+            time.sleep(wait)
+            pods = self.get_pods()
+            retries -= 1
+
         if len(pods) != 1:
-            raise Exception(f"got more pods than expected for job {self.metadata.name}: {pods}")
+            raise Exception(f"got {len(pods)} pods, expected 1 for job {self.metadata.name}: {pods}")
         return pods[0]
 
     def create_job_owned_secret(self, job_secret: JobSecret):
@@ -478,7 +490,7 @@ def create_job_owned_secret(self, job_secret: JobSecret):
             metadata=ObjectMeta(name=job_secret.name, ownerReferences=[robusta_owner_reference]), data=job_secret.data
         )
         try:
-            return secret.createNamespacedSecret(job_pod.metadata.namespace).obj
+            secret.createNamespacedSecret(job_pod.metadata.namespace).obj
         except Exception as e:
             logging.error(f"Failed to create secret {job_secret.name}", exc_info=True)
             raise e
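
For readers skimming the diff: the fix boils down to a poll-and-sleep loop around
get_pods(), bounded by the two new env vars. Below is a minimal standalone sketch
of that pattern, not the Robusta API itself; wait_for_single_pod and list_job_pods
are illustrative stand-ins for RobustaJob.get_single_pod() and self.get_pods().

    import os
    import time
    from typing import Callable, List

    # Defaults mirror the new env vars added in src/robusta/core/model/env_vars.py.
    POD_WAIT_RETRIES = int(os.environ.get("POD_WAIT_RETRIES", 10))
    POD_WAIT_RETRIES_SECONDS = int(os.environ.get("POD_WAIT_RETRIES_SECONDS", 5))

    def wait_for_single_pod(
        list_job_pods: Callable[[], List[str]],
        retries: int = POD_WAIT_RETRIES,
        wait: int = POD_WAIT_RETRIES_SECONDS,
    ) -> str:
        """Poll until the job controller has created the pod, then return it."""
        pods = list_job_pods()
        # The job object can exist before its pod does; retry instead of failing fast.
        while retries > 0 and len(pods) == 0:
            time.sleep(wait)
            pods = list_job_pods()
            retries -= 1
        if len(pods) != 1:
            raise Exception(f"got {len(pods)} pods, expected 1: {pods}")
        return pods[0]

In the worst case this waits retries * wait seconds (50s with the defaults) before
giving up, so callers on a latency-sensitive path may want to pass smaller values.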