-
Notifications
You must be signed in to change notification settings - Fork 1
/
convert_state_file_to_diagram_generator_input.py
executable file
·204 lines (152 loc) · 7.81 KB
/
convert_state_file_to_diagram_generator_input.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import hashlib
import json
from schema_terraform_state_file import check_state_file_against_schema
from schema_diagram_generator_input import check_input_json_against_schema
def _collect_module_resources(module):
    """Return the resources of *module* and of all modules nested below it.

    Resources of the module itself come first, followed depth-first by the
    resources of each entry in its ``child_modules`` list.  Missing or null
    ``resources`` / ``child_modules`` keys are treated as empty.
    """
    collected = list(module.get('resources') or [])
    for child_module in module.get('child_modules') or []:
        collected.extend(_collect_module_resources(child_module))
    return collected


def _resource_key(resource):
    """Return a key identifying *resource* without mutating it.

    The resource's ``address`` is preferred when present, so that equally
    named resources living in different modules do not overwrite each other.
    Otherwise the key is the resource name, suffixed with ``.<index>`` when
    an index is set.
    """
    address = resource.get('address')
    if address is not None:
        return address
    # 'index' may be absent altogether, hence .get()
    index = resource.get('index')
    if index is None:
        return resource['name']
    return f"{resource['name']}.{index}"


def _build_rule(rule_type, input_rule, security_group_id):
    """Build the output rule object for one ingress/egress entry.

    ``rule_type`` is ``"ingress"`` or ``"egress"``; ``security_group_id`` is
    the id of the security group the rule belongs to.
    """
    # Raw references may look like "<account>/<sg-id>"; keep only the part
    # after the last slash (a value without a slash is kept unchanged).
    source_ids = [raw_id.split('/')[-1] for raw_id in input_rule['security_groups']]
    if input_rule['self'] is True:
        # a self-referencing rule has the owning group as one of its sources
        source_ids.append(security_group_id)
    return {
        "security_group_id": security_group_id,
        "type": rule_type,
        "description": input_rule['description'],
        "from_port": input_rule['from_port'],
        "to_port": input_rule['to_port'],
        "protocol": input_rule['protocol'],
        "source_security_group_ids": source_ids,
        # kept temporarily so the account part of a reference can be recovered
        "source_security_group_ids_raw": input_rule['security_groups'],
        "cidr_blocks": input_rule['cidr_blocks'],
        "ipv6_cidr_blocks": input_rule['ipv6_cidr_blocks'],
        "prefix_list_ids": input_rule['prefix_list_ids'],
    }


def process_json(input_json):
    """Convert a Terraform state document into diagram generator input.

    Args:
        input_json: The state document, either as a JSON string or as an
            already parsed dict.  The dict is not modified.

    Returns:
        A dict with the keys ``prefix_lists``, ``cidr_blocks``,
        ``ipv6_cidr_blocks`` and ``security_groups``.  Prefix lists and
        security groups that are referenced by a rule but are not part of
        the state are included with ``not_defined_in_state`` set to True.

    Raises:
        TypeError: If ``input_json`` is neither a str nor a dict.
        json.JSONDecodeError: If ``input_json`` is a str that is not valid JSON.
        Whatever ``check_state_file_against_schema`` or
        ``check_input_json_against_schema`` raise on invalid data.
    """
    if isinstance(input_json, str):
        data = json.loads(input_json)
    elif isinstance(input_json, dict):
        data = input_json
    else:
        raise TypeError('input_json must be a str or dict')

    check_state_file_against_schema(data)

    # root module resources plus those of every (nested) child module
    resources = _collect_module_resources(data['values']['root_module'])

    # get all prefix lists and security groups
    prefix_lists = {}
    security_groups = {}
    for resource in resources:
        resource_type = resource['type']
        if resource_type == 'aws_ec2_managed_prefix_list':
            values = resource['values']
            prefix_lists[_resource_key(resource)] = {
                "id": values['id'],
                "name": values['name'],
                "account_id": values['owner_id'],
                "entries": values['entry'],
                "not_defined_in_state": False,
            }
        elif resource_type == 'aws_security_group':
            values = resource['values']
            security_groups[_resource_key(resource)] = {
                "id": values['id'],
                "name": values['name'],
                "account_id": values['owner_id'],
                "ingress_rules": [],
                "egress_rules": [],
                "not_defined_in_state": False,
            }

    # Build the rule objects of every security group.  Rules are keyed by a
    # hash of their content so that identical rules are only kept once.
    security_group_rules = {}
    for resource in resources:
        if resource['type'] != 'aws_security_group':
            continue
        values = resource['values']
        for rule_type in ('ingress', 'egress'):
            for input_rule in values[rule_type]:
                rule = _build_rule(rule_type, input_rule, values['id'])
                rule_hash = hashlib.sha256(json.dumps(rule).encode('utf-8')).hexdigest()
                security_group_rules.setdefault(rule_hash, rule)

    output = {
        "prefix_lists": list(prefix_lists.values()),
        "cidr_blocks": [],
        "ipv6_cidr_blocks": [],
        "security_groups": list(security_groups.values()),
    }

    # ids already present in the output; used to detect unknown references
    known_prefix_list_ids = {prefix_list['id'] for prefix_list in output['prefix_lists']}
    known_security_group_ids = {group['id'] for group in output['security_groups']}

    for rule in security_group_rules.values():
        owner_id = rule['security_group_id']

        # attach the rule to the security group it belongs to
        if rule['type'] in ('ingress', 'egress'):
            rules_key = f"{rule['type']}_rules"
            for security_group in output['security_groups']:
                if security_group['id'] == owner_id:
                    security_group[rules_key].append(rule)

        # extract cidr blocks and ipv6 cidr blocks from the current rule
        output['cidr_blocks'].extend(rule['cidr_blocks'])
        output['ipv6_cidr_blocks'].extend(rule['ipv6_cidr_blocks'])

        # add unknown prefix lists to the prefix list list
        for prefix_list_id in rule['prefix_list_ids']:
            if prefix_list_id in known_prefix_list_ids:
                continue
            known_prefix_list_ids.add(prefix_list_id)
            output['prefix_lists'].append({
                "id": prefix_list_id,
                "name": None,
                "account_id": None,
                "entries": None,
                "not_defined_in_state": True,
            })

        # add unknown security groups to the security group list
        for source_id in rule['source_security_group_ids']:
            if source_id in known_security_group_ids:
                continue

            # default: the account of the group owning the rule
            # (None when that group cannot be found)
            account_id = next(
                (
                    group['account_id']
                    for group in output['security_groups']
                    if group['id'] == owner_id
                ),
                None,
            )
            # find the raw reference this id was extracted from; fall back to
            # the id itself so no value leaks in from a previous iteration
            raw_id = next(
                (raw for raw in rule['source_security_group_ids_raw'] if source_id in raw),
                source_id,
            )
            sg_id = source_id
            if '/' in raw_id:
                # "<account>/<sg-id>": the reference names its own account
                sg_id = raw_id.split('/')[-1]
                account_id = raw_id.split('/')[0]

            known_security_group_ids.add(sg_id)
            output['security_groups'].append({
                "id": sg_id,
                "name": sg_id,
                "account_id": account_id,
                "ingress_rules": [],
                "egress_rules": [],
                "not_defined_in_state": True,
            })

    # Remove duplicates from the cidr block lists; dict.fromkeys keeps the
    # first-seen order, so the result is the same on every run.
    output['cidr_blocks'] = list(dict.fromkeys(output['cidr_blocks']))
    output['ipv6_cidr_blocks'] = list(dict.fromkeys(output['ipv6_cidr_blocks']))

    # remove all raw security group ids from all rules
    for security_group in output['security_groups']:
        for rule in security_group['ingress_rules']:
            rule.pop('source_security_group_ids_raw', None)
        for rule in security_group['egress_rules']:
            rule.pop('source_security_group_ids_raw', None)

    check_input_json_against_schema(output)
    return output