diff --git a/unskript-ctl/diagnostics_worker.py b/unskript-ctl/diagnostics_worker.py index 691377347..ab3612c8f 100644 --- a/unskript-ctl/diagnostics_worker.py +++ b/unskript-ctl/diagnostics_worker.py @@ -6,7 +6,7 @@ import subprocess import json from unskript_ctl_factory import UctlLogger, ConfigParserFactory -import yaml +from concurrent.futures import ThreadPoolExecutor logger = UctlLogger('UnskriptDiagnostics') @@ -80,42 +80,42 @@ def fetch_logs(namespace, pod, output_path): if proc.returncode != 0: logger.debug(f"Error fetching previous logs for {pod}: {stderr}") +def fetch_pod_logs_for_namespace(namespace, output_path, condition='not_running'): + # logger.debug(f"Starting log fetch for namespace: {namespace} with condition: {condition}") + proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + logger.debug(f"Error fetching pods in namespace {namespace}: {stderr}") + return + + try: + pods = json.loads(stdout)['items'] + for pod in pods: + if condition == 'not_running' and (pod['status']['phase'] != "Running" or pod['status']['phase'] != "Succeeded"): + # logger.debug(f"Fetching logs for not running/succeeded pod: {pod['metadata']['name']} in {namespace}") + fetch_logs(namespace, pod['metadata']['name'], output_path) + elif condition == 'high_restarts': + for cs in pod['status'].get('containerStatuses', []): + if cs['restartCount'] > 25: + # logger.debug(f"Fetching logs for pod with high restarts: {pod['metadata']['name']} in {namespace}") + fetch_logs(namespace, pod['metadata']['name'], output_path) + except json.JSONDecodeError: + logger.debug(f"Failed to decode JSON response from kubectl get pods in namespace {namespace}: {stdout}") + def fetch_pod_logs_not_running(output_path): allowed_namespaces = get_matrix_namespaces() - for namespace in allowed_namespaces: - proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - stdout, stderr = proc.communicate() - if proc.returncode != 0: - logger.debug(f"Error fetching pods: {stderr}") - continue - - try: - pods = json.loads(stdout)['items'] - for pod in pods: - if pod['status']['phase'] != "Running" or pod['status']['phase'] != "Succeeded": - fetch_logs(namespace, pod['metadata']['name'], output_path) - except json.JSONDecodeError: - logger.debug(f"Failed to decode JSON response: {stdout}") + with ThreadPoolExecutor(max_workers=5) as executor: + # logger.debug("Initiating ThreadPool to fetch logs for pods not running across namespaces") + for namespace in allowed_namespaces: + executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'not_running') def fetch_pod_logs_high_restarts(output_path): allowed_namespaces = get_matrix_namespaces() - for namespace in allowed_namespaces: - proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - stdout, stderr = proc.communicate() - if proc.returncode != 0: - logger.debug(f"Error fetching pods: {stderr}") - continue - - try: - pods = json.loads(stdout)['items'] - for pod in pods: - for cs in pod['status'].get('containerStatuses', []): - if cs['restartCount'] > 25: - fetch_logs(namespace, pod['metadata']['name'], output_path) - except json.JSONDecodeError: - logger.debug(f"Failed to decode JSON response: {stdout}") + with ThreadPoolExecutor(max_workers=5) as executor: + # logger.debug("Initiating ThreadPool to fetch logs for pods with high restarts across namespaces") + for namespace in allowed_namespaces: + executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'high_restarts') def k8s_diagnostics(commands:list):