Skip to content

Commit

Permalink
Added the tag_cluster_resources policy
Browse files Browse the repository at this point in the history
  • Loading branch information
athiruma committed Nov 24, 2023
1 parent fdc5fee commit 09a1259
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class ResourceTagAPIOperations:
"""

PAGINATION_TOKEN = 'PaginationToken'
ARRAY_SIZE = 20

def __init__(self, region_name: str):
self.__client = boto3.client('resourcegroupstaggingapi', region_name=region_name)
Expand Down Expand Up @@ -41,8 +42,31 @@ def tag_resources(self, resource_arn_list: list, update_tags_dict: dict):
:param update_tags_dict: {key: value}
:return:
"""
try:
self.__client.tag_resources(ResourceARNList=resource_arn_list, Tags=update_tags_dict)
except Exception as err:
logger.error(err)
raise err
chunked_array_list = [resource_arn_list[i:i + self.ARRAY_SIZE] for i in range(0, len(resource_arn_list),
self.ARRAY_SIZE)]
for array_list in chunked_array_list:
try:
self.__client.tag_resources(ResourceARNList=array_list, Tags=update_tags_dict)
logger.info(f"Updated the tags of the resources: {array_list} by tags: {update_tags_dict}")
except Exception as err:
logger.error(err)

def tag_resources_by_tag_key_value(self, tags: dict, tag_key: str, tag_value: str = '', dry_run: str = 'yes'):
"""
This method tags the resources based on tag_key and tag_value ( optional ) better to provide for easy filter
:param tags:
:type tags:
:param tag_key:
:type tag_key:
:param tag_value:
:type tag_value:
:return:
:rtype:
"""
resources_arn = []
resources = self.get_resources(tag_name=tag_key, tag_value=tag_value)
for resource in resources:
resources_arn.append(resource.get('ResourceARN'))
if dry_run == 'no':
self.tag_resources(resource_arn_list=resources_arn, update_tags_dict=tags)
return resources_arn
82 changes: 82 additions & 0 deletions cloud_governance/common/clouds/aws/utils/common_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from typing import Callable

from cloud_governance.exceptions.clouds.aws_exceptions import AWSPaginationVariableNotFound


def iterate_next_token(function: Callable, output_identifier: str, marker: str = 'NextToken', **kwargs):
"""
This method iterate over the function using the marker and return the result
:param function:
:param output_identifier:
:param marker:
:return:
"""
if marker in ('NextToken', 'Marker'):
resources_list = []
resources = function(**kwargs)
resources_list.extend(resources[output_identifier])
if 'Marker' in resources.keys():
marker = 'Marker'
if 'NextToken' in resources.keys():
marker = 'NextToken'
while marker in resources.keys():
if marker == 'NextToken':
resources = function(NextToken=resources[marker], **kwargs)
elif marker == 'Marker':
resources = function(Marker=resources[marker], **kwargs)
resources_list.extend(resources[output_identifier])
return resources_list
else:
raise AWSPaginationVariableNotFound("Accepted values are: NextToken, Marker")


def get_tag_name_and_value(tags: list, key: str, check_prefix: bool = False):
"""
This method returns the tag_key and tag_value, the operations would be case-insensitive
:param tags:
:type tags:
:param key:
:type key:
:param check_prefix:
:type check_prefix:
:return:
:rtype:
"""
for tag in tags:
if check_prefix:
if key.lower() in tag.get('Key').lower():
return tag.get('Key'), tag.get('Value')
else:
if key.lower() == tag.get('Key').lower():
return tag.get('Key'), tag.get('Value')
return '', ''


def convert_key_values_to_dict(tags: list):
"""
This method convert aws key values to dict
:param tags:
:type tags:
:return:
:rtype:
"""
tags_dict = {}
if tags:
for tag in tags:
tags_dict.update({tag.get('Key'): tag.get('Value')})
return tags_dict


def convert_dict_to_key_values(tags: dict):
"""
This method convert dict to aws key values
:param tags:
:type tags:
:return:
:rtype:
"""
key_values = []
if tags:
for key, value in tags.items():
key_values.append({'Key': key, 'Value': value})
return key_values
Empty file.
Empty file.
19 changes: 19 additions & 0 deletions cloud_governance/exceptions/clouds/aws_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@


class AWSExceptions(Exception):
"""
Base class for all aws custom exceptions
"""

def __init__(self, message: any = None):
self.message = f'Something went wrong' if not message else message
self.args = (message,)
super().__init__(self.message)


class AWSPaginationVariableNotFound(AWSExceptions):

def __init__(self, message: any = None):
super().__init__(message)


6 changes: 6 additions & 0 deletions cloud_governance/main/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def __init__(self):
'unused_nat_gateway',
'zombie_snapshots', 'skipped_resources',
'monthly_report']
self._environment_variables_dict['AWS_POLICY_RUNNERS'] = ['tag_cluster_resources']
self._environment_variables_dict['TAG_OPTIONAL_TAGS'] = EnvironmentVariables.get_boolean_from_environment('TAG_OPTIONAL_TAGS', False)
self._environment_variables_dict['OPTIONAL_TAGS'] = literal_eval(EnvironmentVariables.get_env('OPTIONAL_TAGS',
"['Owner', 'Manager', 'Project', 'Environment']"))
es_index = 'cloud-governance-policy-es-index'
self._environment_variables_dict['cost_policies'] = ['cost_explorer', 'cost_over_usage', 'cost_billing_reports',
'cost_explorer_payer_billings', 'spot_savings_analysis']
Expand Down Expand Up @@ -246,6 +250,8 @@ def get_aws_account_alias_name(self):
account_alias = iam_client.list_account_aliases()['AccountAliases']
if account_alias:
return account_alias[0].upper()
else:
return os.environ.get('account', '').upper()
except:
return os.environ.get('account', '').upper()

Expand Down
18 changes: 17 additions & 1 deletion cloud_governance/main/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from cloud_governance.common.clouds.aws.s3.s3_operations import S3Operations
from cloud_governance.policy.policy_operations.aws.zombie_cluster.validate_zombies import ValidateZombies
from cloud_governance.policy.policy_operations.aws.zombie_non_cluster.zombie_non_cluster_polices import ZombieNonClusterPolicies

from cloud_governance.policy.policy_runners.aws_policy_runner import AWSPolicyRunner

environment_variables_dict = environment_variables.environment_variables_dict
log_level = environment_variables_dict.get('log_level', 'INFO').upper()
Expand Down Expand Up @@ -201,6 +201,11 @@ def main():
if is_non_cluster_polices_runner:
non_cluster_polices_runner = ZombieNonClusterPolicies()

aws_policy_runners = None
is_aws_policy_runner = policy in environment_variables_dict.get('AWS_POLICY_RUNNERS')
if is_aws_policy_runner:
aws_policy_runners = AWSPolicyRunner()

ibm_classic_infrastructure_policy_runner = None
is_tag_ibm_classic_infrastructure_runner = policy in environment_variables_dict.get('ibm_policies')
if not is_tag_ibm_classic_infrastructure_runner:
Expand Down Expand Up @@ -230,6 +235,15 @@ def main():
if is_gcp_policy_runner:
gcp_cost_policy_runner = GcpPolicyRunner()

@logger_time_stamp
def run_aws_policy_runners():
"""
This method runs the aws policies
:return:
:rtype:
"""
aws_policy_runners.run()

@logger_time_stamp
def run_non_cluster_polices_runner():
"""
Expand Down Expand Up @@ -294,6 +308,8 @@ def run_gcp_policy_runner():
run_azure_policy_runner()
elif is_gcp_policy_runner:
run_gcp_policy_runner()
elif is_aws_policy_runner:
run_aws_policy_runners()
else:
if not policy:
logger.exception(f'Missing Policy name: "{policy}"')
Expand Down
152 changes: 152 additions & 0 deletions cloud_governance/policy/aws/tag_cluster_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from cloud_governance.common.clouds.aws.ec2.ec2_operations import EC2Operations
from cloud_governance.common.clouds.aws.resource_tagging_api.resource_tag_api_operations import ResourceTagAPIOperations
from cloud_governance.common.clouds.aws.utils.common_operations import get_tag_name_and_value, \
convert_key_values_to_dict
from cloud_governance.common.logger.init_logger import logger
from cloud_governance.common.logger.logger_time_stamp import logger_time_stamp
from cloud_governance.policy.policy_operations.aws.tagging.abstract_cluster_tagging_operations import \
AbstractClusterTaggingOperations


class TagClusterResources(AbstractClusterTaggingOperations):
"""
This class perform tagging on aws cluster resources that have an active EC2 instance
"""

def __init__(self, region_name: str = '', cluster_name: str = ''):
super().__init__(region_name=region_name, cluster_name=cluster_name)
self.__ec2_resource_type = 'ec2:instance'
self.__region_name = self._region_name

def __get_ec2_instances_list(self):
"""
This method returns the ec2 instances by region
:return:
:rtype:
"""
ec2_operations = EC2Operations(region=self.__region_name)
return ec2_operations.get_ec2_instance_list()

def __get_grouped_ec2_instances(self, key_name: str, check_prefix: False):
"""
This method returns the ec2 instances grouped by key_name
:param check_prefix:
:type check_prefix:
:param key_name:
:type key_name:
:return:
:rtype:
"""
ec2_resources = self.__get_ec2_instances_list()
ec2_resources_list = {}
ec2_resource_tags = {}
for ec2_resource in ec2_resources:
ec2_tags = ec2_resource.get('Tags', [])
if ec2_tags:
groped_tag, _ = get_tag_name_and_value(tags=ec2_tags, key=key_name, check_prefix=check_prefix)
cluster_id, _ = get_tag_name_and_value(tags=ec2_tags, key='cluster_id')
user, user_value = get_tag_name_and_value(tags=ec2_tags, key='User')
if groped_tag and (user_value == 'NA' or not cluster_id):
ec2_resources_list.setdefault(groped_tag, []).append(
{
'instance_id': ec2_resource.get('InstanceId'),
'launch_time': ec2_resource.get('LaunchTime'),
})
if groped_tag not in ec2_resource_tags:
ec2_resource_tags[groped_tag] = ec2_tags
return {
'resources': ec2_resources_list,
'tags': ec2_resource_tags
}

def __get_ec2_cluster_resources(self):
"""
This method returns the ec2 cluster resources
:return:
:rtype:
"""
return self.__get_grouped_ec2_instances(key_name=self._cluster_prefix, check_prefix=True)

def __tag_cluster_resources(self):
"""
This method tag all cluster resources by using the resource group service provided by aws
:return:
:rtype:
"""
updated_cluster_tags = {}
responses = self.__get_ec2_cluster_resources()
ec2_resources = responses.get('resources', {})
ec2_resources_tags = responses.get('tags', {})
if ec2_resources:
resource_group_tag_api_operations = ResourceTagAPIOperations(region_name=self.__region_name)
for cluster_tag, resources in ec2_resources.items():
add_new_tags = {'cluster_id': cluster_tag.split('/')[-1], 'Budget': self._account}
username = ''
if resources:
for resource in resources:
instance_id = resource.get('instance_id')
launch_time = resource.get('launch_time')
# Three places we can get the username, RunInstances, StartInstances, StopInstances
username = self.get_username(region_name=self.__region_name, start_time=launch_time,
resource_id=instance_id, resource_type='RunInstances',
tags=ec2_resources_tags[cluster_tag])
if not username:
username = self.get_username(region_name=self.__region_name, start_time=launch_time,
resource_id=instance_id, resource_type='StartInstances',
tags=ec2_resources_tags[cluster_tag])
if not username:
username = self.get_username(region_name=self.__region_name, start_time=launch_time,
resource_id=instance_id, resource_type='StopInstances',
tags=ec2_resources_tags[cluster_tag])
if username:
break
if username:
get_user_tags = self._iam_operations.get_user_tags(username=username)
if not get_user_tags:
get_user_tags = [{'Key': 'User', 'Value': username}]
tags_to_add = self._mandatory_tags
if self._tag_optional_tags:
tags_to_add.extend(self._optional_tags)
for mandatory_tag in tags_to_add:
_, tag_value = get_tag_name_and_value(tags=get_user_tags, key=mandatory_tag)
if tag_value:
add_new_tags.update({mandatory_tag: tag_value})
if mandatory_tag == 'Email' and not tag_value:
add_new_tags.update({mandatory_tag: f'{username}@{self._gmail_domain}'})
else:
add_new_tags.update({'Key': 'User', 'Value': 'NA'})
add_new_tags.update(self._fill_na_tags())
default_tags = convert_key_values_to_dict(tags=ec2_resources_tags[cluster_tag])
updated_tags = self._get_tags_to_update(default_tags=default_tags, new_tags=add_new_tags)
if updated_tags:
cluster_resources = resource_group_tag_api_operations.tag_resources_by_tag_key_value(
tags=updated_tags,
tag_key=cluster_tag,
dry_run=self._dry_run)
logger.info(f"Tags to be updated on ClusterId: {cluster_tag} are: {updated_tags} and "
f"the resources: {cluster_resources}")
updated_cluster_tags[cluster_tag] = {
'resources': cluster_resources,
'tags': updated_tags
}
return updated_cluster_tags

@logger_time_stamp
def run(self):
"""
This method starts the operations
:return:
:rtype:
"""
logger_message = "Running the Cluster tagging in the Region"
if self._run_active_regions:
resources_by_region = []
ec2_operations = EC2Operations(region='us-east-1')
regions = ec2_operations.get_active_regions()
for region in regions:
self.__region_name = region
logger.info(f"{logger_message}: {region}")
resources_by_region.append({region: self.__tag_cluster_resources()})
else:
logger.info(f"{logger_message}: {self.__region_name}")
return self.__tag_cluster_resources()
Loading

0 comments on commit 09a1259

Please sign in to comment.