Skip to content

Commit

Permalink
Merge pull request #8 from PeyGis/main
Browse files Browse the repository at this point in the history
Add ec2 instances to Port
  • Loading branch information
matarpeles authored Jun 26, 2023
2 parents 2dabe88 + daa19a1 commit 6f13008
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 341 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,4 @@ events/
samconfig.toml
packaged.yaml

internal/
dev/
8 changes: 2 additions & 6 deletions lambda_function/aws/resources/base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@ def __init__(self, resource_config, port_client, lambda_context, default_region)
self.regions = self.selector_aws.get("regions", [default_region])
self.regions_config = self.selector_aws.get("regions_config", {})
self.next_token = self.selector_aws.get("next_token", "")
self.mappings = (
self.resource_config.get("port", {}).get("entity", {}).get("mappings", [])
)
self.mappings = (self.resource_config.get("port", {}).get("entity", {}).get("mappings", []))
self.aws_entities = set()
self.skip_delete = False

def handle(self):
raise NotImplementedError("Subclasses should implement 'handle' function")

def handle_single_resource_item(self, region, resource_id, action_type="upsert"):
raise NotImplementedError(
"Subclasses should implement 'handle_single_resource_item' function"
)
raise NotImplementedError("Subclasses should implement 'handle_single_resource_item' function")

def _cleanup_regions(self, region):
self.regions.remove(region)
Expand Down
100 changes: 21 additions & 79 deletions lambda_function/aws/resources/cloudcontrol_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from aws.resources.base_handler import BaseHandler
import consts
from aws.resources.base_handler import BaseHandler
from port.entities import create_entities_json, handle_entities

logger = logging.getLogger(__name__)
Expand All @@ -15,13 +15,9 @@ class CloudControlHandler(BaseHandler):
def handle(self):
for region in list(self.regions):
aws_cloudcontrol_client = boto3.client("cloudcontrol", region_name=region)
resources_models = self.regions_config.get(region, {}).get(
"resources_models", ["{}"]
)
resources_models = self.regions_config.get(region, {}).get("resources_models", ["{}"])
for resource_model in list(resources_models):
logger.info(
f"List kind: {self.kind}, region: {region}, resource_model: {resource_model}"
)
logger.info(f"List kind: {self.kind}, region: {region}, resource_model: {resource_model}")
self.next_token = "" if self.next_token is None else self.next_token
while self.next_token is not None:
list_resources_params = {
Expand All @@ -32,127 +28,73 @@ def handle(self):
list_resources_params["NextToken"] = self.next_token

try:
response = aws_cloudcontrol_client.list_resources(
**list_resources_params
)
response = aws_cloudcontrol_client.list_resources(**list_resources_params)
except Exception as e:
logger.error(
f"Failed list kind: {self.kind}, region: {region}, resource_model: {resource_model}; {e}"
)
logger.error(f"Failed list kind: {self.kind}, region: {region}, resource_model: {resource_model}; {e}")
self.skip_delete = True
self.next_token = None
break

self._handle_list_response(response, region)

self.next_token = response.get("NextToken")
if (
self.lambda_context.get_remaining_time_in_millis()
< consts.REMAINING_TIME_TO_REINVOKE_THRESHOLD
):
if self.lambda_context.get_remaining_time_in_millis() < consts.REMAINING_TIME_TO_REINVOKE_THRESHOLD:
# Lambda timeout is too close, should return checkpoint for next run
return self._handle_close_to_timeout(
resources_models, resource_model, region
)
return self._handle_close_to_timeout(resources_models, resource_model, region)

self._cleanup_resources_models(resources_models, resource_model, region)

self._cleanup_regions(region)

return {
"aws_entities": self.aws_entities,
"next_resource_config": None,
"skip_delete": self.skip_delete,
}
return {"aws_entities": self.aws_entities, "next_resource_config": None, "skip_delete": self.skip_delete}

def _handle_list_response(self, list_response, region):
resource_descriptions = list_response.get("ResourceDescriptions", [])
with ThreadPoolExecutor(max_workers=consts.MAX_UPSERT_WORKERS) as executor:
futures = [
executor.submit(
self.handle_single_resource_item,
region,
resource_desc.get("Identifier", ""),
)
for resource_desc in resource_descriptions
]
futures = [executor.submit(self.handle_single_resource_item, region, resource_desc.get("Identifier", "")) for resource_desc in resource_descriptions]
for completed_future in as_completed(futures):
result = completed_future.result()
self.aws_entities.update(result.get("aws_entities", set()))
self.skip_delete = (
result.get("skip_delete", False)
if not self.skip_delete
else self.skip_delete
)
self.skip_delete = result.get("skip_delete", False) if not self.skip_delete else self.skip_delete

def handle_single_resource_item(self, region, resource_id, action_type="upsert"):
entities = []
skip_delete = False
try:
resource_obj = {}
if action_type == "upsert":
logger.info(
f"Get resource for kind: {self.kind}, resource id: {resource_id}"
)
aws_cloudcontrol_client = boto3.client(
"cloudcontrol", region_name=region
)
resource_obj = json.loads(
aws_cloudcontrol_client.get_resource(
TypeName=self.kind, Identifier=resource_id
)
.get("ResourceDescription")
.get("Properties")
)
logger.info(f"Get resource for kind: {self.kind}, resource id: {resource_id}")
aws_cloudcontrol_client = boto3.client("cloudcontrol", region_name=region)
resource_obj = json.loads(aws_cloudcontrol_client.get_resource(TypeName=self.kind, Identifier=resource_id).get("ResourceDescription").get("Properties"))
elif action_type == "delete":
resource_obj = {
"identifier": resource_id
} # Entity identifier to delete
entities = create_entities_json(
resource_obj, self.selector_query, self.mappings, action_type
)
resource_obj = {"identifier": resource_id} # Entity identifier to delete
entities = create_entities_json(resource_obj, self.selector_query, self.mappings, action_type)
except Exception as e:
logger.error(
f"Failed to extract or transform resource id: {resource_id}, kind: {self.kind}, error: {e}"
)
logger.error(f"Failed to extract or transform resource id: {resource_id}, kind: {self.kind}, error: {e}")
skip_delete = True

aws_entities = handle_entities(entities, self.port_client, action_type)

return {"aws_entities": aws_entities, "skip_delete": skip_delete}

def _handle_close_to_timeout(
self, resources_models, current_resource_model, region
):
def _handle_close_to_timeout(self, resources_models, current_resource_model, region):
if self.next_token:
self.selector_aws["next_token"] = self.next_token
else:
self.selector_aws.pop("next_token", None)
resources_models = self._cleanup_resources_models(
resources_models, current_resource_model, region
)
resources_models = self._cleanup_resources_models(resources_models, current_resource_model, region)
if not resources_models:
self._cleanup_regions(region)
if not self.regions: # Nothing left to sync
return {
"aws_entities": self.aws_entities,
"next_resource_config": None,
"skip_delete": self.skip_delete,
}
return {"aws_entities": self.aws_entities, "next_resource_config": None, "skip_delete": self.skip_delete}

if "selector" not in self.resource_config:
self.resource_config["selector"] = {}
self.resource_config["selector"]["aws"] = self.selector_aws

return {
"aws_entities": self.aws_entities,
"next_resource_config": self.resource_config,
"skip_delete": self.skip_delete,
}
return {"aws_entities": self.aws_entities, "next_resource_config": self.resource_config, "skip_delete": self.skip_delete}

def _cleanup_resources_models(
self, resources_models, current_resource_model, region
):
def _cleanup_resources_models(self, resources_models, current_resource_model, region):
resources_models.remove(current_resource_model)
if region not in self.regions_config:
self.regions_config[region] = {}
Expand Down
76 changes: 15 additions & 61 deletions lambda_function/aws/resources/cloudformation_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,42 @@
class CloudFormationHandler(BaseHandler):
def handle(self):
for region in list(self.regions):
aws_cloudformation_client = boto3.client(
"cloudformation", region_name=region
)
aws_cloudformation_client = boto3.client("cloudformation", region_name=region)
logger.info(f"List CloudFormation Stack, region: {region}")
self.next_token = "" if self.next_token is None else self.next_token
while self.next_token is not None:
list_stacks_params = self.selector_aws.get("list_parameters", {})
if self.next_token:
list_stacks_params["NextToken"] = self.next_token
try:
response = aws_cloudformation_client.list_stacks(
**list_stacks_params
)
response = aws_cloudformation_client.list_stacks(**list_stacks_params)
except Exception as e:
logger.error(
f"Failed list CloudFormation Stack, region: {region},"
f" Parameters: {list_stacks_params}; {e}"
)
f" Parameters: {list_stacks_params}; {e}")
self.skip_delete = True
self.next_token = None
break

self._handle_list_response(response, region)

self.next_token = response.get("NextToken")
if (
self.lambda_context.get_remaining_time_in_millis()
< consts.REMAINING_TIME_TO_REINVOKE_THRESHOLD
):
if self.lambda_context.get_remaining_time_in_millis()< consts.REMAINING_TIME_TO_REINVOKE_THRESHOLD:
# Lambda timeout is too close, should return checkpoint for next run
return self._handle_close_to_timeout(region)

self._cleanup_regions(region)

return {
"aws_entities": self.aws_entities,
"next_resource_config": None,
"skip_delete": self.skip_delete,
}
return {"aws_entities": self.aws_entities, "next_resource_config": None, "skip_delete": self.skip_delete}

def _handle_list_response(self, list_response, region):
stacks = list_response.get("StackSummaries", [])
with ThreadPoolExecutor(max_workers=consts.MAX_UPSERT_WORKERS) as executor:
futures = [
executor.submit(
self.handle_single_resource_item, region, stack.get("StackId")
)
for stack in stacks
if stack["StackStatus"] != "DELETE_COMPLETE"
]
futures = [executor.submit(self.handle_single_resource_item, region, stack.get("StackId")) for stack in stacks if stack["StackStatus"] != "DELETE_COMPLETE"]
for completed_future in as_completed(futures):
result = completed_future.result()
self.aws_entities.update(result.get("aws_entities", set()))
self.skip_delete = (
result.get("skip_delete", False)
if not self.skip_delete
else self.skip_delete
)
self.skip_delete = result.get("skip_delete", False) if not self.skip_delete else self.skip_delete

def handle_single_resource_item(self, region, stack_id, action_type="upsert"):
entities = []
Expand All @@ -82,22 +60,10 @@ def handle_single_resource_item(self, region, stack_id, action_type="upsert"):
if action_type == "upsert":
logger.info(f"Get CloudFormation Stack, id: {stack_id}")

aws_cloudformation_client = boto3.client(
"cloudformation", region_name=region
)
stack_obj = aws_cloudformation_client.describe_stacks(
StackName=stack_id
).get("Stacks")[0]
stack_obj[
"StackResources"
] = aws_cloudformation_client.describe_stack_resources(
StackName=stack_id
).get(
"StackResources"
)
template = aws_cloudformation_client.get_template(
StackName=stack_id
).get("TemplateBody")
aws_cloudformation_client = boto3.client("cloudformation", region_name=region)
stack_obj = aws_cloudformation_client.describe_stacks(StackName=stack_id).get("Stacks")[0]
stack_obj["StackResources"] = aws_cloudformation_client.describe_stack_resources(StackName=stack_id).get("StackResources")
template = aws_cloudformation_client.get_template(StackName=stack_id).get("TemplateBody")

# Some templates return as nested OrderedDict, so we need to convert them
# to regular dicts using the json library and then to yaml strings for a clear yaml
Expand All @@ -112,14 +78,10 @@ def handle_single_resource_item(self, region, stack_id, action_type="upsert"):
elif action_type == "delete":
stack_obj = {"identifier": stack_id} # Entity identifier to delete

entities = create_entities_json(
stack_obj, self.selector_query, self.mappings, action_type
)
entities = create_entities_json(stack_obj, self.selector_query, self.mappings, action_type)

except Exception as e:
logger.error(
f"Failed to extract or transform CloudFormation Stack with id: {stack_id}, error: {e}"
)
logger.error(f"Failed to extract or transform CloudFormation Stack with id: {stack_id}, error: {e}")
skip_delete = True

aws_entities = handle_entities(entities, self.port_client, action_type)
Expand All @@ -133,18 +95,10 @@ def _handle_close_to_timeout(self, region):
self.selector_aws.pop("next_token", None)
self._cleanup_regions(region)
if not self.regions: # Nothing left to sync
return {
"aws_entities": self.aws_entities,
"next_resource_config": None,
"skip_delete": self.skip_delete,
}
return {"aws_entities": self.aws_entities, "next_resource_config": None, "skip_delete": self.skip_delete}

if "selector" not in self.resource_config:
self.resource_config["selector"] = {}
self.resource_config["selector"]["aws"] = self.selector_aws

return {
"aws_entities": self.aws_entities,
"next_resource_config": self.resource_config,
"skip_delete": self.skip_delete,
}
return {"aws_entities": self.aws_entities, "next_resource_config": self.resource_config, "skip_delete": self.skip_delete}
Loading

0 comments on commit 6f13008

Please sign in to comment.