Skip to content

Commit

Permalink
Merge branch 'master' into update-checks-1
Browse files Browse the repository at this point in the history
  • Loading branch information
amit-chandak-unskript authored Oct 11, 2023
2 parents 048cd7a + 20afca2 commit 2de5c06
Show file tree
Hide file tree
Showing 25 changed files with 750 additions and 0 deletions.
Binary file added Kafka/legos/kafka_check_lag_change/1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 25 additions & 0 deletions Kafka/legos/kafka_check_lag_change/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[<img align="left" src="https://unskript.com/assets/favicon.png" width="100" height="100" style="padding-right: 5px">]
(https://unskript.com/assets/favicon.png)
<h1>Kafka check lag change</h1>

## Description
This action checks if the lag for consumer groups is not changing for a threshold number of hours.


## Lego Details
kafka_check_lag_change(handle, group_id: str= "", threshold: int=1)
handle: Object of type unSkript KAFKA Connector.
group_id: Consumer group ID.
threshold: The number of hours to check if the lag hasn't changed.


## Lego Input
This Lego takes inputs handle, group_id, threshold.

## Lego Output
Here is a sample output.
<img src="./1.png">

## See it in Action

You can see this Lego in action following this link [unSkript Live](https://us.app.unskript.io)
Empty file.
15 changes: 15 additions & 0 deletions Kafka/legos/kafka_check_lag_change/kafka_check_lag_change.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"action_title": "Kafka check lag change",
"action_description": "This action checks if the lag for consumer groups is not changing for a threshold number of hours.\n",
"action_type": "LEGO_TYPE_KAFKA",
"action_entry_function": "kafka_check_lag_change",
"action_needs_credential": true,
"action_output_type": "ACTION_OUTPUT_TYPE_LIST",
"action_is_check": true,
"action_next_hop": [
""
],
"action_next_hop_parameter_mapping": {},
"action_supports_iteration": true,
"action_supports_poll": true
}
104 changes: 104 additions & 0 deletions Kafka/legos/kafka_check_lag_change/kafka_check_lag_change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
##
# Copyright (c) 2023 unSkript, Inc
# All rights reserved.
##
from typing import Tuple, Optional
from kafka.admin import KafkaAdminClient
from kafka import KafkaConsumer, TopicPartition
from pydantic import BaseModel, Field
from tabulate import tabulate
import time



class InputSchema(BaseModel):
group_id: Optional[str] = Field(
'',
description='Consumer group ID to which this consumer belongs',
title='Consumer group ID',
)
threshold: Optional[int] = Field(
3,
description="The number of hours to check if the lag hasn't changed.",
title='Threshold (in hours)',
)

# This would be a global or persisted store of previous lags at the last check.
# Format: { "topic-partition": [timestamp, lag] }
prev_lags = {}


def kafka_check_lag_change_printer(output):
status, issues = output

if status:
print("All consumer groups are maintaining their lags!")
else:
print("Lag issues detected:")
headers = ['Consumer Group', 'Topic', 'Partition', 'Description']
table_data = [(issue['consumer_group'], issue['topic'], issue['partition'], issue['description']) for issue in issues]
print(tabulate(table_data, headers=headers, tablefmt='grid'))


def kafka_check_lag_change(handle, group_id: str= "", threshold: int=1) -> Tuple:
"""
kafka_check_lag_change checks if the lag for consumer groups is not changing for X hours.
:param handle: Object of type unSkript KAFKA Connector.
:param group_id: Consumer group ID.
:param threshold: The number of hours to check if the lag hasn't changed.
:return: Tuple containing a status and an optional list of issues with lag.
"""

issues = []
current_time = time.time()
admin_client = KafkaAdminClient(bootstrap_servers=handle.config['bootstrap_servers'])

if group_id:
consumer_groups = [group_id]
else:
consumer_groups = [group[0] for group in admin_client.list_consumer_groups()]

for group in consumer_groups:
consumer = KafkaConsumer(bootstrap_servers=handle.config['bootstrap_servers'], group_id=group)

for topic in consumer.topics():
partitions = consumer.partitions_for_topic(topic)
for partition in partitions:
tp = TopicPartition(topic, partition)
end_offset = consumer.end_offsets([tp])[tp]
committed = consumer.committed(tp) or 0
lag = end_offset - committed

if lag == 0:
continue

key = f"{group}-{topic}-{partition}"
if key in prev_lags:
prev_timestamp, prev_lag = prev_lags[key]

# Only update timestamp in prev_lags if there's a change in the lag
if prev_lag != lag:
prev_lags[key] = (current_time, lag)
elif (current_time - prev_timestamp) >= threshold * 3600:
print(f"Issue detected with {key}. Adding to issues list.")
issues.append({
'consumer_group': group,
'topic': topic,
'partition': partition,
'description': f"Lag hasn't changed for {threshold} hours. Current Lag: {lag}"
})
else:
prev_lags[key] = (current_time, lag)

consumer.close()

if issues:
return (False, issues)
return (True, None)



Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 25 additions & 0 deletions Kubernetes/legos/k8s_check_service_pvc_utilization/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[<img align="left" src="https://unskript.com/assets/favicon.png" width="100" height="100" style="padding-right: 5px">]
(https://unskript.com/assets/favicon.png)
<h1>Check K8s service PVC utilization </h1>

## Description
This check fetches the PVC associated with a given service, determines its utilized size, and then compares it to its total capacity. If the used percentage exceeds the provided threshold, it triggers an alert.

## Lego Details
k8s_check_service_pvc_utilization(handle, service_name: str, namespace: str, threshold: int=80)
handle: Object of type unSkript K8S Connector.
service_name: The name of the service.
threshold: Percentage threshold for utilized PVC disk size. E.g., a 80% threshold checks if the utilized space exceeds 80% of the total PVC capacity.
namespace: The namespace in which the service resides.


## Lego Input
This Lego takes inputs handle, service_name, namespace, threshold.

## Lego Output
Here is a sample output.
<img src="./1.png">

## See it in Action

You can see this Lego in action following this link [unSkript Live](https://us.app.unskript.io)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"action_title": "Check K8s service PVC utilization ",
"action_description": "This check fetches the PVC associated with a given service, determines its utilized size, and then compares it to its total capacity. If the used percentage exceeds the provided threshold, it triggers an alert.",
"action_type": "LEGO_TYPE_K8S",
"action_entry_function": "k8s_check_service_pvc_utilization",
"action_needs_credential": true,
"action_output_type": "ACTION_OUTPUT_TYPE_LIST",
"action_is_check": true,
"action_next_hop": [
""
],
"action_next_hop_parameter_mapping": {},
"action_supports_iteration": true,
"action_supports_poll": true
}
Loading

0 comments on commit 2de5c06

Please sign in to comment.