"""
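Tasks to perform on resource groups. The available tasks are:
EnforceCompliance (reports non-compliant resource groups, and deletes them when the delete flag is true)
DeleteGroups (deletes the specified groups from the specified subscription)
Example groupCompliance section of configuration.json: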
"groupCompliance" : {
"taskOutputDirectory" : "./logs/group_compliance",
"availableTasks" : {
"EnforceCompliance" : "Remove groups not meeting a specific rule set",
"DeleteGroups" : "Non automatable, fill in sub ID and groups to remove"
},
"active_tasks" : {
"EnforceCompliance" : {
"required_tags": ["alias"],
"ignored" : ["cleanupservice", "defaultresourcegroup","networkwatcherrg", "visualstudioonline-", "cloud-shell-storage-"],
"delete_on_missing" : false
},
"DeleteGroups" : {
"subscription" : "YOUR_SUB_ID",
"groups" : [
"devops-test-15449",
"devops-test-24407",
"devops-test-29027",
"devops-test-4715"
]
}
}
},
"""
import os
import json
from microsoft.submaintenance import AzGroupCompliance
from microsoft.submaintenance.utils import(
Configuration,
AzLoginUtils,
PathUtils,
AzResourceGroupUtils
)
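# Both paths are relative to the current working directory, so the script
# must be started from the directory that holds these files.
# NOTE(review): credentials.json presumably holds service principal
# credentials -- keep it out of version control and readable only by the
# account that runs this task.
CREDENTIALS_FILE = "./credentials.json"
CONFIGURATION_FILE = "./configuration.json"

# Ensure a login and switch to SP if requested
try:
    AzLoginUtils.validate_login(CREDENTIALS_FILE)
except Exception as ex:
    print(str(ex))
    # Exit with a non-zero status so a scheduler or pipeline sees the login
    # failure; quit() would have reported success (exit code 0).
    raise SystemExit(1)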
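# Load the configuration that drives the resource group compliance tasks.
configuration = Configuration(CONFIGURATION_FILE)

# Task names are compared lower-cased. The order matters: the task loop
# refers to these entries by index (0 = EnforceCompliance, 1 = DeleteGroups).
allowed_tasks = [
    "enforcecompliance",
    "deletegroups"]

# Validate the minimum on the configuration. Missing or empty values raise
# the descriptive error instead of a bare AttributeError/KeyError/TypeError.
if not getattr(configuration, "subscriptions", None):
    raise Exception("Update configuration.json with sub ids")
if not hasattr(configuration, "groupCompliance"):
    raise Exception("Update configuration.json groupCompliance section")
if not configuration.groupCompliance.get("taskOutputDirectory"):
    raise Exception("Update configuration.json groupCompliance.taskOutputDirectory section")
if "active_tasks" not in configuration.groupCompliance:
    raise Exception("Update configuration.json groupCompliance.active_tasks section")

# Create output path for all tasks
# (presumably ensure_path creates the directory when it is missing -- confirm
# against PathUtils; the returned path is where the per-subscription reports go)
task_output_path = PathUtils.ensure_path(configuration.groupCompliance["taskOutputDirectory"])

# delete_groups_flag_set: the deletion decision has been made (made only once).
# delete_groups_flag: untagged groups will actually be deleted, not just reported.
delete_groups_flag_set = False
delete_groups_flag = False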
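# Iterate tasks
group_compliance = AzGroupCompliance()
active_tasks = configuration.groupCompliance["active_tasks"]

for task_name, task_settings in active_tasks.items():
    task_key = task_name.lower()

    if task_key not in allowed_tasks:
        print("Unknown task {} skipping...".format(task_name))
        # Actually skip: nothing below applies to an unknown task.
        continue

    if task_key == allowed_tasks[0]:
        print("Performing GroupCompliance task")

        for required_key in ("required_tags", "ignored", "delete_on_missing"):
            if required_key not in task_settings:
                raise Exception(
                    "Must have {} in groupCompliance.active_tasks.EnforceCompliance in configuration".format(required_key)
                )

        # Decide once whether untagged groups are deleted or only reported.
        if not delete_groups_flag_set:
            delete_groups_flag_set = True
            # Deletion is destructive, so delete_on_missing must be true in
            # every mode. Automation only removes the interactive prompt; it
            # must not turn a report-only configuration into a deleting one.
            delete_groups_flag = bool(task_settings["delete_on_missing"])
            if delete_groups_flag:
                if getattr(configuration, "automation", False):
                    print("Automation will bypass asking for permission....")
                else:
                    resp = input("Delete untagged groups (Y/y)? > ")
                    # Anything other than an explicit Y/y keeps the run report-only.
                    delete_groups_flag = resp.lower() == "y"

        for sub_id in configuration.subscriptions:
            filtered_groups = group_compliance.get_filtered_groups(
                sub_id,
                task_settings["ignored"],
                task_settings["required_tags"]
            )

            # Write the report before deleting anything, so a record of the
            # affected groups exists even if a deletion fails part-way.
            file_path = os.path.join(task_output_path, "{}.json".format(sub_id))
            with open(file_path, "w", encoding="utf-8") as output_file:
                output_file.write(json.dumps(vars(filtered_groups), indent=4))

            if delete_groups_flag:
                for group in filtered_groups.untagged:
                    AzResourceGroupUtils.delete_group(group, sub_id)

    elif task_key == allowed_tasks[1]:
        print("Performing DeleteGroups task")
        # Echo the settings so the run log shows exactly what was targeted.
        print(json.dumps(task_settings, indent=4))

        for required_key in ("subscription", "groups"):
            if required_key not in task_settings:
                raise Exception(
                    "Must have {} in groupCompliance.active_tasks.DeleteGroups in configuration".format(required_key)
                )

        # NOTE(review): this task deletes every listed group with no
        # confirmation prompt and no automation check; leave DeleteGroups out
        # of active_tasks unless the listed groups are meant to be removed.
        if not task_settings["groups"]:
            print("No groups were identified to be deleted")
        else:
            for group in task_settings["groups"]:
                AzResourceGroupUtils.delete_group(group, task_settings["subscription"])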