Merge branch 'master' into update-info-legos-1
jayasimha-raghavan-unskript authored Apr 12, 2024
2 parents 71f8f5f + 8a54039 commit 69e49e5
Showing 5 changed files with 88 additions and 18 deletions.
33 changes: 28 additions & 5 deletions Kubernetes/legos/k8s_get_oomkilled_pods/k8s_get_oomkilled_pods.py
@@ -3,6 +3,8 @@
# All rights reserved.
#
import pprint
import datetime
from datetime import timezone
from typing import Tuple, Optional
from pydantic import BaseModel, Field
from kubernetes import client
@@ -15,6 +17,11 @@ class InputSchema(BaseModel):
description='Kubernetes Namespace Where the Service exists',
title='K8S Namespace',
)
time_interval_to_check: int = Field(
24,
description='Time interval in hours. This time window is used to check if the pod got OOMKilled. Default is 24 hours.',
title="Time Interval"
)



@@ -24,7 +31,7 @@ def k8s_get_oomkilled_pods_printer(output):
pprint.pprint(output)


def k8s_get_oomkilled_pods(handle, namespace: str = "") -> Tuple:
def k8s_get_oomkilled_pods(handle, namespace: str = "", time_interval_to_check=24) -> Tuple:
"""k8s_get_oomkilled_pods This function returns the pods that have OOMKilled event in the container last states
:type handle: Object
@@ -33,6 +40,10 @@ def k8s_get_oomkilled_pods(handle, namespace: str = "") -> Tuple:
:type namespace: str
:param namespace: (Optional)String, K8S Namespace as python string
:type time_interval_to_check: int
:param time_interval_to_check: (Optional) Integer, in hours, the interval within which the
state of the POD should be checked.
:rtype: Status, List of objects of pods, namespaces, and containers that are in OOMKilled state
"""
result = []
@@ -61,21 +72,33 @@ def k8s_get_oomkilled_pods(handle, namespace: str = "") -> Tuple:
if pods is None:
raise ApiException("No pods returned from the Kubernetes API.")

# Get Current Time in UTC
current_time = datetime.datetime.now(timezone.utc)
# Compute the reference time (time_interval_to_check hours ago, default 24) in UTC
interval_time_to_check = current_time - datetime.timedelta(hours=time_interval_to_check)
interval_time_to_check = interval_time_to_check.replace(tzinfo=timezone.utc)


for pod in pods:
pod_name = pod.metadata.name
namespace = pod.metadata.namespace

# Ensure container_statuses is not None before iterating
container_statuses = pod.status.container_statuses
if container_statuses is None:
continue

# Check each pod for OOMKilled state
for container_status in container_statuses:
container_name = container_status.name
last_state = container_status.last_state
if last_state and last_state.terminated and last_state.terminated.reason == "OOMKilled":
result.append({"pod": pod_name, "namespace": namespace, "container": container_name})

termination_time = last_state.terminated.finished_at
# If the termination time falls after the reference time, the pod
# was OOMKilled within the configured window, so flag it.
if termination_time and termination_time.replace(tzinfo=timezone.utc) >= interval_time_to_check:
result.append({"pod": pod_name, "namespace": namespace, "container": container_name})

return (False, result) if result else (True, None)
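For context, the window check this hunk introduces can be exercised in isolation. A minimal sketch, assuming a timezone-aware finished_at as returned by the kubernetes client; the helper name and sample timestamp are illustrative, not part of the commit:

import datetime
from datetime import timezone

def terminated_within_window(finished_at, hours=24):
    # True when a container's finished_at timestamp falls inside the
    # last `hours` hours, compared in UTC.
    if finished_at is None:
        return False
    cutoff = datetime.datetime.now(timezone.utc) - datetime.timedelta(hours=hours)
    return finished_at.replace(tzinfo=timezone.utc) >= cutoff

# A container OOMKilled two hours ago is flagged under the default 24-hour window.
two_hours_ago = datetime.datetime.now(timezone.utc) - datetime.timedelta(hours=2)
print(terminated_within_window(two_hours_ago))  # True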

25 changes: 24 additions & 1 deletion Kubernetes/legos/k8s_get_pods_with_high_restart/k8s_get_pods_with_high_restart.py
@@ -2,11 +2,15 @@
# Copyright (c) 2023 unSkript.com
# All rights reserved.
#
import datetime
from datetime import timezone
from typing import Tuple
from pydantic import BaseModel, Field
from kubernetes import client
from kubernetes.client.rest import ApiException

# Constants used in this file
INTERVAL_TO_CHECK = 24 # In hours

class InputSchema(BaseModel):
namespace: str = Field(
@@ -54,10 +58,29 @@ def k8s_get_pods_with_high_restart(handle, namespace: str = '', threshold: int =
raise Exception(f"Error occurred while accessing Kubernetes API: {e}")

retval = []

# It is not enough to check that the restart count exceeds the threshold;
# we should also check whether the pod last restarted within the past
# 24 hours. If it did, flag it; otherwise the pod may have restarted at
# some point but has been stable since then.

# Take the current time and a reference time INTERVAL_TO_CHECK (24) hours earlier, both in UTC.
current_time = datetime.datetime.now(timezone.utc)
interval_time_to_check = current_time - datetime.timedelta(hours=INTERVAL_TO_CHECK)
interval_time_to_check = interval_time_to_check.replace(tzinfo=timezone.utc)

for pod in pods:
for container_status in pod.status.container_statuses or []:
restart_count = container_status.restart_count
last_state = container_status.last_state

if restart_count > threshold:
retval.append({'name': pod.metadata.name, 'namespace': pod.metadata.namespace})
if last_state and last_state.terminated:
termination_time = last_state.terminated.finished_at
# Compare the termination time against the reference time; if the
# pod restarted within the last 24 hours, add it to retval.
if termination_time and termination_time.replace(tzinfo=timezone.utc) >= interval_time_to_check:
retval.append({'name': pod.metadata.name, 'namespace': pod.metadata.namespace})

return (False, retval) if retval else (True, None)
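The fields read above (restart_count, last_state.terminated.finished_at) come from the standard kubernetes Python client. A self-contained sketch of listing them, assuming a reachable cluster via a local kubeconfig and the default namespace (both assumptions, not part of the commit):

from kubernetes import client, config

config.load_kube_config()  # assumes a local kubeconfig
v1 = client.CoreV1Api()
for pod in v1.list_namespaced_pod("default").items:
    for cs in pod.status.container_statuses or []:
        # Same fields the lego inspects: restart count and last termination time
        finished_at = cs.last_state.terminated.finished_at if cs.last_state and cs.last_state.terminated else None
        print(pod.metadata.name, cs.name, cs.restart_count, finished_at)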
6 changes: 3 additions & 3 deletions unskript-ctl/diagnostics.py
@@ -125,16 +125,16 @@ def main(self):
diag_commands = self.get_diagnostic_commands_for_failed_checks()

if not diag_commands:
print("ERROR: No diagnostic command found. Please define them in the YAML configuration file")
sys.exit(0)
print("Skipping Diagnostics: No diagnostic command found. You can define them in the YAML configuration file")
return

diag_outputs = self.execute_diagnostics(diag_commands)

if diag_outputs:
diag_file = os.path.join(self.args.output_dir_path, 'diagnostics.yaml')
self.write_to_yaml_file(diag_outputs, diag_file)
else:
print("ERROR: Nothing to write, diagnostic outputs are empty!")
print("WARNING: Nothing to write, diagnostic outputs are empty!")


def main(args):
18 changes: 17 additions & 1 deletion unskript-ctl/templates/last_cell_content.j2
@@ -1,8 +1,13 @@
from unskript.legos.utils import CheckOutput, CheckOutputStatus
global w
global _logger

all_outputs = []
other_outputs = []
if _logger:
_logger.debug(f"ERRORED CHECKS ARE: {w.errored_checks}")
_logger.debug(f"TIMED OUT CHECKS ARE: {w.timeout_checks}")

try:
if 'w' in globals():
if w.check_run:
@@ -31,15 +36,26 @@ try:
"error": err_msg,
"id": str(_id)
})

if other_outputs:
for _other in other_outputs:
for _output in all_outputs:
# Eliminate duplicate entries in the output; a check could be
# double-counted across the failed and error/timeout cases.
if _other.get('id') == _output.get('id'):
_output = _other
_output.update(_other)
if _logger:
_logger.debug(f"FOUND DUPLICATE FOR {_other.get('id')}")

existing_ids = set(output.get('id') for output in all_outputs)
unique_other_outputs = [other_output for other_output in other_outputs if other_output.get('id') not in existing_ids]
if unique_other_outputs:
all_outputs.extend(unique_other_outputs)

if not all_outputs:
all_outputs = other_outputs

for _output in all_outputs:
print(json.dumps(_output))
else:
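The set-based deduplication that replaces the nested-loop merge can be illustrated standalone; the sample dicts below are hypothetical:

all_outputs = [{"id": "a", "status": 1}, {"id": "b", "status": 2}]
other_outputs = [{"id": "b", "status": 3}, {"id": "c", "status": 4}]

# Merge duplicates in place, then append only genuinely new ids.
for _other in other_outputs:
    for _output in all_outputs:
        if _other.get("id") == _output.get("id"):
            _output.update(_other)

existing_ids = set(output.get("id") for output in all_outputs)
all_outputs.extend(o for o in other_outputs if o.get("id") not in existing_ids)
# all_outputs now holds ids a, b (updated to status 3), and c.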
24 changes: 16 additions & 8 deletions unskript-ctl/unskript_ctl_run.py
@@ -58,6 +58,7 @@ def __init__(self, **kwargs):
# Prioritized checks to uuid mapping
self.prioritized_checks_to_id_mapping = {}
self.map_entry_function_to_check_name = {}
self.map_check_name_to_connector = {}

for k,v in self.checks_globals.items():
os.environ[k] = json.dumps(v)
@@ -178,10 +179,10 @@ def display_check_result(self, checks_output):

_action_uuid = payload.get('id')
if _action_uuid:
c_name = self.connector_types[idx] + ':' + self.prioritized_checks_to_id_mapping[_action_uuid]
#c_name = self.connector_types[idx] + ':' + self.prioritized_checks_to_id_mapping[_action_uuid]
p_check_name = self.prioritized_checks_to_id_mapping[_action_uuid]
else:
c_name = self.connector_types[idx] + ':' + self.check_names[idx]
#c_name = self.connector_types[idx] + ':' + self.check_names[idx]
p_check_name = self.check_names[idx]
if p_check_name in self.check_entry_functions:
p_check_name = self.map_entry_function_to_check_name.get(p_check_name)
@@ -195,10 +196,12 @@ def display_check_result(self, checks_output):
checks_per_priority_per_result_list[priority]['PASS'].append([
p_check_name,
ids[idx],
self.connector_types[idx]]
# self.connector_types[idx]]
self.map_check_name_to_connector[p_check_name]]
)
elif ids and CheckOutputStatus(payload.get('status')) == CheckOutputStatus.FAILED:
failed_objects = payload.get('objects')
c_name = self.map_check_name_to_connector[p_check_name] + ':' + p_check_name
failed_result[c_name] = failed_objects
result_table.append([
p_check_name,
@@ -210,14 +213,15 @@ def display_check_result(self, checks_output):
checks_per_priority_per_result_list[priority]['FAIL'].append([
p_check_name,
ids[idx],
self.connector_types[idx]
# self.connector_types[idx]
self.map_check_name_to_connector[p_check_name]
])
elif ids and CheckOutputStatus(payload.get('status')) == CheckOutputStatus.RUN_EXCEPTION:
if payload.get('error') is not None:
failed_objects = payload.get('error')
if isinstance(failed_objects, str):
failed_objects = [failed_objects]
# c_name = self.connector_types[idx] + ':' + self.check_names[idx]
c_name = self.map_check_name_to_connector[p_check_name] + ':' + p_check_name
failed_result[c_name] = failed_objects
failed_result_available = True
error_msg = payload.get('error') if payload.get('error') else self.parse_failed_objects(failed_object=failed_objects)
@@ -231,7 +235,8 @@ def display_check_result(self, checks_output):
# self.check_names[idx],
p_check_name,
ids[idx],
self.connector_types[idx]
# self.connector_types[idx]
self.map_check_name_to_connector[p_check_name]
])
except Exception as e:
self.logger.error(e)
@@ -383,6 +388,7 @@ def get_first_cell_content(self, list_of_checks: list):
if len(list_of_checks) == 0:
return None
self.check_uuids, self.check_names, self.connector_types, self.check_entry_functions = self._common.get_code_cell_name_and_uuid(list_of_actions=list_of_checks)
self.map_check_name_to_connector = dict(zip(self.check_names, self.connector_types))
first_cell_content = self._common.get_first_cell_content()

if self.checks_globals and len(self.checks_globals):
@@ -401,7 +407,9 @@ def get_first_cell_content(self, list_of_checks: list):
first_cell_content += f'''w = Workflow(env, secret_store_cfg, None, global_vars=globals(), check_uuids={self.check_uuids})''' + '\n'
# temp_map = {key: value for key, value in zip(self.check_entry_functions, self.check_uuids)}
temp_map = dict(zip(self.check_entry_functions, self.check_uuids))
first_cell_content += f'''w.check_uuid_entry_function_map = {temp_map}'''
first_cell_content += f'''w.check_uuid_entry_function_map = {temp_map}''' + '\n'
first_cell_content += '''w.errored_checks = {}''' + '\n'
first_cell_content += '''w.timeout_checks = {}''' + '\n'

return first_cell_content
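With hypothetical inputs, the prelude this method now emits would end with lines like the following (check names and UUIDs are illustrative, not from the commit):

w = Workflow(env, secret_store_cfg, None, global_vars=globals(), check_uuids=['uuid-1', 'uuid-2'])
w.check_uuid_entry_function_map = {'check_disk_usage': 'uuid-1', 'check_pod_health': 'uuid-2'}
w.errored_checks = {}
w.timeout_checks = {}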

@@ -911,4 +919,4 @@ def create_checks_for_matrix_argument(self, list_of_actions: list):
if self.info_globals and len(self.info_globals):
action_list = self._common.create_checks_for_matrix_argument(actions=list_of_actions,
matrix=self.matrix)
return action_list
return action_list
