From 8ac726e2526e98ad43bfdc6a37c63423a6734f3f Mon Sep 17 00:00:00 2001 From: shloka-bhalgat-unskript Date: Wed, 24 Apr 2024 15:59:53 +0530 Subject: [PATCH 1/7] Change log collection for diagnostics --- unskript-ctl/diagnostics.py | 7 ++- unskript-ctl/diagnostics_worker.py | 91 ++++++++++++++---------------- 2 files changed, 47 insertions(+), 51 deletions(-) diff --git a/unskript-ctl/diagnostics.py b/unskript-ctl/diagnostics.py index f594e00c1..fdfd31312 100644 --- a/unskript-ctl/diagnostics.py +++ b/unskript-ctl/diagnostics.py @@ -106,7 +106,6 @@ def execute_diagnostics(self, diag_commands): if function: # Call the function with the commands diag_outputs[entry_function] = function(commands) - print(f"Function '{function_name}' is accessible.") else: raise ValueError(f"Function '{function_name}' not found in the global namespace.") except Exception as e: @@ -127,7 +126,7 @@ def main(self): if not diag_commands: print("Skipping Diagnostics: No diagnostic command found. You can define them in the YAML configuration file") return - + print("\nRunning Diagnostics...") diag_outputs = self.execute_diagnostics(diag_commands) if diag_outputs: @@ -144,6 +143,10 @@ def main(args): parser.add_argument("--output-dir-path", '-o', help="Path to output directory", required=True) ap = parser.parse_args(args) + print("\nFetching logs...") + fetch_pod_logs_not_running(ap.output_dir_path) + fetch_pod_logs_high_restarts(ap.output_dir_path) + diagnostics_script = DiagnosticsScript(ap) diagnostics_script.main() diff --git a/unskript-ctl/diagnostics_worker.py b/unskript-ctl/diagnostics_worker.py index 22a459453..4cdfdc219 100644 --- a/unskript-ctl/diagnostics_worker.py +++ b/unskript-ctl/diagnostics_worker.py @@ -6,11 +6,21 @@ import subprocess import json from unskript_ctl_factory import UctlLogger +import yaml logger = UctlLogger('UnskriptDiagnostics') +def append_to_yaml_file(data, file_path): + if not data: + return + try: + with open(file_path, 'a') as file: + yaml.safe_dump(data, file, default_flow_style=False, allow_unicode=True) + except Exception as e: + logger.error(f"Failed to write data to {file_path}: {e}") + def mongodb_diagnostics(commands:list): """ mongodb_diagnostics runs mongocli command with command as the parameter @@ -48,30 +58,33 @@ def mongodb_diagnostics(commands:list): logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n") return command_outputs -def fetch_logs(namespace, pod, container): +def fetch_logs(namespace, pod, container, output_path): """ - Fetches logs and previous logs for a specified container in a pod. + Fetches logs and previous logs for a specified container in a pod and writes directly to a file with headers and separators. 
""" - outputs = [] - cmd_logs = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container] - result_logs = subprocess.run(cmd_logs, capture_output=True, text=True) - if result_logs.stderr: - outputs.append(f"Error: {result_logs.stderr.strip()}") - else: - outputs.append(result_logs.stdout.strip()) - - cmd_logs_previous = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container, "--previous"] - result_logs_previous = subprocess.run(cmd_logs_previous, capture_output=True, text=True) - if result_logs_previous.stderr: - outputs.append(f"Error: {result_logs_previous.stderr.strip()}") - else: - outputs.append(result_logs_previous.stdout.strip()) - - return outputs - -def fetch_pod_logs_not_running(): + logs_file_path = os.path.join(output_path, 'logs.txt') + separator = f"\n{'=' * 40}\n" + header = f"Logs for Namespace: {namespace}, Pod: {pod}, Container: {container}\n" + header_previous = f"Previous Logs for Namespace: {namespace}, Pod: {pod}, Container: {container}\n" + + try: + # Write header and current logs to file + with open(logs_file_path, 'a') as f: + f.write(separator + header) + subprocess.run(["kubectl", "logs", "--namespace", namespace, pod, "-c", container], + stdout=f, stderr=f, text=True, check=False) + + # Write header for previous logs and the logs themselves to file + with open(logs_file_path, 'a') as f: + f.write(separator + header_previous) + subprocess.run(["kubectl", "logs", "--namespace", namespace, pod, "-c", container, "--previous"], + stdout=f, stderr=f, text=True, check=False) + + except Exception as e: + logger.error(f"Failed to fetch and write logs for {namespace}/{pod}/{container}: {e}") + +def fetch_pod_logs_not_running(output_path): logger.debug("\nK8s Diagnostics: Fetching logs for pods not running") - command_outputs = [] cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"] result = subprocess.run(cmd, capture_output=True, text=True) pods = json.loads(result.stdout)['items'] @@ -81,18 +94,13 @@ def fetch_pod_logs_not_running(): name = pod['metadata']['name'] status = pod['status']['phase'] if status != "Running": - logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} (Not Running)") + # logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} (Not Running)") containers = [c['name'] for c in pod['spec'].get('initContainers', []) + pod['spec'].get('containers', [])] for container in containers: - logs_output = fetch_logs(namespace, name, container) - for output in logs_output: - logger.debug({f"Pod Not Running: {name}, Container: {container}": output}) - command_outputs.append({f"Pod Not Running: {name}, Container: {container}": output}) - return command_outputs + fetch_logs(namespace, name, container, output_path) -def fetch_pod_logs_high_restarts(): +def fetch_pod_logs_high_restarts(output_path): logger.debug("\nK8s Diagnostics: Fetching logs for pods with high restarts") - command_outputs = [] cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"] result = subprocess.run(cmd, capture_output=True, text=True) pods = json.loads(result.stdout)['items'] @@ -101,17 +109,11 @@ def fetch_pod_logs_high_restarts(): namespace = pod['metadata']['namespace'] name = pod['metadata']['name'] pod_status = pod['status'].get('containerStatuses', []) - restarts = sum(cs['restartCount'] for cs in pod_status) - if restarts > 25: - logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} with high restarts") - result_logs = subprocess.run(["kubectl", "logs", "--namespace", namespace, name], 
capture_output=True, text=True) - if result_logs.stderr: - logger.debug({f"Pod high restarts: {name}": f"Error: {result_logs.stderr.strip()}"}) - command_outputs.append({f"Pod high restarts: {name}": f"Error: {result_logs.stderr.strip()}"}) - else: - logger.debug({f"Pod high restarts: {name}": result_logs.stdout.strip()}) - command_outputs.append({f"Pod high restarts: {name}": result_logs.stdout.strip()}) - return command_outputs + for container_status in pod_status: + if container_status['restartCount'] > 25: + container_name = container_status['name'] + # logger.debug(f"Fetching logs for Pod: {name}, Container: {container_name} in Namespace: {namespace} with high restarts") + fetch_logs(namespace, name, container_name, output_path) def k8s_diagnostics(commands:list): """ @@ -119,15 +121,6 @@ def k8s_diagnostics(commands:list): """ command_outputs = [] - if not hasattr(k8s_diagnostics, "already_called"): - command_outputs.extend(fetch_pod_logs_high_restarts()) - command_outputs.extend(fetch_pod_logs_not_running()) - - k8s_diagnostics.already_called = True - logger.debug("Logs have been fetched.") - else: - command_outputs = [] - logger.debug("Subsequent execution: Skipping logs") for command in commands: cmd_list = command.split() From f2e925bffa77aa1e3c0e236130fa79f0906d8158 Mon Sep 17 00:00:00 2001 From: Jayasimha Raghavan Date: Wed, 24 Apr 2024 21:59:26 -0700 Subject: [PATCH 2/7] Adding os._exit() --- unskript-ctl/unskript_ctl_main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/unskript-ctl/unskript_ctl_main.py b/unskript-ctl/unskript_ctl_main.py index 976283bdc..b737a0807 100644 --- a/unskript-ctl/unskript_ctl_main.py +++ b/unskript-ctl/unskript_ctl_main.py @@ -766,5 +766,7 @@ def rearrange_argv(argv): if args.command == 'run' and args.report: uc.notify(args) + os._exit(0) + if __name__ == '__main__': main() From 315585faeb24637543a74c61c034fe0b6f9e4af0 Mon Sep 17 00:00:00 2001 From: shloka-bhalgat-unskript Date: Thu, 25 Apr 2024 13:52:25 +0530 Subject: [PATCH 3/7] Remove logger statements --- unskript-ctl/diagnostics_worker.py | 66 +++++++++++++----------------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/unskript-ctl/diagnostics_worker.py b/unskript-ctl/diagnostics_worker.py index 4cdfdc219..e85a651a7 100644 --- a/unskript-ctl/diagnostics_worker.py +++ b/unskript-ctl/diagnostics_worker.py @@ -11,16 +11,6 @@ logger = UctlLogger('UnskriptDiagnostics') - -def append_to_yaml_file(data, file_path): - if not data: - return - try: - with open(file_path, 'a') as file: - yaml.safe_dump(data, file, default_flow_style=False, allow_unicode=True) - except Exception as e: - logger.error(f"Failed to write data to {file_path}: {e}") - def mongodb_diagnostics(commands:list): """ mongodb_diagnostics runs mongocli command with command as the parameter @@ -52,10 +42,10 @@ def mongodb_diagnostics(commands:list): except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nMongodb Diagnostics") - logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nMongodb Diagnostics") + # logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n") return command_outputs def fetch_logs(namespace, pod, container, output_path): @@ -134,10 +124,10 @@ def k8s_diagnostics(commands:list): except Exception as e: command_outputs.append({command: 
f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\n Kubernetes Diagnostics") - logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\n Kubernetes Diagnostics") + # logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n") return command_outputs def redis_diagnostics(commands:list): @@ -174,10 +164,10 @@ def redis_diagnostics(commands:list): command_outputs.append({command: output}) except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nRedis Diagnostics") - logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nRedis Diagnostics") + # logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n") return command_outputs def postgresql_diagnostics(commands:list): @@ -210,10 +200,10 @@ def postgresql_diagnostics(commands:list): except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nPostgresql Diagnostics") - logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nPostgresql Diagnostics") + # logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n") return command_outputs def elasticsearch_diagnostics(commands: list) -> list: @@ -240,10 +230,10 @@ def elasticsearch_diagnostics(commands: list) -> list: except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nElasticsearch Diagnostics") - logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nElasticsearch Diagnostics") + # logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n") return command_outputs def keycloak_diagnostics(commands: list): @@ -269,10 +259,10 @@ def keycloak_diagnostics(commands: list): except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nKeycloak Diagnostics") - logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nKeycloak Diagnostics") + # logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n") return command_outputs def vault_diagnostics(commands: list): @@ -306,8 +296,8 @@ def vault_diagnostics(commands: list): except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nVault Diagnostics") - logger.debug(f"Vault Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nVault Diagnostics") + # logger.debug(f"Vault 
Command: {command}\nOutput: {cmd_output}\n")
     return command_outputs

From 10ff5ecbfe5077fe9daa7de87ae7103d115a8a46 Mon Sep 17 00:00:00 2001
From: Jayasimha Raghavan
Date: Sun, 28 Apr 2024 13:51:14 -0700
Subject: [PATCH 4/7] Fixed issue with missed checks reporting

---
 unskript-ctl/templates/last_cell_content.j2 | 31 ++++++++++++++++++++-
 unskript-ctl/unskript_ctl_run.py            |  7 +++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/unskript-ctl/templates/last_cell_content.j2 b/unskript-ctl/templates/last_cell_content.j2
index d77ea77d6..805f44f72 100644
--- a/unskript-ctl/templates/last_cell_content.j2
+++ b/unskript-ctl/templates/last_cell_content.j2
@@ -1,19 +1,27 @@
 from unskript.legos.utils import CheckOutput, CheckOutputStatus
+
 global w
 global _logger

 all_outputs = []
 other_outputs = []
+id_to_name = {}

 if _logger:
     _logger.debug(f"ERRORED CHECKS ARE: {w.errored_checks}")
     _logger.debug(f"TIMED OUT CHECKS ARE: {w.timeout_checks}")

+if hasattr(w, 'check_uuid_entry_function_map'):
+    for key,value in w.check_uuid_entry_function_map.items():
+        if value not in id_to_name:
+            id_to_name[value] = key
+
 try:
     if 'w' in globals():
         if w.check_run:
             for id,output in w.check_output.items():
                 output = json.loads(output)
                 output['id'] = id
+                #output['name'] = id_to_name.get(id) if id else str()
                 all_outputs.append(output)
            # Lets check if we have errored_checks or timeout_checks
            # exists, if yes then lets dump the output
@@ -26,6 +34,7 @@
                         "objects": None,
                         "error": err_msg,
                         "id": str(_id)
+                        #"name": str(name)
                     })
             if hasattr(w, 'errored_checks') and len(w.errored_checks):
                 for name, err_msg in w.errored_checks.items():
@@ -35,6 +44,7 @@
                         "objects": None,
                         "error": err_msg,
                         "id": str(_id)
+                        #"name": str(name)
                     })

             if other_outputs:
@@ -48,14 +58,33 @@
                         if _logger:
                             _logger.debug(f"FOUND DUPLICATE FOR {_other.get('id')}")

+            if _logger:
+                _logger.debug(f"OTHER OUTPUTS: {other_outputs}")
             existing_ids = set(output.get('id') for output in all_outputs)
             unique_other_outputs = [other_output for other_output in other_outputs if other_output.get('id') not in existing_ids]

             if unique_other_outputs:
-                all_outputs.extend(unique_other_outputs)
+                # Lets insert the unique other outputs at the same respective place
+                #all_outputs.extend(unique_other_outputs)
+                if _logger:
+                    _logger.debug(f"LENGTH OF ALL OUTPUT BEFORE INSERT IS: {len(all_outputs)}")
+                for uo in unique_other_outputs:
+                    insert_index = w.check_uuids.index(uo.get('id'))
+                    if _logger:
+                        _logger.debug(f"INSERTING RESULT FOR {uo.get('id')} at {insert_index} position")
+                    # index 0 is a valid position, so do not gate the insert on truthiness
+                    all_outputs.insert(insert_index, uo)
+
             if not all_outputs:
                 all_outputs = other_outputs

+            _outputs_with_valid_names = []
+            for _output in all_outputs:
+                if id_to_name.get(_output.get('id')):
+                    _outputs_with_valid_names.append(_output)
+                    if _logger:
+                        _logger.debug(f"All output has result for ID: {_output.get('id')} Name: {id_to_name.get(_output.get('id'))} Status: {_output.get('status')}")
+            all_outputs = _outputs_with_valid_names
             for _output in all_outputs:
                 print(json.dumps(_output))
         else:
diff --git a/unskript-ctl/unskript_ctl_run.py b/unskript-ctl/unskript_ctl_run.py
index 1d4330076..264b7cc6a 100644
--- a/unskript-ctl/unskript_ctl_run.py
+++ b/unskript-ctl/unskript_ctl_run.py
@@ -165,7 +165,7 @@ def display_check_result(self, checks_output):
         ids = self.check_uuids
         failed_result_available = False
         failed_result = {}
-        checks_output = self.output_after_merging_checks(checks_output, self.check_uuids)
+        #checks_output = self.output_after_merging_checks(checks_output, self.check_uuids)
         for result in checks_output:
             if result.get('skip') and result.get('skip') is True:
                 idx += 1
                 continue
             payload = result
             try:
@@ -406,7 +406,10 @@ def get_first_cell_content(self, list_of_checks: list):
                 first_cell_content += f'{k}{index} = \"{value}\"' + '\n'
         first_cell_content += f'''w = Workflow(env, secret_store_cfg, None, global_vars=globals(), check_uuids={self.check_uuids})''' + '\n'
         # temp_map = {key: value for key, value in zip(self.check_entry_functions, self.check_uuids)}
-        temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
+        # temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
+        temp_map = {}
+        for index,value in enumerate(self.check_uuids):
+            temp_map[value] = self.check_entry_functions[index]
         first_cell_content += f'''w.check_uuid_entry_function_map = {temp_map}''' + '\n'
         first_cell_content += '''w.errored_checks = {}''' + '\n'
         first_cell_content += '''w.timeout_checks = {}''' + '\n'

From 8b2a2a2ff7553fbd8781476aeaa829749ace50e5 Mon Sep 17 00:00:00 2001
From: Jayasimha Raghavan
Date: Sun, 28 Apr 2024 14:55:03 -0700
Subject: [PATCH 5/7] Fixed priority mapping for checks

---
 unskript-ctl/unskript_ctl_run.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/unskript-ctl/unskript_ctl_run.py b/unskript-ctl/unskript_ctl_run.py
index 264b7cc6a..19c3e88b2 100644
--- a/unskript-ctl/unskript_ctl_run.py
+++ b/unskript-ctl/unskript_ctl_run.py
@@ -59,6 +59,7 @@ def __init__(self, **kwargs):
         self.prioritized_checks_to_id_mapping = {}
         self.map_entry_function_to_check_name = {}
         self.map_check_name_to_connector = {}
+        self.check_name_to_id_mapping = {}

         for k,v in self.checks_globals.items():
             os.environ[k] = json.dumps(v)
@@ -172,12 +173,13 @@ def display_check_result(self, checks_output):
             continue
         payload = result
         try:
+            _action_uuid = payload.get('id')
             if self.checks_priority is None:
                 priority = CHECK_PRIORITY_P2
             else:
-                priority = self.checks_priority.get(self.check_entry_functions[idx], CHECK_PRIORITY_P2)
+                # priority = self.checks_priority.get(self.check_entry_functions[idx], CHECK_PRIORITY_P2)
+                priority = self.checks_priority.get(self.check_name_to_id_mapping.get(_action_uuid), CHECK_PRIORITY_P2)

-            _action_uuid = payload.get('id')
             if _action_uuid:
                 #c_name = self.connector_types[idx] + ':' + self.prioritized_checks_to_id_mapping[_action_uuid]
                 p_check_name = self.prioritized_checks_to_id_mapping[_action_uuid]
@@ -406,7 +409,9 @@ def get_first_cell_content(self, list_of_checks: list):
         # temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
         temp_map = {}
         for index,value in enumerate(self.check_uuids):
-            temp_map[value] = self.check_entry_functions[index]
+            temp_map[self.check_entry_functions[index]] = value
+            self.check_name_to_id_mapping[value] = self.check_entry_functions[index]
+
         first_cell_content += f'''w.check_uuid_entry_function_map = {temp_map}''' + '\n'
         first_cell_content += '''w.errored_checks = {}''' + '\n'
         first_cell_content += '''w.timeout_checks = {}''' + '\n'

From 881e0c890b0e55e78e320b49c49f64b00c40d35c Mon Sep 17 00:00:00 2001
From: shloka-bhalgat-unskript
Date: Mon, 29 Apr 2024 18:30:55 +0530
Subject: [PATCH 6/7] Update log collection functions

---
 unskript-ctl/diagnostics_worker.py | 119 ++++++++++++++++-------------
 1 file changed, 66 insertions(+), 53 deletions(-)

diff --git a/unskript-ctl/diagnostics_worker.py b/unskript-ctl/diagnostics_worker.py
index e85a651a7..691377347 100644
--- a/unskript-ctl/diagnostics_worker.py
+++ b/unskript-ctl/diagnostics_worker.py
@@ -5,7 +5,7 @@
 import os
 import subprocess
 import json
-from unskript_ctl_factory import UctlLogger
+from unskript_ctl_factory import UctlLogger, ConfigParserFactory
 import yaml


@@ -48,62 +48,75 @@ def mongodb_diagnostics(commands:list):
     #         logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n")
     return command_outputs

-def fetch_logs(namespace, pod, container, output_path):
-    """
-    Fetches logs and previous logs for a specified container in a pod and writes directly to a file with headers and separators.
-    """
-    logs_file_path = os.path.join(output_path, 'logs.txt')
-    separator = f"\n{'=' * 40}\n"
-    header = f"Logs for Namespace: {namespace}, Pod: {pod}, Container: {container}\n"
-    header_previous = f"Previous Logs for Namespace: {namespace}, Pod: {pod}, Container: {container}\n"
-
-    try:
-        # Write header and current logs to file
-        with open(logs_file_path, 'a') as f:
-            f.write(separator + header)
-            subprocess.run(["kubectl", "logs", "--namespace", namespace, pod, "-c", container],
-                           stdout=f, stderr=f, text=True, check=False)
-
-        # Write header for previous logs and the logs themselves to file
-        with open(logs_file_path, 'a') as f:
-            f.write(separator + header_previous)
-            subprocess.run(["kubectl", "logs", "--namespace", namespace, pod, "-c", container, "--previous"],
-                           stdout=f, stderr=f, text=True, check=False)
-
-    except Exception as e:
-        logger.error(f"Failed to fetch and write logs for {namespace}/{pod}/{container}: {e}")
+def get_matrix_namespaces():
+    config_parser = ConfigParserFactory()
+    global_params = config_parser.get_checks_params()
+
+    if 'global' in global_params and 'matrix' in global_params['global']:
+        namespaces = global_params['global']['matrix'].get('namespace', [])
+        return namespaces
+    return []
+
+def fetch_logs(namespace, pod, output_path):
+    logs_file_path = os.path.join(output_path, 'logs.txt')
+    separator = "\n" + "=" * 40 + "\n"
+    header = f"Logs for Namespace: {namespace}, Pod: {pod}\n"
+    header_previous = f"Previous Logs for Namespace: {namespace}, Pod: {pod}\n"
+
+    with open(logs_file_path, 'a') as log_file:
+        log_file.write(separator + header)
+        # Fetch current logs
+        proc = subprocess.Popen(["kubectl", "logs", "--namespace", namespace, "--tail=100", "--all-containers", pod],
+                                stdout=log_file, stderr=subprocess.PIPE, text=True)
+        stderr = proc.communicate()[1]
+        if proc.returncode != 0:
+            logger.debug(f"Error fetching logs for {pod}: {stderr}")
+
+        log_file.write(separator + header_previous)
+        # Fetch previous logs
+        proc = subprocess.Popen(["kubectl", "logs", "--namespace", namespace, "--tail=100", "--all-containers", pod, "--previous"],
+                                stdout=log_file, stderr=subprocess.PIPE, text=True)
+        stderr = proc.communicate()[1]
+        if proc.returncode != 0:
+            logger.debug(f"Error fetching previous logs for {pod}: {stderr}")

 def fetch_pod_logs_not_running(output_path):
-    logger.debug("\nK8s Diagnostics: Fetching logs for pods not running")
-    cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
-    result = subprocess.run(cmd, capture_output=True, text=True)
-    pods = json.loads(result.stdout)['items']
-
-    for pod in pods:
-        namespace = pod['metadata']['namespace']
-        name = pod['metadata']['name']
-        status = pod['status']['phase']
-        if status != "Running":
-            # logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} (Not Running)")
-            containers = [c['name'] for c in pod['spec'].get('initContainers', []) + pod['spec'].get('containers', [])]
-            for container in containers:
-                fetch_logs(namespace, name, container, output_path)
+    allowed_namespaces = get_matrix_namespaces()
+    for namespace in allowed_namespaces:
+        proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
+                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != 0:
+            logger.debug(f"Error fetching pods: {stderr}")
+            continue
+
+        try:
+            pods = json.loads(stdout)['items']
+            for pod in pods:
+                if pod['status']['phase'] not in ("Running", "Succeeded"):
+                    fetch_logs(namespace, pod['metadata']['name'], output_path)
+        except json.JSONDecodeError:
+            logger.debug(f"Failed to decode JSON response: {stdout}")

 def fetch_pod_logs_high_restarts(output_path):
-    logger.debug("\nK8s Diagnostics: Fetching logs for pods with high restarts")
-    cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
-    result = subprocess.run(cmd, capture_output=True, text=True)
-    pods = json.loads(result.stdout)['items']
-
-    for pod in pods:
-        namespace = pod['metadata']['namespace']
-        name = pod['metadata']['name']
-        pod_status = pod['status'].get('containerStatuses', [])
-        for container_status in pod_status:
-            if container_status['restartCount'] > 25:
-                container_name = container_status['name']
-                # logger.debug(f"Fetching logs for Pod: {name}, Container: {container_name} in Namespace: {namespace} with high restarts")
-                fetch_logs(namespace, name, container_name, output_path)
+    allowed_namespaces = get_matrix_namespaces()
+    for namespace in allowed_namespaces:
+        proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
+                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != 0:
+            logger.debug(f"Error fetching pods: {stderr}")
+            continue
+
+        try:
+            pods = json.loads(stdout)['items']
+            for pod in pods:
+                for cs in pod['status'].get('containerStatuses', []):
+                    if cs['restartCount'] > 25:
+                        fetch_logs(namespace, pod['metadata']['name'], output_path)
+        except json.JSONDecodeError:
+            logger.debug(f"Failed to decode JSON response: {stdout}")
+

 def k8s_diagnostics(commands:list):
     """

From 8380778fd6f3e8908f3efd3c0d6670b60c365cba Mon Sep 17 00:00:00 2001
From: shloka-bhalgat-unskript
Date: Wed, 1 May 2024 09:29:19 +0530
Subject: [PATCH 7/7] Add threadpool executor

---
 unskript-ctl/diagnostics_worker.py | 64 +++++++++++++++---------------
 1 file changed, 32 insertions(+), 32 deletions(-)

diff --git a/unskript-ctl/diagnostics_worker.py b/unskript-ctl/diagnostics_worker.py
index 691377347..ab3612c8f 100644
--- a/unskript-ctl/diagnostics_worker.py
+++ b/unskript-ctl/diagnostics_worker.py
@@ -6,7 +6,7 @@
 import subprocess
 import json
 from unskript_ctl_factory import UctlLogger, ConfigParserFactory
-import yaml
+from concurrent.futures import ThreadPoolExecutor

 logger = UctlLogger('UnskriptDiagnostics')

@@ -80,42 +80,42 @@ def fetch_logs(namespace, pod, output_path):
     if proc.returncode != 0:
         logger.debug(f"Error fetching previous logs for {pod}: {stderr}")

+def fetch_pod_logs_for_namespace(namespace, output_path, condition='not_running'):
+    # logger.debug(f"Starting log fetch for namespace: {namespace} with condition: {condition}")
+    proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
+                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+    stdout, stderr = proc.communicate()
+    if proc.returncode != 0:
+        logger.debug(f"Error fetching pods in namespace {namespace}: {stderr}")
+        return
+
+    try:
+        pods = json.loads(stdout)['items']
+        for pod in pods:
+            if condition == 'not_running' and pod['status']['phase'] not in ("Running", "Succeeded"):
+                # logger.debug(f"Fetching logs for not running/succeeded pod: {pod['metadata']['name']} in {namespace}")
+                fetch_logs(namespace, pod['metadata']['name'], output_path)
+            elif condition == 'high_restarts':
+                for cs in pod['status'].get('containerStatuses', []):
+                    if cs['restartCount'] > 25:
+                        # logger.debug(f"Fetching logs for pod with high restarts: {pod['metadata']['name']} in {namespace}")
+                        fetch_logs(namespace, pod['metadata']['name'], output_path)
+    except json.JSONDecodeError:
+        logger.debug(f"Failed to decode JSON response from kubectl get pods in namespace {namespace}: {stdout}")
+
 def fetch_pod_logs_not_running(output_path):
     allowed_namespaces = get_matrix_namespaces()
-    for namespace in allowed_namespaces:
-        proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
-                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
-        stdout, stderr = proc.communicate()
-        if proc.returncode != 0:
-            logger.debug(f"Error fetching pods: {stderr}")
-            continue
-
-        try:
-            pods = json.loads(stdout)['items']
-            for pod in pods:
-                if pod['status']['phase'] not in ("Running", "Succeeded"):
-                    fetch_logs(namespace, pod['metadata']['name'], output_path)
-        except json.JSONDecodeError:
-            logger.debug(f"Failed to decode JSON response: {stdout}")
+    with ThreadPoolExecutor(max_workers=5) as executor:
+        # logger.debug("Initiating ThreadPool to fetch logs for pods not running across namespaces")
+        for namespace in allowed_namespaces:
+            executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'not_running')

 def fetch_pod_logs_high_restarts(output_path):
     allowed_namespaces = get_matrix_namespaces()
-    for namespace in allowed_namespaces:
-        proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
-                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
-        stdout, stderr = proc.communicate()
-        if proc.returncode != 0:
-            logger.debug(f"Error fetching pods: {stderr}")
-            continue
-
-        try:
-            pods = json.loads(stdout)['items']
-            for pod in pods:
-                for cs in pod['status'].get('containerStatuses', []):
-                    if cs['restartCount'] > 25:
-                        fetch_logs(namespace, pod['metadata']['name'], output_path)
-        except json.JSONDecodeError:
-            logger.debug(f"Failed to decode JSON response: {stdout}")
+    with ThreadPoolExecutor(max_workers=5) as executor:
+        # logger.debug("Initiating ThreadPool to fetch logs for pods with high restarts across namespaces")
+        for namespace in allowed_namespaces:
+            executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'high_restarts')


 def k8s_diagnostics(commands:list):
     """
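
For reference, the end state of the log-collection path after [PATCH 7/7] is: read the namespace allow-list from the checks configuration (`global.matrix.namespace`), scan each namespace on a worker thread, and append the last 100 log lines (current and `--previous`, all containers) of every pod that is either not Running/Succeeded or has a container with more than 25 restarts to `logs.txt` under the output directory. Below is a minimal standalone sketch of that flow. The YAML layout, the config filename, and the use of `print` in place of `UctlLogger`/`ConfigParserFactory` are assumptions for illustration only; the 25-restart threshold and the kubectl flags come from the patches. The sketch also folds the two conditions into a single scan per namespace, where the patches dispatch 'not_running' and 'high_restarts' as separate executor jobs.

# sketch.py -- illustrative only; assumes a config file shaped like:
#   global:
#     matrix:
#       namespace: ["ns-a", "ns-b"]
import json
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor

import yaml

RESTART_THRESHOLD = 25              # threshold used throughout the series
HEALTHY_PHASES = ("Running", "Succeeded")

def namespaces_from_config(config_path):
    # Stand-in for ConfigParserFactory().get_checks_params()
    with open(config_path) as f:
        params = yaml.safe_load(f) or {}
    return params.get("global", {}).get("matrix", {}).get("namespace", [])

def append_pod_logs(namespace, pod, output_path):
    # Append current and previous logs for one pod to a shared logs.txt,
    # streaming kubectl output straight to the open file handle.
    logs_file = os.path.join(output_path, "logs.txt")
    for extra_args, label in (([], "Logs"), (["--previous"], "Previous Logs")):
        with open(logs_file, "a") as f:
            f.write(f"\n{'=' * 40}\n{label} for Namespace: {namespace}, Pod: {pod}\n")
            result = subprocess.run(
                ["kubectl", "logs", "--namespace", namespace,
                 "--tail=100", "--all-containers", pod, *extra_args],
                stdout=f, stderr=subprocess.PIPE, text=True, check=False)
        if result.returncode != 0:
            print(f"Error fetching logs for {pod}: {result.stderr.strip()}")

def collect_namespace(namespace, output_path):
    # One worker: list pods in a namespace, fetch logs for unhealthy ones.
    result = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
        capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Error fetching pods in namespace {namespace}: {result.stderr}")
        return
    for pod in json.loads(result.stdout).get("items", []):
        statuses = pod["status"].get("containerStatuses", [])
        not_healthy = pod["status"]["phase"] not in HEALTHY_PHASES
        high_restarts = any(cs["restartCount"] > RESTART_THRESHOLD for cs in statuses)
        if not_healthy or high_restarts:
            append_pod_logs(namespace, pod["metadata"]["name"], output_path)

def collect_all(config_path, output_path):
    # The executor context manager joins all namespace workers before
    # returning, so callers (e.g. diagnostics.py's main()) stay synchronous.
    with ThreadPoolExecutor(max_workers=5) as executor:
        for ns in namespaces_from_config(config_path):
            executor.submit(collect_namespace, ns, output_path)

if __name__ == "__main__":
    collect_all("unskript_ctl_config.yaml", "/tmp/diag")   # hypothetical paths

Writing kubectl's stdout directly to the file, as the patches do, avoids buffering whole pod logs in memory; the trade-off is that concurrent workers append to the same logs.txt, so entries from different namespaces may interleave between header blocks.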