diff --git a/unskript-ctl/diagnostics.py b/unskript-ctl/diagnostics.py
index f594e00c1..fdfd31312 100644
--- a/unskript-ctl/diagnostics.py
+++ b/unskript-ctl/diagnostics.py
@@ -106,7 +106,6 @@ def execute_diagnostics(self, diag_commands):
                 if function:
                     # Call the function with the commands
                     diag_outputs[entry_function] = function(commands)
-                    print(f"Function '{function_name}' is accessible.")
                 else:
                     raise ValueError(f"Function '{function_name}' not found in the global namespace.")
             except Exception as e:
@@ -127,7 +126,7 @@ def main(self):
         if not diag_commands:
             print("Skipping Diagnostics: No diagnostic command found. You can define them in the YAML configuration file")
             return
-
+
         print("\nRunning Diagnostics...")
         diag_outputs = self.execute_diagnostics(diag_commands)
         if diag_outputs:
@@ -144,6 +143,10 @@ def main(args):
     parser.add_argument("--output-dir-path", '-o', help="Path to output directory", required=True)
     ap = parser.parse_args(args)
 
+    print("\nFetching logs...")
+    fetch_pod_logs_not_running(ap.output_dir_path)
+    fetch_pod_logs_high_restarts(ap.output_dir_path)
+
     diagnostics_script = DiagnosticsScript(ap)
     diagnostics_script.main()
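For reference, the first hunk sits inside execute_diagnostics(), which resolves each configured entry function by name and invokes it with its command list; the patch only drops the noisy success print. A self-contained toy version of that dispatch pattern, using illustrative names rather than the module's exact internals:

def mongodb_diagnostics(commands):
    # Stand-in worker; the real one shells out to mongosh/mongocli.
    return [{cmd: "ok"} for cmd in commands]

diag_commands = {"mongodb_diagnostics": ["db.stats()"]}  # normally read from YAML

diag_outputs = {}
for entry_function, commands in diag_commands.items():
    function = globals().get(entry_function)
    if function:
        # Call the function with the commands
        diag_outputs[entry_function] = function(commands)
    else:
        raise ValueError(f"Function '{entry_function}' not found in the global namespace.")

print(diag_outputs)  # {'mongodb_diagnostics': [{'db.stats()': 'ok'}]}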
- """ - outputs = [] - cmd_logs = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container] - result_logs = subprocess.run(cmd_logs, capture_output=True, text=True) - if result_logs.stderr: - outputs.append(f"Error: {result_logs.stderr.strip()}") - else: - outputs.append(result_logs.stdout.strip()) +def get_matrix_namespaces(): + config_parser = ConfigParserFactory() + global_params = config_parser.get_checks_params() + + if 'global' in global_params and 'matrix' in global_params['global']: + namespaces = global_params['global']['matrix'].get('namespace', []) + return namespaces + return [] + +def fetch_logs(namespace, pod, output_path): + logs_file_path = os.path.join(output_path, f'logs.txt') + separator = "\n" + "=" * 40 + "\n" + header = f"Logs for Namespace: {namespace}, Pod: {pod}\n" + header_previous = f"Previous Logs for Namespace: {namespace}, Pod: {pod}\n" + + with open(logs_file_path, 'a') as log_file: + log_file.write(separator + header) + # Fetch current logs + proc = subprocess.Popen(["kubectl", "logs", "--namespace", namespace, "--tail=100", "--all-containers", pod], + stdout=log_file, stderr=subprocess.PIPE, text=True) + stderr = proc.communicate()[1] + if proc.returncode != 0: + logger.debug(f"Error fetching logs for {pod}: {stderr}") + + log_file.write(separator + header_previous) + # Fetch previous logs + proc = subprocess.Popen(["kubectl", "logs", "--namespace", namespace, "--tail=100", "--all-containers", pod, "--previous"], + stdout=log_file, stderr=subprocess.PIPE, text=True) + stderr = proc.communicate()[1] + if proc.returncode != 0: + logger.debug(f"Error fetching previous logs for {pod}: {stderr}") + +def fetch_pod_logs_for_namespace(namespace, output_path, condition='not_running'): + # logger.debug(f"Starting log fetch for namespace: {namespace} with condition: {condition}") + proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + logger.debug(f"Error fetching pods in namespace {namespace}: {stderr}") + return + + try: + pods = json.loads(stdout)['items'] + for pod in pods: + if condition == 'not_running' and (pod['status']['phase'] != "Running" or pod['status']['phase'] != "Succeeded"): + # logger.debug(f"Fetching logs for not running/succeeded pod: {pod['metadata']['name']} in {namespace}") + fetch_logs(namespace, pod['metadata']['name'], output_path) + elif condition == 'high_restarts': + for cs in pod['status'].get('containerStatuses', []): + if cs['restartCount'] > 25: + # logger.debug(f"Fetching logs for pod with high restarts: {pod['metadata']['name']} in {namespace}") + fetch_logs(namespace, pod['metadata']['name'], output_path) + except json.JSONDecodeError: + logger.debug(f"Failed to decode JSON response from kubectl get pods in namespace {namespace}: {stdout}") + +def fetch_pod_logs_not_running(output_path): + allowed_namespaces = get_matrix_namespaces() + with ThreadPoolExecutor(max_workers=5) as executor: + # logger.debug("Initiating ThreadPool to fetch logs for pods not running across namespaces") + for namespace in allowed_namespaces: + executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'not_running') + +def fetch_pod_logs_high_restarts(output_path): + allowed_namespaces = get_matrix_namespaces() + with ThreadPoolExecutor(max_workers=5) as executor: + # logger.debug("Initiating ThreadPool to fetch logs for pods with high restarts across namespaces") + for namespace 
+        for namespace in allowed_namespaces:
+            executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'high_restarts')
 
-    cmd_logs_previous = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container, "--previous"]
-    result_logs_previous = subprocess.run(cmd_logs_previous, capture_output=True, text=True)
-    if result_logs_previous.stderr:
-        outputs.append(f"Error: {result_logs_previous.stderr.strip()}")
-    else:
-        outputs.append(result_logs_previous.stdout.strip())
-
-    return outputs
-
-def fetch_pod_logs_not_running():
-    logger.debug("\nK8s Diagnostics: Fetching logs for pods not running")
-    command_outputs = []
-    cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
-    result = subprocess.run(cmd, capture_output=True, text=True)
-    pods = json.loads(result.stdout)['items']
-
-    for pod in pods:
-        namespace = pod['metadata']['namespace']
-        name = pod['metadata']['name']
-        status = pod['status']['phase']
-        if status != "Running":
-            logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} (Not Running)")
-            containers = [c['name'] for c in pod['spec'].get('initContainers', []) + pod['spec'].get('containers', [])]
-            for container in containers:
-                logs_output = fetch_logs(namespace, name, container)
-                for output in logs_output:
-                    logger.debug({f"Pod Not Running: {name}, Container: {container}": output})
-                    command_outputs.append({f"Pod Not Running: {name}, Container: {container}": output})
-    return command_outputs
-
-def fetch_pod_logs_high_restarts():
-    logger.debug("\nK8s Diagnostics: Fetching logs for pods with high restarts")
-    command_outputs = []
-    cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
-    result = subprocess.run(cmd, capture_output=True, text=True)
-    pods = json.loads(result.stdout)['items']
-
-    for pod in pods:
-        namespace = pod['metadata']['namespace']
-        name = pod['metadata']['name']
-        pod_status = pod['status'].get('containerStatuses', [])
-        restarts = sum(cs['restartCount'] for cs in pod_status)
-        if restarts > 25:
-            logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} with high restarts")
-            result_logs = subprocess.run(["kubectl", "logs", "--namespace", namespace, name], capture_output=True, text=True)
-            if result_logs.stderr:
-                logger.debug({f"Pod high restarts: {name}": f"Error: {result_logs.stderr.strip()}"})
-                command_outputs.append({f"Pod high restarts: {name}": f"Error: {result_logs.stderr.strip()}"})
-            else:
-                logger.debug({f"Pod high restarts: {name}": result_logs.stdout.strip()})
-                command_outputs.append({f"Pod high restarts: {name}": result_logs.stdout.strip()})
-    return command_outputs
 
 def k8s_diagnostics(commands:list):
     """
@@ -119,15 +124,6 @@ def k8s_diagnostics(commands:list):
     """
     command_outputs = []
 
-    if not hasattr(k8s_diagnostics, "already_called"):
-        command_outputs.extend(fetch_pod_logs_high_restarts())
-        command_outputs.extend(fetch_pod_logs_not_running())
-
-        k8s_diagnostics.already_called = True
-        logger.debug("Logs have been fetched.")
-    else:
-        command_outputs = []
-        logger.debug("Subsequent execution: Skipping logs")
 
     for command in commands:
         cmd_list = command.split()
@@ -141,10 +137,10 @@ def k8s_diagnostics(commands:list):
         except Exception as e:
             command_outputs.append({command: f"Exception: {str(e)}"})
 
-    for result_dict in command_outputs:
-        for command, cmd_output in result_dict.items():
-            logger.debug("\n Kubernetes Diagnostics")
-            logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n")
+    # for result_dict in command_outputs:
+    #     for command, cmd_output in result_dict.items():
+ # logger.debug("\n Kubernetes Diagnostics") + # logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n") return command_outputs def redis_diagnostics(commands:list): @@ -181,10 +177,10 @@ def redis_diagnostics(commands:list): command_outputs.append({command: output}) except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nRedis Diagnostics") - logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nRedis Diagnostics") + # logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n") return command_outputs def postgresql_diagnostics(commands:list): @@ -217,10 +213,10 @@ def postgresql_diagnostics(commands:list): except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nPostgresql Diagnostics") - logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nPostgresql Diagnostics") + # logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n") return command_outputs def elasticsearch_diagnostics(commands: list) -> list: @@ -247,10 +243,10 @@ def elasticsearch_diagnostics(commands: list) -> list: except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nElasticsearch Diagnostics") - logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nElasticsearch Diagnostics") + # logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n") return command_outputs def keycloak_diagnostics(commands: list): @@ -276,10 +272,10 @@ def keycloak_diagnostics(commands: list): except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nKeycloak Diagnostics") - logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nKeycloak Diagnostics") + # logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n") return command_outputs def vault_diagnostics(commands: list): @@ -313,8 +309,8 @@ def vault_diagnostics(commands: list): except Exception as e: command_outputs.append({command: f"Exception: {str(e)}"}) - for result_dict in command_outputs: - for command, cmd_output in result_dict.items(): - logger.debug("\nVault Diagnostics") - logger.debug(f"Vault Command: {command}\nOutput: {cmd_output}\n") + # for result_dict in command_outputs: + # for command, cmd_output in result_dict.items(): + # logger.debug("\nVault Diagnostics") + # logger.debug(f"Vault Command: {command}\nOutput: {cmd_output}\n") return command_outputs diff --git a/unskript-ctl/templates/last_cell_content.j2 b/unskript-ctl/templates/last_cell_content.j2 index d77ea77d6..805f44f72 100644 --- a/unskript-ctl/templates/last_cell_content.j2 +++ b/unskript-ctl/templates/last_cell_content.j2 @@ -1,19 +1,27 
diff --git a/unskript-ctl/templates/last_cell_content.j2 b/unskript-ctl/templates/last_cell_content.j2
index d77ea77d6..805f44f72 100644
--- a/unskript-ctl/templates/last_cell_content.j2
+++ b/unskript-ctl/templates/last_cell_content.j2
@@ -1,19 +1,27 @@
 from unskript.legos.utils import CheckOutput, CheckOutputStatus
+
 global w
 global _logger
 all_outputs = []
 other_outputs = []
+id_to_name = {}
 if _logger:
     _logger.debug(f"ERRORED CHECKS ARE: {w.errored_checks}")
     _logger.debug(f"TIMED OUT CHECKS ARE: {w.timeout_checks}")
+if hasattr(w, 'check_uuid_entry_function_map'):
+    for key,value in w.check_uuid_entry_function_map.items():
+        if value not in id_to_name:
+            id_to_name[value] = key
+
 try:
     if 'w' in globals():
         if w.check_run:
             for id,output in w.check_output.items():
                 output = json.loads(output)
                 output['id'] = id
+                #output['name'] = id_to_name.get(id) if id else str()
                 all_outputs.append(output)
             # Lets check if we have errored_checks or timeout_checks
             # exists, if yes then lets dump the output
@@ -26,6 +34,7 @@ try:
                         "objects": None,
                         "error": err_msg,
                         "id": str(_id)
+                        #"name": str(name)
                     })
             if hasattr(w, 'errored_checks') and len(w.errored_checks):
                 for name, err_msg in w.errored_checks.items():
@@ -35,6 +44,7 @@ try:
                         "objects": None,
                         "error": err_msg,
                         "id": str(_id)
+                        #"name": str(name)
                     })
 
             if other_outputs:
@@ -48,14 +58,33 @@ try:
                         if _logger:
                             _logger.debug(f"FOUND DUPLICATE FOR {_other.get('id')}")
 
+            if _logger:
+                _logger.debug(f"OTHER OUTPUTS: {other_outputs}")
             existing_ids = set(output.get('id') for output in all_outputs)
             unique_other_outputs = [other_output for other_output in other_outputs if other_output.get('id') not in existing_ids]
             if unique_other_outputs:
-                all_outputs.extend(unique_other_outputs)
+                # Lets insert the unique other outputs at the same respective place
+                #all_outputs.extend(unique_other_outputs)
+                if _logger:
+                    _logger.debug(f"LENGTH OF ALL OUTPUT BEFORE INSERT IS: {len(all_outputs)}")
+                for uo in unique_other_outputs:
+                    insert_index = w.check_uuids.index(uo.get('id'))
+                    if _logger:
+                        _logger.debug(f"INSERTING RESULT FOR {uo.get('id')} at {insert_index} position")
+                    if insert_index >= 0:
+                        all_outputs.insert(insert_index, uo)
+
             if not all_outputs:
                 all_outputs = other_outputs
+            _outputs_with_valid_names = []
+            for _output in all_outputs:
+                if id_to_name.get(_output.get('id')):
+                    _outputs_with_valid_names.append(_output)
+                    if _logger:
+                        _logger.debug(f"All output has result for ID: {_output.get('id')} Name: {id_to_name.get(_output.get('id'))} Status: {_output.get('status')}")
+            all_outputs = _outputs_with_valid_names
             for _output in all_outputs:
                 print(json.dumps(_output))
         else:
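The template change above stops appending errored and timed-out results at the tail and instead splices each one back in at the position its check occupies in w.check_uuids, then drops any output whose id has no entry-function name. A trimmed, list-only sketch of the positional merge; note that list.index() legitimately returns 0 for the first check, so the insert is unconditional here:

# Position-preserving merge: check_uuids fixes the intended order; results
# missing from all_outputs (e.g. a timed-out check) are inserted back at
# the index their check holds in that list.
check_uuids = ["u1", "u2", "u3"]
all_outputs = [{"id": "u1"}, {"id": "u3"}]          # u2 timed out earlier
other_outputs = [{"id": "u2", "status": 3}, {"id": "u3", "status": 1}]

existing_ids = {o["id"] for o in all_outputs}
for extra in (o for o in other_outputs if o["id"] not in existing_ids):
    all_outputs.insert(check_uuids.index(extra["id"]), extra)

print([o["id"] for o in all_outputs])               # ['u1', 'u2', 'u3']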
diff --git a/unskript-ctl/unskript_ctl_main.py b/unskript-ctl/unskript_ctl_main.py
index 976283bdc..b737a0807 100644
--- a/unskript-ctl/unskript_ctl_main.py
+++ b/unskript-ctl/unskript_ctl_main.py
@@ -766,5 +766,7 @@ def rearrange_argv(argv):
     if args.command == 'run' and args.report:
         uc.notify(args)
 
+    os._exit(0)
+
 if __name__ == '__main__':
     main()
diff --git a/unskript-ctl/unskript_ctl_run.py b/unskript-ctl/unskript_ctl_run.py
index 1d4330076..19c3e88b2 100644
--- a/unskript-ctl/unskript_ctl_run.py
+++ b/unskript-ctl/unskript_ctl_run.py
@@ -59,6 +59,7 @@ def __init__(self, **kwargs):
         self.prioritized_checks_to_id_mapping = {}
         self.map_entry_function_to_check_name = {}
         self.map_check_name_to_connector = {}
+        self.check_name_to_id_mapping = {}
 
         for k,v in self.checks_globals.items():
             os.environ[k] = json.dumps(v)
@@ -165,19 +166,20 @@ def display_check_result(self, checks_output):
         ids = self.check_uuids
         failed_result_available = False
         failed_result = {}
-        checks_output = self.output_after_merging_checks(checks_output, self.check_uuids)
+        #checks_output = self.output_after_merging_checks(checks_output, self.check_uuids)
         for result in checks_output:
             if result.get('skip') and result.get('skip') is True:
                 idx += 1
                 continue
             payload = result
             try:
+                _action_uuid = payload.get('id')
                 if self.checks_priority is None:
                     priority = CHECK_PRIORITY_P2
                 else:
-                    priority = self.checks_priority.get(self.check_entry_functions[idx], CHECK_PRIORITY_P2)
+                    # priority = self.checks_priority.get(self.check_entry_functions[idx], CHECK_PRIORITY_P2)
+                    priority = self.checks_priority.get(self.check_name_to_id_mapping.get(_action_uuid), CHECK_PRIORITY_P2)
 
-                _action_uuid = payload.get('id')
                 if _action_uuid:
                     #c_name = self.connector_types[idx] + ':' + self.prioritized_checks_to_id_mapping[_action_uuid]
                     p_check_name = self.prioritized_checks_to_id_mapping[_action_uuid]
@@ -406,7 +408,12 @@ def get_first_cell_content(self, list_of_checks: list):
             first_cell_content += f'{k}{index} = \"{value}\"' + '\n'
         first_cell_content += f'''w = Workflow(env, secret_store_cfg, None, global_vars=globals(), check_uuids={self.check_uuids})''' + '\n'
         # temp_map = {key: value for key, value in zip(self.check_entry_functions, self.check_uuids)}
-        temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
+        # temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
+        temp_map = {}
+        for index,value in enumerate(self.check_uuids):
+            temp_map[self.check_entry_functions[index]] = value
+            self.check_name_to_id_mapping[value] = self.check_entry_functions[index]
+
         first_cell_content += f'''w.check_uuid_entry_function_map = {temp_map}''' + '\n'
         first_cell_content += '''w.errored_checks = {}''' + '\n'
         first_cell_content += '''w.timeout_checks = {}''' + '\n'
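Finally, the unskript_ctl_run.py change makes the priority lookup order-independent: rather than indexing by the loop position idx, which drifts once skipped checks are filtered out, display_check_result() now keys off the result's own uuid through the new check_name_to_id_mapping reverse map built in get_first_cell_content(). A small self-contained sketch with invented check names and uuids:

CHECK_PRIORITY_P2 = "P2"
check_entry_functions = ["check_disk", "check_dns"]
check_uuids = ["uuid-1", "uuid-2"]

# Forward map (name -> uuid) goes into the workflow cell; the reverse map
# (uuid -> name) is kept for result handling later.
temp_map = {}
check_name_to_id_mapping = {}
for index, value in enumerate(check_uuids):
    temp_map[check_entry_functions[index]] = value
    check_name_to_id_mapping[value] = check_entry_functions[index]

checks_priority = {"check_dns": "P0"}
result = {"id": "uuid-2"}  # lookup works regardless of the result's position
name = check_name_to_id_mapping.get(result["id"])
priority = checks_priority.get(name, CHECK_PRIORITY_P2)
print(priority)  # P0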