Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into get-vault-secret-for-smtp
Browse files Browse the repository at this point in the history
jayasimha-raghavan-unskript authored Jul 24, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents b1f1379 + 433e7d2 commit 5350288
Showing 37 changed files with 712 additions and 410 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/all_module_test.yml
Original file line number Diff line number Diff line change
@@ -46,7 +46,8 @@ jobs:
pip install --no-cache-dir /tmp/main_module.tar.gz
aws s3 cp ${{ secrets.SUB_MODULE_BUILD_PACKAGE }} /tmp/sub_module.tar.gz
pip install --no-cache-dir /tmp/sub_module.tar.gz
pip install --no-cache-dir matplotlib>=3.7.1
- name: Run All Modules Check

run: /usr/bin/env python all_modules_test.py
run: /usr/bin/env python all_modules_test.py
1 change: 1 addition & 0 deletions .github/workflows/build-and-release-docker-lite.yml
Original file line number Diff line number Diff line change
@@ -148,6 +148,7 @@ jobs:
/usr/bin/env python -m pip install --upgrade urllib3==1.26.6
/usr/bin/env python -m pip install --upgrade types-urllib3==1.26.13
/usr/bin/env python -m pip install google-api-python-client==2.77.0
/usr/bin/env python -m pip install --upgrade numpy==1.23.4
make awesome-submodule
cd awesome
git checkout ${{ inputs.awesome_branch }}
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@ def aws_check_ssl_certificate_expiry(handle, threshold_days: int, region: str,)
right_now = datetime.datetime.now(dateutil.tz.tzlocal())
diff = expiry_date-right_now
days_remaining = diff.days
days = 0
if 0 < days_remaining < threshold_days:
days = days_remaining
elif days_remaining < 0:
7 changes: 6 additions & 1 deletion AWS/legos/aws_ebs_modify_volume/aws_ebs_modify_volume.py
Original file line number Diff line number Diff line change
@@ -74,10 +74,15 @@ def aws_ebs_modify_volume(
# Get the current volume size.
Volume = ec2Resource.Volume(volume_id)
currentSize = Volume.size
newSize = None

if resize_option == SizingOption.Add:
newSize = currentSize + resize_value
elif resize_option == SizingOption.Mutiple:
elif resize_option == SizingOption.Multiple:
newSize = currentSize * resize_value
else:
raise ValueError(f"Invalid resize option: {resize_option}")


print(f'CurrentSize {currentSize}, NewSize {newSize}')

Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ def aws_ecs_detect_failed_deployment(handle, cluster_name: str, service_name: st
return ["Empty deployment"]

deploymentInProgress = False
primaryDeploymentID = ""
for deployment in deployments:
if deployment['status'] == "PRIMARY":
primaryDeploymentID = deployment['id']
Original file line number Diff line number Diff line change
@@ -27,38 +27,33 @@ def elasticsearch_check_health_status(handle, unassigned_shards:int = 20) -> Tup
:rtype: Result Tuple of result
"""
cluster_health = {}
output = handle.web_request("/_cluster/health?pretty", "GET", None)


# Early return if cluster status is green
if output['status'] == 'green':
return (True, None)

cluster_health['cluster_name'] = output['cluster_name']
cluster_health['status'] = output['status']

# Additional checks for both red and yellow statuses
additional_details = False

cluster_health = {
"cluster_name": output['cluster_name'],
"status": output['status'],
"unassigned_shards": output['unassigned_shards']
}

# Check for significant health issues
if output['unassigned_shards'] > unassigned_shards:
cluster_health['unassigned_shards'] = output['unassigned_shards']
additional_details = True

if output['delayed_unassigned_shards'] > 0:
cluster_health['delayed_unassigned_shards'] = output['delayed_unassigned_shards']
additional_details = True

if output['initializing_shards'] > 0 or output['relocating_shards'] > 0:
cluster_health['initializing_shards'] = output['initializing_shards']
cluster_health['relocating_shards'] = output['relocating_shards']
additional_details = True

if output['number_of_nodes'] != output['number_of_data_nodes']:
cluster_health['number_of_nodes'] = output['number_of_nodes']
cluster_health['number_of_data_nodes'] = output['number_of_data_nodes']
additional_details = True

# If status is red, return the result immediately
if output['status'] == 'red' or (output['status'] == 'yellow' and additional_details):
return (False, [cluster_health]) # Return immediately if unassigned shards exceed the threshold

# Additional checks for severe conditions
if output['status'] == 'red' or output['delayed_unassigned_shards'] > 0 or output['initializing_shards'] > 0 or output['relocating_shards'] > 0 or output['number_of_nodes'] != output['number_of_data_nodes']:
additional_details = {
"delayed_unassigned_shards": output['delayed_unassigned_shards'],
"initializing_shards": output['initializing_shards'],
"relocating_shards": output['relocating_shards'],
"number_of_nodes": output['number_of_nodes'],
"number_of_data_nodes": output['number_of_data_nodes']
}
cluster_health.update(additional_details)
return (False, [cluster_health])

# If status is yellow but no additional conditions are met, return as healthy
# If status is yellow but no additional critical issues, consider it healthy
return (True, None)
Original file line number Diff line number Diff line change
@@ -39,14 +39,14 @@ def elasticsearch_compare_cluster_disk_size_to_threshold(handle, threshold: floa
# Split the lines and skip the header
lines = allocation_output.splitlines()[1:]

result = []
# Find the max disk percentage from the lines, considering only assigned nodes
max_disk_percent = max(float(line.split()[5]) for line in lines if "UNASSIGNED" not in line)

if max_disk_percent > threshold:
result.append({"usage_disk_percentage": max_disk_percent, "threshold": threshold})

if len(result) != 0:
return (False, result)
# Calculate the max disk percentage from the lines, considering only assigned nodes
max_disk_percent = 0 # Initialize to 0 or an appropriately low number
for line in lines:
if "UNASSIGNED" not in line:
disk_usage = float(line.split()[5])
max_disk_percent = max(max_disk_percent, disk_usage)
if max_disk_percent > threshold:
result = [{"usage_disk_percentage": max_disk_percent, "threshold": threshold}]
return (False, result)
return (True, None)

Original file line number Diff line number Diff line change
@@ -51,59 +51,25 @@ def elasticsearch_get_index_health(handle, index_name="") -> Tuple:
:return: A list of dictionaries where each dictionary contains stats about each index
"""
try:
indices_output = []
# If no specific index is provided, get all indices
if len(index_name)==0 :
indices_output = handle.web_request("/_cat/indices?h=index", "GET", None)
indices_output = ''.join(indices_output).split('\n')
# If a specific index is provided, only consider that index
else:
indices_output.append(index_name)

all_indices_stats = []

for current_index in indices_output:
# Skip system indices
if current_index.startswith('.'):
continue
index_stats = {}

# Get settings for the current index
settings_output = handle.web_request(f"/{current_index}/_settings", "GET", None)
if "error" in settings_output:
print(f"Error for settings of {current_index}")
continue

# Get stats for the current index
stats_output = handle.web_request(f"/{current_index}/_stats", "GET", None)
if "error" in stats_output:
print(f"Error for stats of {current_index}")
continue

# Get any tasks associated with the current index
tasks_output = handle.web_request(f"/_tasks?actions=*{current_index}*&detailed", "GET", None)

# Get health of the current index
health_output = handle.web_request(f"/_cat/indices/{current_index}?format=json", "GET", None)
if "error" in health_output:
print(f"Error for health of {current_index}")
continue

if settings_output:
settings = settings_output.get(current_index, {}).get('settings', {}).get('index', {})
else:
settings = {}
# Consolidate stats for the current index
if health_output[0]['health'] in ['yellow', 'red']:
index_stats = {
**health_output[0],
'settings': settings,
'stats': stats_output['_all']['total'],
'tasks': tasks_output['nodes']
}
all_indices_stats.append(index_stats)
health_url = f"/_cat/indices/{index_name}?v&h=index,health&format=json" if index_name else "/_cat/indices?v&h=index,health&format=json"
health_response = handle.web_request(health_url, "GET", None)
if not health_response:
print(f"No indices found or error retrieving indices: {health_response.get('error', 'No response') if health_response else 'No data'}")
return (True, None)

# Filter indices that are not 'green'
problematic_indices = [
{"index": idx['index'], "health": idx['health']}
for idx in health_response if idx['health'] != 'green'
]

if not problematic_indices:
print("All indices are in good health.")
return (True, None)

except Exception as e:
raise e
if len(all_indices_stats)!=0:
return (False, all_indices_stats)
return (True, None)
print(f"Error processing index health: {str(e)}")
return (False, [])

return (False, problematic_indices)

1 change: 1 addition & 0 deletions Github/legos/github_create_team/github_create_team.py
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ def github_create_team(
team_details = {}
repo_names =[]
list_of_repos = ''
privacy_settings = ''
if privacy is None or len(privacy)==0:
privacy_settings = "secret"
organization = handle.get_organization(organization_name)
1 change: 1 addition & 0 deletions Github/legos/github_delete_branch/github_delete_branch.py
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ def github_delete_branch(handle, owner:str, repository: str, branch_name: str)->
:rtype: Deleted branch info
"""
flag_to_check_branch = 0
try:
user = handle.get_user(login=owner)
repo_name = user.login+"/"+repository
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ def github_remove_member_from_org(handle, organization_name:str, username:str)->
:rtype: List of return status of removing a member from Org
"""
result = ""
organization = handle.get_organization(organization_name)
try:
user = handle.get_user(username)
Original file line number Diff line number Diff line change
@@ -7,11 +7,17 @@
from typing import Tuple, Optional
from pydantic import BaseModel, Field
from croniter import croniter
from datetime import datetime, timezone, timedelta
import json


class InputSchema(BaseModel):
namespace: Optional[str] = Field(..., description='k8s Namespace', title='Namespace')
time_interval_to_check: int = Field(
24,
        description='Time interval in hours. This time window is used to check if a pod in a cronjob was in Pending state. Default is 24 hours.',
title="Time Interval"
)


def k8s_check_cronjob_pod_status_printer(output):
@@ -20,10 +26,12 @@ def k8s_check_cronjob_pod_status_printer(output):
print("CronJobs are running as expected.")
else:
for issue in issues:
print(f"CronJob '{issue['cronjob_name']}' Alert: {issue['message']}")
print(f"CronJob '{issue['cronjob_name']}' in namespace '{issue['namespace']}' has issues.")

def format_datetime(dt):
return dt.strftime("%Y-%m-%d %H:%M:%S %Z")

def k8s_check_cronjob_pod_status(handle, namespace: str='') -> Tuple:
def k8s_check_cronjob_pod_status(handle, namespace: str='', time_interval_to_check=24) -> Tuple:
"""
Checks the status of the CronJob pods.
@@ -39,7 +47,10 @@ def k8s_check_cronjob_pod_status(handle, namespace: str='') -> Tuple:
batch_v1 = client.BatchV1Api(api_client=handle)
core_v1 = client.CoreV1Api(api_client=handle)

issues = {"NotAssociated": [], "Pending": [], "UnexpectedState": []}
issues = []
current_time = datetime.now(timezone.utc)
interval_time_to_check = current_time - timedelta(hours=time_interval_to_check)
interval_time_to_check = interval_time_to_check.replace(tzinfo=timezone.utc)

# Get namespaces to check
if namespace:
@@ -68,34 +79,35 @@ def k8s_check_cronjob_pod_status(handle, namespace: str='') -> Tuple:
continue
cronjob = json.loads(response.stdout)

schedule = cronjob['spec']['schedule']

# Calculate the next expected run
now = datetime.now(timezone.utc)
iter = croniter(schedule, now)
next_run = iter.get_next(datetime)
time_to_next_run = next_run - now

# Fetch the most recent Job associated with the CronJob
jobs = batch_v1.list_namespaced_job(ns) # Fetch all jobs, and then filter by prefix.

associated_jobs = [job for job in jobs.items if job.metadata.name.startswith(cronjob['metadata']['name'])]
if not associated_jobs:
# If no associated jobs, that means the job is not scheduled.
return (True, None)
continue

latest_job = sorted(associated_jobs, key=lambda x: x.status.start_time, reverse=True)[0]

# Check job's pods for any issues
pods = core_v1.list_namespaced_pod(ns, label_selector=f"job-name={latest_job.metadata.name}")

for pod in pods.items:
if pod.status.phase == 'Pending' and now - pod.status.start_time > time_to_next_run:
issues["Pending"].append({"pod_name": pod.metadata.name, "namespace": ns})
elif pod.status.phase not in ['Running', 'Succeeded']:
issues["UnexpectedState"].append({"pod_name": pod.metadata.name, "namespace": ns, "state": pod.status.phase})

if all(not val for val in issues.values()):
return (True, None)
else:
return (False, issues)
if pod.status.phase == 'Pending':
start_time = pod.status.start_time
if start_time and start_time >= interval_time_to_check:
issues.append({
"cronjob_name": cronjob_name,
"namespace": ns,
"pod_name": pod.metadata.name,
"start_time": format_datetime(start_time)
})
break
elif pod.status.phase not in ['Running', 'Succeeded','Completed']:
issues.append({
"cronjob_name": cronjob_name,
"namespace": ns,
"pod_name": pod.metadata.name,
"state": pod.status.phase
})
break

return (not issues, issues if issues else None)
Original file line number Diff line number Diff line change
@@ -48,18 +48,25 @@ def k8s_check_worker_cpu_utilization(handle, threshold: float=70.0) -> Tuple:
if response is None or response.stderr:
raise Exception(f"Error while executing command ({kubectl_command}): {response.stderr if response else 'empty response'}")

lines = response.stdout.split('\n')
# Ensure response.stdout is processed only once and correctly
lines = response.stdout.strip().split('\n')
seen_nodes = set() # Keep track of nodes that have already been processed

for line in lines:
parts = line.split()
if len(parts) < 5: # Check for correct line format
continue
node_name, cpu_percentage_str = parts[0], parts[2]
cpu_percentage = float(cpu_percentage_str.rstrip('%'))
node_name, cpu_percentage_str = parts[0], parts[2].rstrip('%')
if node_name in seen_nodes:
print(f"Duplicate entry detected for node {node_name}, skipping.")
continue
seen_nodes.add(node_name)

cpu_percentage = float(cpu_percentage_str)
if cpu_percentage > threshold:
exceeding_nodes.append({"node": node_name, "cpu": cpu_percentage})

if len(exceeding_nodes) != 0:
if exceeding_nodes:
return (False, exceeding_nodes)
return (True, None)

27 changes: 21 additions & 6 deletions Kubernetes/legos/k8s_get_cluster_health/k8s_get_cluster_health.py
Original file line number Diff line number Diff line change
@@ -82,13 +82,28 @@ def check_pod_health(handle, core_services, namespace):
for service in core_services:
label_selector = get_label_selector_for_service(handle, ns, service)
if label_selector:
command = f"kubectl get pods -n {ns} -l {label_selector} -o=jsonpath='{{.items[?(@.status.phase!=\"Running\")].metadata.name}}'"
pods_not_running = execute_kubectl_command(handle, command)
if pods_not_running:
for pod_name in pods_not_running.split():
health_issues.append({"type": "Pod", "name": pod_name, "namespace": ns, "issue": "Pod is not running."})
# Get all pods for the service
command_pods = f"kubectl get pods -n {ns} -l {label_selector} -o=json"
pods_info = execute_kubectl_command(handle, command_pods)
if pods_info:
pods_data = json.loads(pods_info)
total_pods = len(pods_data['items'])
running_pods = sum(1 for item in pods_data['items'] if item['status']['phase'] == "Running")

# Check if at least 70% of pods are running
if total_pods > 0:
running_percentage = (running_pods / total_pods) * 100
if running_percentage < 70:
health_issues.append({
"type": "Pod",
"name": service,
"namespace": ns,
"issue": f"Insufficient running pods. Only {running_pods} out of {total_pods} are running."
})
else:
print(f"No pods found for service {service} in namespace {ns}.")
else:
print(f"Service {service} not found or has no selectors in namespace {ns}. Skipping...")
print(f"No label selector found for service {service} in namespace {ns}. Skipping...")
else:
# Check all pods in the namespace if no specific services are given
command = f"kubectl get pods -n {ns} -o=jsonpath='{{.items[?(@.status.phase!=\"Running\")].metadata.name}}'"
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ def k8s_get_error_pods_from_all_jobs(handle, namespace:str="") -> Tuple:

if response.stderr:
raise Exception(f"Error occurred while executing command {kubectl_cmd}: {response.stderr}")

jobs = {}
try:
if response.stdout:
jobs = json.loads(response.stdout)
@@ -57,6 +57,7 @@ def k8s_get_error_pods_from_all_jobs(handle, namespace:str="") -> Tuple:
if pod_response.stderr:
print(f"Error occurred while fetching pods for job {job_name}: {pod_response.stderr}")
continue
pods = {}
try:
if response.stdout:
pods = json.loads(pod_response.stdout)
45 changes: 36 additions & 9 deletions Kubernetes/legos/k8s_get_pending_pods/k8s_get_pending_pods.py
Original file line number Diff line number Diff line change
@@ -6,10 +6,16 @@
from pydantic import BaseModel, Field
import json
from tabulate import tabulate
from datetime import datetime, timedelta, timezone


class InputSchema(BaseModel):
namespace: Optional[str] = Field('', description='k8s Namespace', title='Namespace')
time_interval_to_check: int = Field(
24,
description='Time interval in hours. This time window is used to check if POD was in Pending state. Default is 24 hours.',
title="Time Interval"
)



@@ -24,18 +30,25 @@ def k8s_get_pending_pods_printer(output):
print(tabulate(data, headers=headers, tablefmt="grid"))


def k8s_get_pending_pods(handle, namespace:str="") -> Tuple:
def format_datetime(dt):
return dt.strftime("%Y-%m-%d %H:%M:%S %Z")

def k8s_get_pending_pods(handle, namespace: str = "", time_interval_to_check=24) -> Tuple:
"""
k8s_get_pending_pods checks if any pod in the Kubernetes cluster is in 'Pending' status.
k8s_get_pending_pods checks if any pod in the Kubernetes cluster is in 'Pending' status within the specified time interval.
:type handle: object
:param handle: Object returned from the Task validate method
:type namespace: string
:param namespace: Namespace in which to look for the resources. If not provided, all namespaces are considered
:type time_interval_to_check: int
:param time_interval_to_check: (Optional) Integer, in hours, the interval within which the
state of the POD should be checked.
:rtype: tuple
:return: Status,list of pending pods with their namespace
:return: Status, list of pending pods with their namespace and the time they became pending
"""
if handle.client_side_validation is not True:
print(f"K8S Connector is invalid: {handle}")
@@ -48,21 +61,35 @@ def k8s_get_pending_pods(handle, namespace:str="") -> Tuple:
result = handle.run_native_cmd(cmd)

if result.stderr:
raise Exception(f"Error occurred while executing command {cmd} {result.stderr}")
raise Exception(f"Error occurred while executing command {cmd}: {result.stderr}")

pods = json.loads(result.stdout)['items']
pending_pods = []

current_time = datetime.now(timezone.utc)
interval_time_to_check = current_time - timedelta(hours=time_interval_to_check)
interval_time_to_check = interval_time_to_check.replace(tzinfo=timezone.utc)

for pod in pods:
name = pod['metadata']['name']
status = pod['status']['phase']
pod_namespace = pod['metadata']['namespace']

if status == 'Pending':
pending_pods.append([name, pod_namespace])

if len(pending_pods) != 0:
            # Check if the pod has been in Pending state within the specified time interval (default: last 24 hours)
start_time = pod['status'].get('startTime')
if start_time:
start_time = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
if start_time >= interval_time_to_check:
formatted_start_time = format_datetime(start_time)
formatted_interval_time_to_check = format_datetime(interval_time_to_check)
pending_pods.append({
"pod": name,
"namespace": pod_namespace,
"start_time": formatted_start_time,
"interval_time_to_check": formatted_interval_time_to_check
})

if pending_pods:
return (False, pending_pods)
return (True, None)


Original file line number Diff line number Diff line change
@@ -8,13 +8,20 @@
from kubernetes import client
from kubernetes.client.rest import ApiException
from tabulate import tabulate
import datetime
from datetime import timezone


class InputSchema(BaseModel):
namespace: Optional[str] = Field(
default='',
title='Namespace',
description='k8s Namespace')
time_interval_to_check: int = Field(
24,
description='Time interval in hours. This time window is used to check if POD was in Crashloopback. Default is 24 hours.',
title="Time Interval"
)


def k8s_get_pods_in_crashloopbackoff_state_printer(output):
@@ -27,16 +34,24 @@ def k8s_get_pods_in_crashloopbackoff_state_printer(output):
table_data = [(entry["pod"], entry["namespace"], entry["container"]) for entry in data]
print(tabulate(table_data, headers=headers, tablefmt="grid"))

def k8s_get_pods_in_crashloopbackoff_state(handle, namespace: str = '') -> Tuple:
def format_datetime(dt):
# Format datetime to a string 'YYYY-MM-DD HH:MM:SS UTC'
return dt.strftime('%Y-%m-%d %H:%M:%S UTC')

def k8s_get_pods_in_crashloopbackoff_state(handle, namespace: str = '', time_interval_to_check=24) -> Tuple:
"""
k8s_get_pods_in_crashloopbackoff_state returns the pods that have CrashLoopBackOff state in their container statuses.
k8s_get_pods_in_crashloopbackoff_state returns the pods that have CrashLoopBackOff state in their container statuses within the specified time interval.
:type handle: Object
:param handle: Object returned from the task.validate(...) function
:type namespace: str
:param namespace: (Optional) String, K8S Namespace as python string
:type time_interval_to_check: int
:param time_interval_to_check: (Optional) Integer, in hours, the interval within which the
state of the POD should be checked.
:rtype: Status, List of objects of pods, namespaces, and containers that are in CrashLoopBackOff state
"""
result = []
@@ -47,12 +62,25 @@ def k8s_get_pods_in_crashloopbackoff_state(handle, namespace: str = '') -> Tuple

try:
if namespace:
pods = v1.list_namespaced_pod(namespace).items
response = v1.list_namespaced_pod(namespace)
else:
pods = v1.list_pod_for_all_namespaces().items
response = v1.list_pod_for_all_namespaces()

if response is None or not hasattr(response, 'items'):
raise ApiException("Unexpected response from the Kubernetes API. 'items' not found in the response.")

pods = response.items

except ApiException as e:
raise e

if pods is None:
raise ApiException("No pods returned from the Kubernetes API.")

current_time = datetime.datetime.now(timezone.utc)
interval_time_to_check = current_time - datetime.timedelta(hours=time_interval_to_check)
interval_time_to_check = interval_time_to_check.replace(tzinfo=timezone.utc)

for pod in pods:
pod_name = pod.metadata.name
namespace = pod.metadata.namespace
@@ -62,6 +90,20 @@ def k8s_get_pods_in_crashloopbackoff_state(handle, namespace: str = '') -> Tuple
for container_status in container_statuses:
container_name = container_status.name
if container_status.state and container_status.state.waiting and container_status.state.waiting.reason == "CrashLoopBackOff":
result.append({"pod": pod_name, "namespace": namespace, "container": container_name})
# Check if the last transition time to CrashLoopBackOff is within the specified interval
if container_status.last_state and container_status.last_state.terminated:
last_transition_time = container_status.last_state.terminated.finished_at
if last_transition_time:
last_transition_time = last_transition_time.replace(tzinfo=timezone.utc)
if last_transition_time >= interval_time_to_check:
formatted_transition_time = format_datetime(last_transition_time)
formatted_interval_time_to_check = format_datetime(interval_time_to_check)
result.append({
"pod": pod_name,
"namespace": namespace,
"container": container_name,
"last_transition_time": formatted_transition_time,
"interval_time_to_check": formatted_interval_time_to_check
})

return (False, result) if result else (True, None)
Original file line number Diff line number Diff line change
@@ -4,11 +4,12 @@
##
from typing import Tuple, Optional
from pydantic import BaseModel, Field
DEFAULT_SIZE= 2048000 # 2GB in KB


class InputSchema(BaseModel):
index_threshold: Optional[float] = Field(
512000, # 500MB in KB
DEFAULT_SIZE,
description='The threshold for total index size. Default is 512000KB.',
title='Index threshold(in KB)',
)
@@ -26,7 +27,7 @@ def mongodb_check_large_index_size_printer(output):
print(f"Alert! Index size of {alert['indexSizeKB']} KB for database '{alert['db']}' in collection '{alert['collection']}' exceeds threshold !")


def mongodb_check_large_index_size(handle, threshold: float = 512000) -> Tuple:
def mongodb_check_large_index_size(handle, threshold: float = DEFAULT_SIZE) -> Tuple:
"""
mongodb_check_large_index_size checks the index sizes for all databases and collections.
It compares the size of each index with a given threshold and returns any indexes that exceed the threshold.
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@

class InputSchema(BaseModel):
threshold: Optional[float] = Field(
52428800, # 50GB in KB
83886080 , # 80GB in KB
description='Threshold for disk size in KB.',
title='Threshold (in KB)'
)
@@ -24,7 +24,7 @@ def mongodb_compare_disk_size_to_threshold_printer(output):
print(f"Alert! Disk size of {alert['totalDiskSize']} KB for database {alert['db']} exceeds threshold of {alert['threshold']} KB.")


def mongodb_compare_disk_size_to_threshold(handle, threshold: float=52428800) -> Tuple:
def mongodb_compare_disk_size_to_threshold(handle, threshold: float=83886080) -> Tuple:
"""
mongodb_compare_disk_size_to_threshold compares the total disk size used by MongoDB to a given threshold.
Original file line number Diff line number Diff line change
@@ -35,4 +35,4 @@ def mongodb_get_server_status(handle) -> Tuple:
return (True, None)
except Exception as e:
return (False, str(e))
return (False, "Unable to check Mongo server status")
return (False, {"message":"Unable to check Mongo server status"})
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
"action_type": "LEGO_TYPE_POSTGRESQL",
"action_entry_function": "postgresql_get_cache_hit_ratio",
"action_needs_credential": true,
"action_is_check": true,
"action_is_check": false,
"action_supports_poll": true,
"action_output_type": "ACTION_OUTPUT_TYPE_LIST",
"action_supports_iteration": true,
Original file line number Diff line number Diff line change
@@ -2,94 +2,38 @@
# Copyright (c) 2023 unSkript, Inc
# All rights reserved.
##
from typing import Tuple, Optional
from pydantic import BaseModel, Field
from typing import Tuple
from pydantic import BaseModel


class InputSchema(BaseModel):
connection_threshold: Optional[int] = Field(
10000,
title='Connection threshold',
description='Threshold for the number of connections considered abnormal. Default- 10000 clients')
cache_hit_ratio_threshold: Optional[int] = Field(
90,
title='Cache hit ratio threshold (in %)',
description='Threshold for the cache hit ratio considered abnormal. Default- 90%')
blocked_query_threshold: Optional[int] = Field(
10000,
title='Blocked query threshold',
        description='Threshold for the number of blocked queries considered abnormal. Default- 10000')
pass


def postgresql_get_server_status_printer(output):
is_healthy, server_status_info = output

print("PostgreSQL Server Status:")
print(f" Overall Health: {'Healthy' if is_healthy else 'Unhealthy'}")
print(f" Total Connections: {server_status_info['total_connections']}")
print(f" Cache Hit Ratio: {server_status_info['cache_hit_ratio']}%")
print(f" Blocked Queries: {server_status_info['blocked_queries']}")

abnormal_metrics = server_status_info.get('abnormal_metrics')
if abnormal_metrics:
print("\nAbnormal Metrics:")
for metric in abnormal_metrics:
print(f" - {metric}")

def postgresql_get_server_status(handle, connection_threshold: int = 10000, cache_hit_ratio_threshold: int = 90, blocked_query_threshold: int = 10000) -> Tuple:
if output[0]:
print("PostgreSQL Server Status: Reachable")
else:
error_message = output[1]['message'] if output[1] else "Unknown error"
print("PostgreSQL Server Status: Unreachable")
print(f"Error: {error_message}")

def postgresql_get_server_status(handle) -> Tuple:
"""
Returns the status of the PostgreSQL instance.
Returns a simple status indicating the reachability of the PostgreSQL server.
:type handle: object
:param handle: PostgreSQL connection object
:type handle: object
:param connection_threshold: Threshold for the number of connections considered abnormal
:type handle: object
:param cache_hit_ratio_threshold: Threshold for the cache hit ratio considered abnormal
:type handle: object
:param blocked_query_threshold: Threshold for the number of blocked queries considered abnormal
:return: Tuple containing a status and a dictionary with detailed information
:return: Tuple containing a boolean indicating success and optional error message
"""
server_status_info = {}
abnormal_metrics = []

try:
cur = handle.cursor()

# Check total number of connections
cur.execute("SELECT COUNT(*) FROM pg_stat_activity;")
total_connections = cur.fetchone()[0]
server_status_info['total_connections'] = total_connections
if total_connections > connection_threshold:
abnormal_metrics.append(f"High number of connections: {total_connections}")

# Check cache hit ratio
cur.execute("SELECT sum(heap_blks_hit) / sum(heap_blks_hit + heap_blks_read) * 100 AS ratio FROM pg_statio_user_tables;")
cache_hit_ratio = cur.fetchone()[0]
server_status_info['cache_hit_ratio'] = cache_hit_ratio
if cache_hit_ratio < cache_hit_ratio_threshold:
abnormal_metrics.append(f"Cache hit ratio below {cache_hit_ratio_threshold}%: {cache_hit_ratio}")

# Check blocked queries
cur.execute("SELECT COUNT(*) FROM pg_locks WHERE granted = false;")
blocked_queries = cur.fetchone()[0]
server_status_info['blocked_queries'] = blocked_queries
if blocked_queries > blocked_query_threshold:
abnormal_metrics.append(f"Blocked queries above threshold: {blocked_queries}")

# Append abnormal metrics if any are found
if abnormal_metrics:
server_status_info['abnormal_metrics'] = abnormal_metrics
return (False, server_status_info)

return (True, server_status_info)

cur.execute("SELECT 1;")
cur.fetchone()
return (True, None)
except Exception as e:
raise e
return (False, {"message": str(e)})
finally:
handle.close()

2 changes: 1 addition & 1 deletion Redis/legos/redis_list_large_keys/redis_list_large_keys.py
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ def redis_list_large_keys(handle, size_in_bytes: int = 5368709120) -> Tuple :
for key in keys:
value = handle.memory_usage(key)
if value > int(size_in_bytes):
large_key = {key.decode('utf-8'): value}
large_key = {"large_key": key.decode('utf-8'), "value": value}
result.append(large_key)
except Exception as e:
raise e
Original file line number Diff line number Diff line change
@@ -31,8 +31,6 @@ class InputSchema(BaseModel):
def slack_create_channel_invite_users_printer(output):
if output is not None:
pprint.pprint(output)
else:
return


def slack_create_channel_invite_users(
Original file line number Diff line number Diff line change
@@ -25,8 +25,6 @@ class InputSchema(BaseModel):
def slack_lookup_user_by_email_printer(output):
if output is not None:
pprint.pprint(output)
else:
return


def slack_lookup_user_by_email(
2 changes: 0 additions & 2 deletions Slack/legos/slack_post_image/slack_post_image.py
Original file line number Diff line number Diff line change
@@ -28,8 +28,6 @@ class InputSchema(BaseModel):
def slack_post_image_printer(output):
if output is not None:
pprint.pprint(output)
else:
return


@beartype
2 changes: 0 additions & 2 deletions Slack/legos/slack_post_message/slack_post_message.py
Original file line number Diff line number Diff line change
@@ -23,8 +23,6 @@ class InputSchema(BaseModel):
def slack_post_message_printer(output):
if output is not None:
pprint.pprint(output)
else:
return


@beartype
2 changes: 0 additions & 2 deletions Slack/legos/slack_send_DM/slack_send_DM.py
Original file line number Diff line number Diff line change
@@ -26,8 +26,6 @@ class InputSchema(BaseModel):
def slack_send_DM_printer(output):
if output is not None:
pprint.pprint(output)
else:
return


def slack_send_DM(
Original file line number Diff line number Diff line change
@@ -36,11 +36,11 @@ def vault_get_service_health(handle) -> Tuple:
else:
error_msg = []
if not health_data["initialized"]:
error_msg.append("Vault is not initialized.")
error_msg.append({"message":"Vault is not initialized."})
if health_data["standby"]:
error_msg.append("Vault is in standby mode.")
error_msg.append({"message":"Vault is in standby mode."})
if health_data["sealed"]:
error_msg.append("Vault is sealed.")
error_msg.append({"message": "Vault is sealed."})
return (False, error_msg)

except Exception as e:
7 changes: 5 additions & 2 deletions unskript-ctl/diagnostics.py
Original file line number Diff line number Diff line change
@@ -106,7 +106,6 @@ def execute_diagnostics(self, diag_commands):
if function:
# Call the function with the commands
diag_outputs[entry_function] = function(commands)
print(f"Function '{function_name}' is accessible.")
else:
raise ValueError(f"Function '{function_name}' not found in the global namespace.")
except Exception as e:
@@ -127,7 +126,7 @@ def main(self):
if not diag_commands:
print("Skipping Diagnostics: No diagnostic command found. You can define them in the YAML configuration file")
return

print("\nRunning Diagnostics...")
diag_outputs = self.execute_diagnostics(diag_commands)

if diag_outputs:
@@ -144,6 +143,10 @@ def main(args):
parser.add_argument("--output-dir-path", '-o', help="Path to output directory", required=True)
ap = parser.parse_args(args)

print("\nFetching logs...")
fetch_pod_logs_not_running(ap.output_dir_path)
fetch_pod_logs_high_restarts(ap.output_dir_path)

diagnostics_script = DiagnosticsScript(ap)
diagnostics_script.main()

200 changes: 98 additions & 102 deletions unskript-ctl/diagnostics_worker.py
Original file line number Diff line number Diff line change
@@ -5,12 +5,12 @@
import os
import subprocess
import json
from unskript_ctl_factory import UctlLogger
from unskript_ctl_factory import UctlLogger, ConfigParserFactory
from concurrent.futures import ThreadPoolExecutor


logger = UctlLogger('UnskriptDiagnostics')


def mongodb_diagnostics(commands:list):
"""
mongodb_diagnostics runs mongocli command with command as the parameter
@@ -42,92 +42,88 @@ def mongodb_diagnostics(commands:list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nMongodb Diagnostics")
logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nMongodb Diagnostics")
# logger.debug(f"Mongosh Command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def fetch_logs(namespace, pod, container):
"""
Fetches logs and previous logs for a specified container in a pod.
"""
outputs = []
cmd_logs = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container]
result_logs = subprocess.run(cmd_logs, capture_output=True, text=True)
if result_logs.stderr:
outputs.append(f"Error: {result_logs.stderr.strip()}")
else:
outputs.append(result_logs.stdout.strip())
def get_matrix_namespaces():
    """Return the namespace list configured at ``global.matrix.namespace``.

    Reads the checks parameters through ConfigParserFactory. When the
    'global' section or its 'matrix' entry is absent, an empty list is
    returned instead.
    """
    params = ConfigParserFactory().get_checks_params()
    # Guard clauses instead of the original nested membership test.
    if 'global' not in params:
        return []
    if 'matrix' not in params['global']:
        return []
    return params['global']['matrix'].get('namespace', [])

def fetch_logs(namespace, pod, output_path):
    """Append the current and previous logs of *pod* to ``<output_path>/logs.txt``.

    Fetches the last 100 lines from all containers of the pod via kubectl:
    first the live logs, then the pre-restart (``--previous``) logs. kubectl
    failures are logged at debug level and never raise.

    :param namespace: Namespace the pod belongs to.
    :param pod: Name of the pod whose logs are collected.
    :param output_path: Directory holding the aggregated logs.txt file.
    """
    # Fix: the original used f'logs.txt' — an f-string with no placeholders.
    logs_file_path = os.path.join(output_path, 'logs.txt')
    separator = "\n" + "=" * 40 + "\n"

    def _append_logs(header, extra_args, err_label):
        # Stream kubectl stdout straight into the shared log file; stderr is
        # captured separately so errors do not pollute the collected logs.
        with open(logs_file_path, 'a') as log_file:
            log_file.write(separator + header)
            cmd = ["kubectl", "logs", "--namespace", namespace,
                   "--tail=100", "--all-containers", pod] + extra_args
            proc = subprocess.Popen(cmd, stdout=log_file,
                                    stderr=subprocess.PIPE, text=True)
            stderr = proc.communicate()[1]
            if proc.returncode != 0:
                logger.debug(f"Error fetching {err_label}logs for {pod}: {stderr}")

    _append_logs(f"Logs for Namespace: {namespace}, Pod: {pod}\n", [], "")
    _append_logs(f"Previous Logs for Namespace: {namespace}, Pod: {pod}\n",
                 ["--previous"], "previous ")

def fetch_pod_logs_for_namespace(namespace, output_path, condition='not_running'):
    """Fetch logs for pods in *namespace* that match *condition*.

    :param namespace: Namespace scanned with ``kubectl get pods``.
    :param output_path: Directory passed through to fetch_logs().
    :param condition: ``'not_running'`` collects logs for pods whose phase is
        neither Running nor Succeeded; ``'high_restarts'`` collects logs for
        pods with any container restarted more than 25 times.
    """
    proc = subprocess.Popen(["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        logger.debug(f"Error fetching pods in namespace {namespace}: {stderr}")
        return

    try:
        pods = json.loads(stdout)['items']
    except json.JSONDecodeError:
        logger.debug(f"Failed to decode JSON response from kubectl get pods in namespace {namespace}: {stdout}")
        return

    for pod in pods:
        name = pod['metadata']['name']
        phase = pod['status']['phase']
        if condition == 'not_running':
            # Bug fix: the original tested `phase != "Running" or phase !=
            # "Succeeded"`, which is true for EVERY pod, so logs were fetched
            # unconditionally. A pod is "not running" only when its phase is
            # neither Running nor Succeeded.
            if phase not in ("Running", "Succeeded"):
                fetch_logs(namespace, name, output_path)
        elif condition == 'high_restarts':
            # Fetch at most once per pod even when several containers exceed
            # the threshold (the original could dump the same pod repeatedly).
            statuses = pod['status'].get('containerStatuses', [])
            if any(cs['restartCount'] > 25 for cs in statuses):
                fetch_logs(namespace, name, output_path)

def fetch_pod_logs_not_running(output_path):
    """Collect logs for non-running pods in every configured namespace.

    Fans out one fetch_pod_logs_for_namespace() task per namespace returned
    by get_matrix_namespaces(), using at most five worker threads.
    """
    namespaces = get_matrix_namespaces()
    with ThreadPoolExecutor(max_workers=5) as pool:
        for ns in namespaces:
            pool.submit(fetch_pod_logs_for_namespace, ns, output_path, 'not_running')

def fetch_pod_logs_high_restarts(output_path):
    """Collect logs for pods with high restart counts in all configured namespaces.

    Fans out one fetch_pod_logs_for_namespace() task per namespace returned
    by get_matrix_namespaces(), using at most five worker threads.
    """
    namespaces = get_matrix_namespaces()
    with ThreadPoolExecutor(max_workers=5) as pool:
        for ns in namespaces:
            pool.submit(fetch_pod_logs_for_namespace, ns, output_path, 'high_restarts')

cmd_logs_previous = ["kubectl", "logs", "--namespace", namespace, pod, "-c", container, "--previous"]
result_logs_previous = subprocess.run(cmd_logs_previous, capture_output=True, text=True)
if result_logs_previous.stderr:
outputs.append(f"Error: {result_logs_previous.stderr.strip()}")
else:
outputs.append(result_logs_previous.stdout.strip())

return outputs

def fetch_pod_logs_not_running():
    """Collect container logs for every pod whose phase is not "Running".

    Scans all namespaces via ``kubectl get pods`` and, for each non-Running
    pod, fetches logs for all of its init and regular containers through
    fetch_logs().

    :return: List of single-entry dicts mapping a
        "Pod Not Running: <pod>, Container: <container>" label to the
        corresponding log output (or error string) from fetch_logs().
    """
    logger.debug("\nK8s Diagnostics: Fetching logs for pods not running")
    command_outputs = []
    cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    # NOTE(review): assumes kubectl succeeded; on a non-zero exit stdout is
    # likely empty and json.loads would raise — confirm callers tolerate that.
    pods = json.loads(result.stdout)['items']

    for pod in pods:
        namespace = pod['metadata']['namespace']
        name = pod['metadata']['name']
        status = pod['status']['phase']
        if status != "Running":
            logger.debug(f"Fetching logs for Pod: {name} in Namespace: {namespace} (Not Running)")
            # Inspect both init containers and application containers.
            containers = [c['name'] for c in pod['spec'].get('initContainers', []) + pod['spec'].get('containers', [])]
            for container in containers:
                logs_output = fetch_logs(namespace, name, container)
                for output in logs_output:
                    logger.debug({f"Pod Not Running: {name}, Container: {container}": output})
                    command_outputs.append({f"Pod Not Running: {name}, Container: {container}": output})
    return command_outputs

def fetch_pod_logs_high_restarts():
    """Collect logs for pods whose summed container restart count exceeds 25.

    Scans all namespaces via ``kubectl get pods`` and, for each qualifying
    pod, fetches its logs with ``kubectl logs``.

    :return: List of single-entry dicts mapping a
        "Pod high restarts: <pod>" label to the log output or an error string.
    """
    logger.debug("\nK8s Diagnostics: Fetching logs for pods with high restarts")
    command_outputs = []
    cmd = ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    pods = json.loads(result.stdout)['items']

    for pod_item in pods:
        ns = pod_item['metadata']['namespace']
        pod_name = pod_item['metadata']['name']
        container_statuses = pod_item['status'].get('containerStatuses', [])
        total_restarts = sum(cs['restartCount'] for cs in container_statuses)
        if total_restarts > 25:
            logger.debug(f"Fetching logs for Pod: {pod_name} in Namespace: {ns} with high restarts")
            logs_proc = subprocess.run(["kubectl", "logs", "--namespace", ns, pod_name], capture_output=True, text=True)
            if logs_proc.stderr:
                body = f"Error: {logs_proc.stderr.strip()}"
            else:
                body = logs_proc.stdout.strip()
            entry = {f"Pod high restarts: {pod_name}": body}
            logger.debug(entry)
            command_outputs.append(entry)
    return command_outputs

def k8s_diagnostics(commands:list):
"""
k8s_diagnostics runs kubectl command
"""
command_outputs = []
if not hasattr(k8s_diagnostics, "already_called"):
command_outputs.extend(fetch_pod_logs_high_restarts())
command_outputs.extend(fetch_pod_logs_not_running())

k8s_diagnostics.already_called = True
logger.debug("Logs have been fetched.")
else:
command_outputs = []
logger.debug("Subsequent execution: Skipping logs")

for command in commands:
cmd_list = command.split()
@@ -141,10 +137,10 @@ def k8s_diagnostics(commands:list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\n Kubernetes Diagnostics")
logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\n Kubernetes Diagnostics")
# logger.debug(f"K8S Command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def redis_diagnostics(commands:list):
@@ -181,10 +177,10 @@ def redis_diagnostics(commands:list):
command_outputs.append({command: output})
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})
for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nRedis Diagnostics")
logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nRedis Diagnostics")
# logger.debug(f"Redis Command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def postgresql_diagnostics(commands:list):
@@ -217,10 +213,10 @@ def postgresql_diagnostics(commands:list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nPostgresql Diagnostics")
logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nPostgresql Diagnostics")
# logger.debug(f"Postgres Command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def elasticsearch_diagnostics(commands: list) -> list:
@@ -247,10 +243,10 @@ def elasticsearch_diagnostics(commands: list) -> list:
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nElasticsearch Diagnostics")
logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nElasticsearch Diagnostics")
# logger.debug(f"Elasticsearch curl command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def keycloak_diagnostics(commands: list):
@@ -276,10 +272,10 @@ def keycloak_diagnostics(commands: list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nKeycloak Diagnostics")
logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nKeycloak Diagnostics")
# logger.debug(f"Keycloak curl command: {command}\nOutput: {cmd_output}\n")
return command_outputs

def vault_diagnostics(commands: list):
@@ -313,8 +309,8 @@ def vault_diagnostics(commands: list):
except Exception as e:
command_outputs.append({command: f"Exception: {str(e)}"})

for result_dict in command_outputs:
for command, cmd_output in result_dict.items():
logger.debug("\nVault Diagnostics")
logger.debug(f"Vault Command: {command}\nOutput: {cmd_output}\n")
# for result_dict in command_outputs:
# for command, cmd_output in result_dict.items():
# logger.debug("\nVault Diagnostics")
# logger.debug(f"Vault Command: {command}\nOutput: {cmd_output}\n")
return command_outputs
31 changes: 30 additions & 1 deletion unskript-ctl/templates/last_cell_content.j2
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
from unskript.legos.utils import CheckOutput, CheckOutputStatus

global w
global _logger

all_outputs = []
other_outputs = []
id_to_name = {}
if _logger:
_logger.debug(f"ERRORED CHECKS ARE: {w.errored_checks}")
_logger.debug(f"TIMED OUT CHECKS ARE: {w.timeout_checks}")

if hasattr(w, 'check_uuid_entry_function_map'):
for key,value in w.check_uuid_entry_function_map.items():
if value not in id_to_name:
id_to_name[value] = key

try:
if 'w' in globals():
if w.check_run:
for id,output in w.check_output.items():
output = json.loads(output)
output['id'] = id
#output['name'] = id_to_name.get(id) if id else str()
all_outputs.append(output)
# Lets check if we have errored_checks or timeout_checks
# exists, if yes then lets dump the output
@@ -26,6 +34,7 @@ try:
"objects": None,
"error": err_msg,
"id": str(_id)
#"name": str(name)
})
if hasattr(w, 'errored_checks') and len(w.errored_checks):
for name, err_msg in w.errored_checks.items():
@@ -35,6 +44,7 @@ try:
"objects": None,
"error": err_msg,
"id": str(_id)
#"name": str(name)
})

if other_outputs:
@@ -48,14 +58,33 @@ try:
if _logger:
_logger.debug(f"FOUND DUPLICATE FOR {_other.get('id')}")

if _logger:
_logger.debug(f"OTHER OUTPUTS: {other_outputs}")
existing_ids = set(output.get('id') for output in all_outputs)
unique_other_outputs = [other_output for other_output in other_outputs if other_output.get('id') not in existing_ids]
if unique_other_outputs:
all_outputs.extend(unique_other_outputs)
# Lets insert the unique other outputs at the same respective place
#all_outputs.extend(unique_other_outputs)
if _logger:
_logger.debug(f"LENGTH OF ALL OUTPUT BEFORE INSERT IS: {len(all_outputs)}")
for uo in unique_other_outputs:
insert_index = w.check_uuids.index(uo.get('id'))
if _logger:
_logger.debug(f"INSERTING RESULT FOR {uo.get('id')} at {insert_index} position")
if insert_index:
all_outputs.insert(insert_index, uo)


if not all_outputs:
all_outputs = other_outputs

_outputs_with_valid_names = []
for _output in all_outputs:
if id_to_name.get(_output.get('id')):
_outputs_with_valid_names.append(_output)
if _logger:
_logger.debug(f"All output has result for ID: {_output.get('id')} Name: {id_to_name.get(_output.get('id'))} Status: {_output.get('status')}")
all_outputs = _outputs_with_valid_names
for _output in all_outputs:
print(json.dumps(_output))
else:
16 changes: 12 additions & 4 deletions unskript-ctl/templates/template_script.j2
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ def do_run_(logger = None, script_to_check_mapping = {}):
global _logger
global _script_to_check_mapping
global w
all_outputs = []

output = None
if logger:
@@ -98,19 +99,26 @@ def do_run_(logger = None, script_to_check_mapping = {}):
if _logger:
_logger.debug("Starting to execute {{ num_checks }} number of checks")

for i in tqdm(range({{ num_checks + 1 }}), desc="Running", leave=True, ncols=100):
{# check_i should always start with 1 #}
for i in tqdm(range(1, {{ num_checks + 1 }}), desc="Running", leave=True, ncols=100):
fn = "check_" + str(i)
if hasattr(globals().get(fn), "__call__"):
result = _run_function(fn)
{# if _logger: #}
{# _logger.debug(f"FUNCTION: {fn} RESULT FOR FUNCTION RUN : {result}") #}
if _logger:
if result:
if isinstance(result, tuple):
if result[-1]:
_logger.debug(f"Check {fn} was successful")
else:
_logger.debug(f"Check {fn} failed")

output, _ = _run_function('last_cell')

{# Get last_output and last_status #}
output, _ = _run_function('last_cell')
all_outputs.append(output)
{# if _logger:
_logger.debug(f"ALL OUTPUTS {all_outputs} for {fn}") #}

# Lets dump the output in the log file so we can refer to the status of it
# later on
@@ -120,7 +128,7 @@ def do_run_(logger = None, script_to_check_mapping = {}):
else:
_logger.debug("No output for the checks run")

return output
return all_outputs

if __name__ == "__main__":
logger = None
10 changes: 9 additions & 1 deletion unskript-ctl/unskript_ctl_main.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
from unskript_ctl_version import *
from unskript_ctl_upload_session_logs import upload_session_logs
from diagnostics import main as diagnostics
from unskript_upload_results_to_s3 import S3Uploader

YAML_CONFIG_FILE = "/etc/unskript/unskript_ctl_config.yaml"

@@ -44,6 +45,7 @@ def __init__(self, **kwargs):
self._script = Script()
self._checks_priority = self._config.get_checks_priority()


self._db = DBInterface()
# Create execution directory so all results
# gets logged there
@@ -155,7 +157,7 @@ def run_main(self, **kwargs):
else:
output_dir = os.path.join(UNSKRIPT_EXECUTION_DIR, self.uglobals.get('exec_id'))

failed_objects_file = os.path.join(UNSKRIPT_EXECUTION_DIR, self.uglobals.get('exec_id')) + '_output.txt'
failed_objects_file = os.path.join(output_dir, self.uglobals.get('exec_id')) + '_output.txt'
diag_args = [
'--yaml-file',
YAML_CONFIG_FILE,
@@ -166,6 +168,10 @@ def run_main(self, **kwargs):
]
diagnostics(diag_args)

print("Uploading run artifacts to S3...")
uploader = S3Uploader()
uploader.rename_and_upload_other_items()

def run_info(self):
"""This function runs the info gathering actions"""
# Lets find out if any specific info action mentioned, if mentioned
@@ -766,5 +772,7 @@ def rearrange_argv(argv):
if args.command == 'run' and args.report:
uc.notify(args)

os._exit(0)

if __name__ == '__main__':
main()
71 changes: 48 additions & 23 deletions unskript-ctl/unskript_ctl_notification.py
Original file line number Diff line number Diff line change
@@ -199,10 +199,10 @@ def create_temp_files_of_failed_check_results(self,
return list_of_failed_files

def create_script_summary_message(self, output_metadata_file: str):
# message = ''
message = ''
if os.path.exists(output_metadata_file) is False:
self.logger.error(f"ERROR: The metadata file is missing, please check if file exists? {output_metadata_file}")
return ''
return message

metadata = ''
with open(output_metadata_file, 'r', encoding='utf-8') as f:
@@ -216,24 +216,24 @@ def create_script_summary_message(self, output_metadata_file: str):
self.logger.debug(f"\tStatus: {metadata.get('status')} \n\tTime (in seconds): {metadata.get('time_taken')} \n\tError: {metadata.get('error')} \n")

# Remove from email
# message += f'''
# <br>
# <h3> Custom Script Run Result </h3>
# <table border="1">
# <tr>
# <th> Status </th>
# <th> Time (in seconds) </th>
# <th> Error </th>
# </tr>
# <tr>
# <td>{metadata.get('status')}</td>
# <td>{metadata.get('time_taken')}</td>
# <td>{metadata.get('error')}</td>
# </tr>
# </table>
# '''

return ''
message += f'''
<br>
<h3> Custom Script Run Result </h3>
<table border="1">
<tr>
<th> Status </th>
<th> Time (in seconds) </th>
<th> Error </th>
</tr>
<tr>
<td>{metadata.get('status')}</td>
<td>{metadata.get('time_taken')}</td>
<td>{metadata.get('error')}</td>
</tr>
</table>
'''

return message

def create_info_legos_output_file(self):
"""create_info_legos_output_file: This function creates a file that will
@@ -470,6 +470,10 @@ def prepare_combined_email(self,
if info_result:
message += info_result
self.create_info_legos_output_file()
# print("Output Metadata File\n",output_metadata_file)
if output_metadata_file:
message += self.create_script_summary_message(output_metadata_file=output_metadata_file)
temp_attachment = self.create_email_attachment(output_metadata_file=output_metadata_file)

if len(os.listdir(self.execution_dir)) == 0 or not self.create_tarball_archive(tar_file_name=tar_file_name, output_metadata_file=None, parent_folder=parent_folder):
self.logger.error("Execution directory is empty , tarball creation unsuccessful!")
@@ -610,13 +614,22 @@ def send_sendgrid_notification(self,
if len(os.listdir(self.execution_dir)) == 0 or not self.create_tarball_archive(tar_file_name=tar_file_name, output_metadata_file=None, parent_folder=parent_folder):
self.logger.error("Execution directory is empty , tarball creation unsuccessful!")

if output_metadata_file:
html_message += self.create_script_summary_message(output_metadata_file=output_metadata_file)

info_result = self.create_info_gathering_action_result()
if info_result:
html_message += info_result

to_email_list = []
if isinstance(to_email, list):
to_email_list = to_email
else:
to_email_list = [to_email]

email_message = Mail(
from_email=from_email,
to_emails=to_email,
to_emails=to_email_list,
subject=email_subject,
html_content=html_message
)
@@ -751,10 +764,16 @@ def do_send_awsses_email(self, from_email: str,
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key

client = boto3.client('ses', region_name=region)
to_email_list = []
if isinstance(to_email, list):
to_email_list = to_email
else:
to_email_list = [to_email]

try:
response = client.send_raw_email(
Source=from_email,
Destinations=[to_email],
Destinations=to_email_list,
RawMessage={'Data': attachment_.as_string()}
)
if response.get('ResponseMetadata') and response.get('ResponseMetadata').get('HTTPStatusCode') == 200:
@@ -863,7 +882,13 @@ def send_smtp_notification(self,
else:
msg['From'] = smtp_user

msg['To'] = to_email
to_email_list = []
if isinstance(to_email, list):
to_email_list = to_email
else:
to_email_list = [to_email]

msg['To'] = ", ".join(to_email_list)
msg['Subject'] = subject
try:
server = smtplib.SMTP(smtp_host, self.SMTP_TLS_PORT)
137 changes: 96 additions & 41 deletions unskript-ctl/unskript_ctl_run.py
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
from unskript_utils import *
from unskript_ctl_factory import ChecksFactory, ScriptsFactory
from unskript.legos.utils import CheckOutputStatus
from unskript_upload_results_to_s3 import S3Uploader


# Implements Checks Class that is wrapper for All Checks Function
@@ -59,6 +60,7 @@ def __init__(self, **kwargs):
self.prioritized_checks_to_id_mapping = {}
self.map_entry_function_to_check_name = {}
self.map_check_name_to_connector = {}
self.check_name_to_id_mapping = {}

for k,v in self.checks_globals.items():
os.environ[k] = json.dumps(v)
@@ -85,12 +87,27 @@ def run(self, **kwargs):
from jit_script import do_run_
temp_output = do_run_(self.logger, self.script_to_check_mapping)
output_list = []
for o in temp_output.split('\n'):
if not o:
continue
d = json.loads(json.dumps(o))
if isinstance(d, dict) is False:
d = json.loads(d)
# Combine all parts of all_outputs in template_script.j2 do_run function into a single string
combined_output = ''.join(temp_output)

# Correct the formatting to ensure it's proper JSON
formatted_output = combined_output.replace('}\n{', '},\n{')
if not formatted_output.endswith('\n'):
formatted_output += '\n'

# Strip trailing comma and newline, then wrap in array brackets
formatted_output = formatted_output.rstrip(',\n')
json_output = f"[{formatted_output}]"

try:
# Parse the JSON array into a list of dictionaries
data = json.loads(json_output)
except json.JSONDecodeError as e:
# Handle the case where the JSON could not be decoded
self.logger.error(f"Failed to decode JSON: {e}")
raise ValueError("Invalid JSON format of output") from e
for d in data:
# Assign appropriate check names
d['name'] = self.check_names[self.check_uuids.index(d.get('id'))]
d['check_entry_function'] = self.check_entry_functions[self.check_uuids.index(d.get('id'))]
output_list.append(d)
@@ -100,7 +117,8 @@ def run(self, **kwargs):
self._error(str(e))
finally:
self._common.update_exec_id()
output_file = os.path.join(UNSKRIPT_EXECUTION_DIR, self.uglobals.get('exec_id')) + '_output.txt'
output_file = os.path.join(self.uglobals.get('CURRENT_EXECUTION_RUN_DIRECTORY'),
self.uglobals.get('exec_id')) + '_output.txt'
if not outputs:
self.logger.error("Output is None from check's output")
self._error('OUTPUT IS EMPTY FROM CHECKS RUN!')
@@ -166,18 +184,49 @@ def display_check_result(self, checks_output):
failed_result_available = False
failed_result = {}
checks_output = self.output_after_merging_checks(checks_output, self.check_uuids)
self.uglobals.create_property('CHECKS_OUTPUT')
self.uglobals['CHECKS_OUTPUT'] = checks_output
self.logger.debug("Creating checks output JSON to upload to S3")
# print("Uploading failed objects to S3...")
# uploader = S3Uploader()
# uploader.rename_and_upload_failed_objects(checks_output)
now = datetime.now()
rfc3339_timestamp = now.isoformat() + 'Z'
parent_folder = '/tmp'
if self.uglobals.get('CURRENT_EXECUTION_RUN_DIRECTORY'):
parent_folder = self.uglobals.get('CURRENT_EXECUTION_RUN_DIRECTORY')
dashboard_checks_output_file = f"dashboard_{rfc3339_timestamp}.json"
dashboard_checks_output_file_path = os.path.join(parent_folder, dashboard_checks_output_file)
try:
# Convert checks_output to JSON format
checks_output_json = json.dumps(checks_output, indent=2)
except json.JSONDecodeError:
self.logger.debug(f"Failed to decode JSON response for {self.customer_name}")
return

# Write checks output JSON to a separate file
try:
if checks_output_json:
self.logger.debug(f"Writing JSON data to dashboard json file")
with open(dashboard_checks_output_file_path, 'w') as json_file:
json_file.write(checks_output_json)
except IOError as e:
self.logger.debug(f"Failed to write JSON data to {dashboard_checks_output_file_path}: {e}")
return

for result in checks_output:
if result.get('skip') and result.get('skip') is True:
idx += 1
continue
payload = result
try:
_action_uuid = payload.get('id')
if self.checks_priority is None:
priority = CHECK_PRIORITY_P2
else:
priority = self.checks_priority.get(self.check_entry_functions[idx], CHECK_PRIORITY_P2)
# priority = self.checks_priority.get(self.check_entry_functions[idx], CHECK_PRIORITY_P2)
priority = self.checks_priority.get(self.check_name_to_id_mapping.get(_action_uuid), CHECK_PRIORITY_P2)

_action_uuid = payload.get('id')
if _action_uuid:
#c_name = self.connector_types[idx] + ':' + self.prioritized_checks_to_id_mapping[_action_uuid]
p_check_name = self.prioritized_checks_to_id_mapping[_action_uuid]
@@ -266,45 +315,45 @@ def display_check_result(self, checks_output):
print('\x1B[1;4m', '\x1B[0m')
return



def output_after_merging_checks(self, outputs: list, ids: list) -> list:
"""output_after_merging_checks: this function combines the output from duplicated
checks and stores the combined output.
TBD: What if one duplicated check returns an ERROR
Status:
1 : PASS
2 : FAIL
3 : ERROR
"""
new_outputs = []
# Remove empty strings
filtered_output = []
result_dict = {}

for output in outputs:
if not output:
continue
filtered_output.append(output)

outputs = filtered_output
if self.uglobals.get('uuid_mapping') is None:
return outputs
check_id = output.get('id')
current_output = result_dict.get(check_id)

index = 0
while index < len(outputs):
if self.uglobals['uuid_mapping'].get(ids[index]) is None:
new_outputs.append(outputs[index])
index = index+1
if current_output is None:
# If no entry exists, directly use this output
result_dict[check_id] = output
else:
parent_index = index - 1
while index < len(outputs):
if self.uglobals['uuid_mapping'].get(ids[index]):
outputs[index]['skip'] = True
new_outputs.append(outputs[index])
index = index + 1
else:
break
combined_output = self.calculate_combined_check_status(outputs[parent_index:index])
# Combined output should be the output of the parent check, so
# overwrite it.
#print(f'parent_index {parent_index}, index {index}, combined_output {combined_output}')
new_outputs[parent_index] = combined_output
return new_outputs
# If an entry exists, merge this output with the existing one
if current_output['status'] < output['status']:
# If the new status is more severe, overwrite the old status
current_output['status'] = output['status']
current_output['objects'] = output.get('objects', [])

if output['status'] == 2 and output.get('objects'):
# Append objects if status is FAILED and objects are non-empty
if 'objects' not in current_output or not isinstance(current_output['objects'], list):
current_output['objects'] = []
current_output['objects'].extend(output.get('objects', []))

# Update error message if there's a new one and it's non-empty
if 'error' in output and output['error']:
current_output['error'] = output['error']

return list(result_dict.values())

def calculate_combined_check_status(self, outputs:list):
combined_output = {}
@@ -406,7 +455,12 @@ def get_first_cell_content(self, list_of_checks: list):
first_cell_content += f'{k}{index} = \"{value}\"' + '\n'
first_cell_content += f'''w = Workflow(env, secret_store_cfg, None, global_vars=globals(), check_uuids={self.check_uuids})''' + '\n'
# temp_map = {key: value for key, value in zip(self.check_entry_functions, self.check_uuids)}
temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
# temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
temp_map = {}
for index,value in enumerate(self.check_uuids):
temp_map[self.check_entry_functions[index]] = value
self.check_name_to_id_mapping[value] = self.check_entry_functions[index]

first_cell_content += f'''w.check_uuid_entry_function_map = {temp_map}''' + '\n'
first_cell_content += '''w.errored_checks = {}''' + '\n'
first_cell_content += '''w.timeout_checks = {}''' + '\n'
@@ -619,8 +673,9 @@ def create_checks_for_matrix_argument(self, actions: list, matrix: dict):
# as the one its copied from.
new_uuid = str(uuid.uuid4())
self.uglobals["uuid_mapping"][new_uuid] = action["uuid"]
newcheck['uuid'] = new_uuid
newcheck['id'] = str(uuid.uuid4())[:8]
# newcheck['uuid'] = new_uuid
newcheck['uuid'] = action["uuid"]
newcheck['id'] = str(action["uuid"])
#print(f'Adding duplicate check {new_uuid}, parent_uuid {check.get("uuid")}')
newcheck['matrixinputline'] = input_json_line.rstrip(',')
action_list.append(newcheck)
@@ -919,4 +974,4 @@ def create_checks_for_matrix_argument(self, list_of_actions: list):
if self.info_globals and len(self.info_globals):
action_list = self._common.create_checks_for_matrix_argument(actions=list_of_actions,
matrix=self.matrix)
return action_list
return action_list
166 changes: 166 additions & 0 deletions unskript-ctl/unskript_upload_results_to_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
import os
from datetime import datetime
import json
from unskript_ctl_factory import UctlLogger
from unskript_utils import *

logger = UctlLogger('UnskriptDiagnostics')

class S3Uploader:
    """Uploads unSkript check results and run artifacts to the
    ``lightbeam-reports`` S3 bucket under a per-customer, date-based
    key prefix (``<customer>/<year>/<month>/<day>/``).

    Credentials are read from the ``LIGHTBEAM_AWS_ACCESS_KEY_ID`` and
    ``LIGHTBEAM_AWS_SECRET_ACCESS_KEY`` environment variables.  When they
    are missing or invalid, the instance remains usable but every upload
    becomes a logged no-op (``self.s3_client`` stays ``None``) instead of
    raising ``AttributeError`` as the previous version did.
    """

    def __init__(self):
        logger.debug("Initializing S3Uploader")
        aws_access_key_id = os.getenv('LIGHTBEAM_AWS_ACCESS_KEY_ID')
        aws_secret_access_key = os.getenv('LIGHTBEAM_AWS_SECRET_ACCESS_KEY')
        self.bucket_name = 'lightbeam-reports'
        # NOTE(review): datetime.now() is naive local time, so appending 'Z'
        # mislabels it as UTC.  Kept as-is to preserve the existing S3 object
        # naming scheme — confirm before changing.
        now = datetime.now()
        rfc3339_timestamp = now.isoformat() + 'Z'
        self.ts = rfc3339_timestamp
        year = now.strftime("%Y")
        month = now.strftime("%m")
        day = now.strftime("%d")
        self.customer_name = os.getenv('CUSTOMER_NAME', 'UNKNOWN_CUSTOMER_NAME')
        self.file_name = f"dashboard_{rfc3339_timestamp}.json"
        self.folder_path = f"{self.customer_name}/{year}/{month}/{day}/"
        self.file_path = f"{self.folder_path}{self.file_name}"
        self.local_file_name = f"/tmp/{self.file_name}"

        # Always define these attributes, even on the early-return path below.
        # The original left them unset when credentials were absent, which made
        # every later method call crash with AttributeError.
        self.s3_client = None
        self.uglobals = UnskriptGlobals()

        if not aws_access_key_id or not aws_secret_access_key:
            logger.debug("AWS credentials are not set in environment variables")
            return

        try:
            client = boto3.client('s3',
                                  aws_access_key_id=aws_access_key_id,
                                  aws_secret_access_key=aws_secret_access_key,
                                  )
            # Cheap call that validates the credentials up front; only keep
            # the client once it has proven usable.
            client.list_buckets()
            self.s3_client = client
            logger.debug("AWS credentials are valid")
        except (NoCredentialsError, PartialCredentialsError):
            logger.debug("Invalid AWS credentials")
        except ClientError as e:
            logger.debug(f"Client error: {e}")

    def create_s3_folder_path(self):
        """Ensure the target bucket and the date-based folder prefix exist.

        Returns:
            True when the bucket exists (or was created) — the folder marker
            is created best-effort; False when no S3 client is available or
            the bucket cannot be reached/created.
        """
        if self.s3_client is None:
            logger.debug("No S3 client available; cannot create folder path")
            return False

        # Ensure the bucket exists.
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
            logger.debug(f"S3 bucket {self.bucket_name} exists")
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                logger.debug(f"S3 bucket {self.bucket_name} does not exist, creating bucket")
                try:
                    self.s3_client.create_bucket(Bucket=self.bucket_name)
                    logger.debug(f"S3 bucket {self.bucket_name} created")
                except ClientError as create_err:
                    logger.debug(f"Failed to create bucket: {create_err}")
                    return False  # Exit if the bucket cannot be created
            else:
                logger.debug(f"Error checking bucket existence: {e}")
                return False  # Exit if there is any other error

        # Check whether the folder marker object already exists.
        folder_exists = False
        try:
            self.s3_client.head_object(Bucket=self.bucket_name, Key=self.folder_path)
            folder_exists = True
            logger.debug(f"S3 folder {self.folder_path} exists")
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                logger.debug(f"S3 folder {self.folder_path} does not exist")
            else:
                logger.debug(f"Error checking folder existence: {e}")

        # S3 has no real directories; write a zero-byte marker object so the
        # prefix shows up as a folder in the console.
        if not folder_exists:
            logger.debug(f"Creating folder {self.folder_path} in the bucket")
            try:
                self.s3_client.put_object(Bucket=self.bucket_name, Key=self.folder_path)
            except ClientError as e:
                logger.debug(f"Failed to create folder: {e}")

        return True

    def rename_and_upload_other_items(self):
        """Upload the dashboard JSON plus every file found under the current
        execution-run directory.

        Non-dashboard files get the run timestamp appended to their base name
        so successive runs do not overwrite each other; dashboard files already
        carry the timestamp in their name.
        """
        if not self.create_s3_folder_path():
            logger.debug("Unable to create bucket")
            return

        # Files in the CURRENT_EXECUTION_RUN_DIRECTORY, plus the dashboard JSON.
        file_list_to_upload = [self.local_file_name]
        run_dir = self.uglobals.get('CURRENT_EXECUTION_RUN_DIRECTORY')
        if run_dir and os.path.exists(run_dir):
            try:
                # Sub-directories under the run directory are intentionally
                # ignored; only the files themselves are uploaded.
                for parent_dir, _, _files in os.walk(run_dir):
                    for _file in _files:
                        file_list_to_upload.append(os.path.join(parent_dir, _file))
            except OSError as e:
                # Narrowed from a bare `except:` that swallowed everything,
                # including SystemExit/KeyboardInterrupt.
                logger.debug(f"Failed to get contents of Execution Run directory: {e}")

        for _file in file_list_to_upload:
            base_name, extension = os.path.splitext(os.path.basename(_file))
            if base_name.startswith("dashboard"):
                file_path = os.path.join(self.folder_path, os.path.basename(_file))
            else:
                temp_fp = f"{base_name}_{self.ts}{extension}"
                file_path = os.path.join(self.folder_path, temp_fp)

            if not self.do_upload_(_file, file_path):
                logger.debug(f"ERROR: Uploading error for {_file}")

    def do_upload_(self, file_name, file_path):
        """Uploads the given file_name to s3 bucket defined in file_path

        Returns:
            True on success, False on any failure (failures are logged,
            never raised).
        """
        if self.s3_client is None:
            logger.debug("No S3 client available; skipping upload")
            return False
        try:
            logger.debug(f"Uploading file {file_name} to {self.bucket_name}/{file_path}")
            self.s3_client.upload_file(file_name, self.bucket_name, file_path)
            logger.debug(f"File {file_name} uploaded successfully to {self.bucket_name}/{file_path}")
            return True
        except NoCredentialsError:
            logger.debug("Credentials not available")
        except Exception as e:
            logger.debug(f"Unable to upload failed objects file to S3 bucket: {e}")

        return False

0 comments on commit 5350288

Please sign in to comment.