Change log collection for diagnostics #1051

7 changes: 5 additions & 2 deletions unskript-ctl/diagnostics.py
@@ -106,7 +106,6 @@ def execute_diagnostics(self, diag_commands):
if function:
# Call the function with the commands
diag_outputs[entry_function] = function(commands)
print(f"Function '{function_name}' is accessible.")
else:
raise ValueError(f"Function '{function_name}' not found in the global namespace.")
except Exception as e:
@@ -127,7 +126,7 @@ def main(self):
if not diag_commands:
print("Skipping Diagnostics: No diagnostic command found. You can define them in the YAML configuration file")
return

print("\nRunning Diagnostics...")
diag_outputs = self.execute_diagnostics(diag_commands)

if diag_outputs:
@@ -144,6 +143,10 @@ def main(args):
parser.add_argument("--output-dir-path", '-o', help="Path to output directory", required=True)
ap = parser.parse_args(args)

print("\nFetching logs...")
fetch_pod_logs_not_running(ap.output_dir_path)
fetch_pod_logs_high_restarts(ap.output_dir_path)

diagnostics_script = DiagnosticsScript(ap)
diagnostics_script.main()

200 changes: 98 additions & 102 deletions unskript-ctl/diagnostics_worker.py
@@ -5,12 +5,12 @@
import os
import subprocess
import json
from unskript_ctl_factory import UctlLogger
from unskript_ctl_factory import UctlLogger, ConfigParserFactory
from concurrent.futures import ThreadPoolExecutor


logger = UctlLogger('UnskriptDiagnostics')


def mongodb_diagnostics(commands:list):
"""
mongodb_diagnostics runs mongocli command with command as the parameter
@@ -42,92 +42,88 @@ def mongodb_diagnostics(commands:list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nMongodb Diagnostics")
logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nMongodb Diagnostics")
# logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n")
return command_outputs
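
For context, a hypothetical invocation of this helper — the actual command strings come from the YAML configuration and are not shown in this diff:

# Hypothetical example; the command strings below are assumptions, not taken from the config.
outputs = mongodb_diagnostics(["db.serverStatus()", "db.stats()"])
for entry in outputs:
    for cmd, out in entry.items():
        print(f"{cmd}: {out[:120]}")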

def fetch_logs(namespace, pod, container):
"""
Fetches logs and previous logs for a specified container in a pod.
"""
outputs = []
cmd_logs = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container]
result_logs = subprocess.run(cmd_logs, capture_output=True, text=True)
if result_logs.stderr:
outputs.append(f"Error: {result_logs.stderr.strip()}")
else:
outputs.append(result_logs.stdout.strip())
def get_matrix_namespaces():
config_parser = ConfigParserFactory()
global_params = config_parser.get_checks_params()

if 'global' in global_params and 'matrix' in global_params['global']:
namespaces = global_params['global']['matrix'].get('namespace', [])
return namespaces
return []
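
A minimal sketch of the parsed configuration this helper expects; the global.matrix.namespace layout is inferred from the lookup above rather than from a documented schema:

# Inferred shape of config_parser.get_checks_params(); the namespace values are hypothetical.
example_params = {
    "global": {
        "matrix": {
            "namespace": ["default", "kube-system"],
        },
    },
}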

def fetch_logs(namespace, pod, output_path):
logs_file_path = os.path.join(output_path, 'logs.txt')
separator = "\n" + "=" * 40 + "\n"
header = f"Logs for Namespace: {namespace}, Pod: {pod}\n"
header_previous = f"Previous Logs for Namespace: {namespace}, Pod: {pod}\n"

with open(logs_file_path, 'a') as log_file:
log_file.write(separator + header)
# Fetch current logs
proc = subprocess.Popen(["kubectl", "logs", "--namespace", namespace, "--tail=100", "--all-containers", pod],
stdout=log_file, stderr=subprocess.PIPE, text=True)
stderr = proc.communicate()[1]
if proc.returncode != 0:
logger.debug(f"Error fetching logs for {pod}: {stderr}")

log_file.write(separator + header_previous)
# Fetch previous logs
proc = subprocess.Popen(["kubectl", "logs", "--namespace", namespace, "--tail=100", "--all-containers", pod, "--previous"],
stdout=log_file, stderr=subprocess.PIPE, text=True)
stderr = proc.communicate()[1]
if proc.returncode != 0:
logger.debug(f"Error fetching previous logs for {pod}: {stderr}")

def fetch_pod_logs_for_namespace(namespace, output_path, condition='not_running'):
# logger.debug(f"Starting log fetch for namespace: {namespace} with condition: {condition}")
proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
logger.debug(f"Error fetching pods in namespace {namespace}: {stderr}")
return

try:
pods = json.loads(stdout)['items']
for pod in pods:
if condition == 'not_running' and pod['status']['phase'] not in ("Running", "Succeeded"):
# logger.debug(f"Fetching logs for not running/succeeded pod: {pod['metadata']['name']} in {namespace}")
fetch_logs(namespace, pod['metadata']['name'], output_path)
elif condition == 'high_restarts':
for cs in pod['status'].get('containerStatuses', []):
if cs['restartCount'] > 25:
# logger.debug(f"Fetching logs for pod with high restarts: {pod['metadata']['name']} in {namespace}")
fetch_logs(namespace, pod['metadata']['name'], output_path)
except json.JSONDecodeError:
logger.debug(f"Failed to decode JSON response from kubectl get pods in namespace {namespace}: {stdout}")

def fetch_pod_logs_not_running(output_path):
allowed_namespaces = get_matrix_namespaces()
with ThreadPoolExecutor(max_workers=5) as executor:
# logger.debug("Initiating ThreadPool to fetch logs for pods not running across namespaces")
for namespace in allowed_namespaces:
executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'not_running')

def fetch_pod_logs_high_restarts(output_path):
allowed_namespaces = get_matrix_namespaces()
with ThreadPoolExecutor(max_workers=5) as executor:
# logger.debug("Initiating ThreadPool to fetch logs for pods with high restarts across namespaces")
for namespace in allowed_namespaces:
executor.submit(fetch_pod_logs_for_namespace, namespace, output_path, 'high_restarts')
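
A minimal standalone sketch of how these two entry points are wired together; it mirrors the new calls in diagnostics.py, assuming diagnostics_worker is importable from the working directory and using a hypothetical output directory:

import os
from diagnostics_worker import fetch_pod_logs_not_running, fetch_pod_logs_high_restarts

output_dir = "/tmp/unskript-diagnostics"  # hypothetical path
os.makedirs(output_dir, exist_ok=True)

# Both helpers read the allowed namespaces from the YAML config and append
# to <output_dir>/logs.txt via a small thread pool.
fetch_pod_logs_not_running(output_dir)
fetch_pod_logs_high_restarts(output_dir)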

cmd_logs_previous = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container, "--previous"]
result_logs_previous = subprocess.run(cmd_logs_previous, capture_output=True, text=True)
if result_logs_previous.stderr:
outputs.append(f"Error: {result_logs_previous.stderr.strip()}")
else:
outputs.append(result_logs_previous.stdout.strip())

return outputs

def fetch_pod_logs_not_running():
logger.debug("\nK8s Diagnostics: Fetching logs for pods not running")
command_outputs = []
cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
result = subprocess.run(cmd, capture_output=True, text=True)
pods = json.loads(result.stdout)['items']

for pod in pods:
namespace = pod['metadata']['namespace']
name = pod['metadata']['name']
status = pod['status']['phase']
if status != "Running":
logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} (Not Running)")
containers = [c['name'] for c in pod['spec'].get('initContainers', []) + pod['spec'].get('containers', [])]
for container in containers:
logs_output = fetch_logs(namespace, name, container)
for output in logs_output:
logger.debug({f"Pod Not Running: {name}, Container: {container}": output})
command_outputs.append({f"Pod Not Running: {name}, Container: {container}": output})
return command_outputs

def fetch_pod_logs_high_restarts():
logger.debug("\nK8s Diagnostics: Fetching logs for pods with high restarts")
command_outputs = []
cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
result = subprocess.run(cmd, capture_output=True, text=True)
pods = json.loads(result.stdout)['items']

for pod in pods:
namespace = pod['metadata']['namespace']
name = pod['metadata']['name']
pod_status = pod['status'].get('containerStatuses', [])
restarts = sum(cs['restartCount'] for cs in pod_status)
if restarts > 25:
logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} with high restarts")
result_logs = subprocess.run(["kubectl", "logs", "--namespace", namespace, name], capture_output=True, text=True)
if result_logs.stderr:
logger.debug({f"Pod high restarts: {name}": f"Error: {result_logs.stderr.strip()}"})
command_outputs.append({f"Pod high restarts: {name}": f"Error: {result_logs.stderr.strip()}"})
else:
logger.debug({f"Pod high restarts: {name}": result_logs.stdout.strip()})
command_outputs.append({f"Pod high restarts: {name}": result_logs.stdout.strip()})
return command_outputs

def k8s_diagnostics(commands:list):
"""
k8s_diagnostics runs kubectl command

"""
command_outputs = []
if not hasattr(k8s_diagnostics, "already_called"):
command_outputs.extend(fetch_pod_logs_high_restarts())
command_outputs.extend(fetch_pod_logs_not_running())

k8s_diagnostics.already_called = True
logger.debug("Logs have been fetched.")
else:
command_outputs = []
logger.debug("Subsequent execution: Skipping logs")

for command in commands:
cmd_list = command.split()
@@ -141,10 +137,10 @@ def k8s_diagnostics(commands:list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\n Kubernetes Diagnostics")
logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\n Kubernetes Diagnostics")
# logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n")
return command_outputs
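
A hypothetical invocation; each command string is split() and executed directly, so shell features such as pipes or redirects are assumed to be out of scope:

# Hypothetical commands; these are not taken from the repository's config.
results = k8s_diagnostics([
    "kubectl get nodes -o wide",
    "kubectl get events --all-namespaces --sort-by=.lastTimestamp",
])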

def redis_diagnostics(commands:list):
@@ -181,10 +177,10 @@ def redis_diagnostics(commands:list):
command_outputs.append({command: output})
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})
for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nRedis Diagnostics")
logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nRedis Diagnostics")
# logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def postgresql_diagnostics(commands:list):
@@ -217,10 +213,10 @@ def postgresql_diagnostics(commands:list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nPostgresql Diagnostics")
logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nPostgresql Diagnostics")
# logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def elasticsearch_diagnostics(commands: list) -> list:
@@ -247,10 +243,10 @@ def elasticsearch_diagnostics(commands: list) -> list:
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nElasticsearch Diagnostics")
logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nElasticsearch Diagnostics")
# logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def keycloak_diagnostics(commands: list):
@@ -276,10 +272,10 @@ def keycloak_diagnostics(commands: list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nKeycloak Diagnostics")
logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nKeycloak Diagnostics")
# logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def vault_diagnostics(commands: list):
@@ -313,8 +309,8 @@ def vault_diagnostics(commands: list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nVault Diagnostics")
logger.debug(f"Vault Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nVault Diagnostics")
# logger.debug(f"Vault Command: {command}\nOutput: {cmd_output}\n")
return command_outputs
31 changes: 30 additions & 1 deletion unskript-ctl/templates/last_cell_content.j2
@@ -1,19 +1,27 @@
from unskript.legos.utils import CheckOutput, CheckOutputStatus

global w
global _logger

all_outputs = []
other_outputs = []
id_to_name = {}
if _logger:
_logger.debug(f"ERRORED CHECKS ARE: {w.errored_checks}")
_logger.debug(f"TIMED OUT CHECKS ARE: {w.timeout_checks}")

if hasattr(w, 'check_uuid_entry_function_map'):
# Build a reverse lookup (check UUID -> entry-function name) so outputs can be labeled by name
for key, value in w.check_uuid_entry_function_map.items():
if value not in id_to_name:
id_to_name[value] = key

try:
if 'w' in globals():
if w.check_run:
for id,output in w.check_output.items():
output = json.loads(output)
output['id'] = id
#output['name'] = id_to_name.get(id) if id else str()
all_outputs.append(output)
# Check whether errored_checks or timeout_checks
# exist; if so, dump their output as well
@@ -26,6 +34,7 @@ try:
"objects": None,
"error": err_msg,
"id": str(_id)
#"name": str(name)
})
if hasattr(w, 'errored_checks') and len(w.errored_checks):
for name, err_msg in w.errored_checks.items():
@@ -35,6 +44,7 @@ try:
"objects": None,
"error": err_msg,
"id": str(_id)
#"name": str(name)
})

if other_outputs:
@@ -48,14 +58,33 @@ try:
if _logger:
_logger.debug(f"FOUND DUPLICATE FOR {_other.get('id')}")

if _logger:
_logger.debug(f"OTHER OUTPUTS: {other_outputs}")
existing_ids = set(output.get('id') for output in all_outputs)
unique_other_outputs = [other_output for other_output in other_outputs if other_output.get('id') not in existing_ids]
if unique_other_outputs:
all_outputs.extend(unique_other_outputs)
# Insert each unique other output at its original position
#all_outputs.extend(unique_other_outputs)
if _logger:
_logger.debug(f"LENGTH OF ALL OUTPUT BEFORE INSERT IS: {len(all_outputs)}")
for uo in unique_other_outputs:
insert_index = w.check_uuids.index(uo.get('id'))
if _logger:
_logger.debug(f"INSERTING RESULT FOR {uo.get('id')} at {insert_index} position")
# Insert at the matching position, including index 0
all_outputs.insert(insert_index, uo)


if not all_outputs:
all_outputs = other_outputs

_outputs_with_valid_names = []
for _output in all_outputs:
if id_to_name.get(_output.get('id')):
_outputs_with_valid_names.append(_output)
if _logger:
_logger.debug(f"All output has result for ID: {_output.get('id')} Name: {id_to_name.get(_output.get('id'))} Status: {_output.get('status')}")
all_outputs = _outputs_with_valid_names
for _output in all_outputs:
print(json.dumps(_output))
else:
Expand Down
2 changes: 2 additions & 0 deletions unskript-ctl/unskript_ctl_main.py
@@ -766,5 +766,7 @@ def rearrange_argv(argv):
if args.command == 'run' and args.report:
uc.notify(args)

# Force-exit so lingering worker threads cannot keep the process alive
# (an assumption based on the ThreadPoolExecutor usage in diagnostics_worker)
os._exit(0)

if __name__ == '__main__':
main()