-
Notifications
You must be signed in to change notification settings - Fork 1
/
task_identity.py
147 lines (112 loc) · 5.39 KB
/
task_identity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
Tasks to perform on identities. The available tasks are:

    ReportAssignments      - list all assignments on a subscription to a file
    ClearUserAssignments   - clear user assignments at the subscription level
    ClearInvalidPrincipals - clear service principals that are assigned but
                             not found in AD

The configuration needs an "identity" section like the following (example):

    "identity" : {
        "taskOutputDirectory" : "./identity_summary",
        "availableTasks" : {
            "ReportAssignments" : "Report all assignments to a file",
            "ClearUserAssignments" : "Remove all sub level user assignments",
            "ClearInvalidPrincipals" : "Remove any SP that does not have an AAD entry"
        },
        "active_tasks" : {
            "ReportAssignments" : null,
            "ClearUserAssignments" : {
                "deleteAssignments" : false
            },
            "ClearInvalidPrincipals" : null
        }
    },
"""
import os
import json
from microsoft.submaintenance import AzIdentities
from microsoft.submaintenance.utils import(
Configuration,
AzLoginUtils,
PathUtils
)
# Paths are relative to the current working directory of the process.
CREDENTIALS_FILE = "./credentials.json"
CONFIGURATION_FILE = "./configuration.json"

# Ensure a login and switch to SP if requested
try:
    AzLoginUtils.validate_login(CREDENTIALS_FILE)
except Exception as ex:
    # Top-level boundary: report the reason and stop before any task runs.
    print(str(ex))
    # quit() is a convenience injected by the site module for interactive
    # use and is not guaranteed to exist (e.g. under `python -S`); it also
    # exits with status 0. Exit with a non-zero status instead so callers
    # can detect the login failure.
    raise SystemExit(1)
# Load configuration and create instance of identities
configuration = Configuration(CONFIGURATION_FILE)
az_identities = AzIdentities()

# Supported task names, lower-cased so configured task names can be matched
# case-insensitively. Keep the order stable: entries may be referred to by
# position elsewhere in this script.
allowed_tasks = [
    "reportassignments",
    "clearuserassignments",
    "clearinvalidprincipals"
]

# Validate the minimum on the configuration.
# getattr with a default covers a missing attribute, a null value and an
# empty collection in a single check.
if not getattr(configuration, "subscriptions", None):
    raise Exception("Update configuration.json with sub ids")

identity_settings = getattr(configuration, "identity", None)
if not identity_settings:
    raise Exception("Update configuration.json identity section")

# .get() so that a missing key produces the configuration hint below rather
# than a bare KeyError. Assumes the identity section is a dict (as loaded
# from JSON) - TODO confirm against Configuration.
if not identity_settings.get("taskOutputDirectory"):
    raise Exception("Update configuration.json identity.taskOutputDirectory section")

# Create output path for all tasks
task_output_path = PathUtils.ensure_path(identity_settings["taskOutputDirectory"])
def _write_json_report(file_name, payload):
    """Write payload as indented JSON to file_name inside task_output_path."""
    # Serialize before opening the file so that a serialization failure does
    # not leave behind a truncated report.
    content = json.dumps(payload, indent=4)
    with open(os.path.join(task_output_path, file_name), "w") as output_file:
        output_file.write(content)


def _count_roles_by_principal_type(role_summary):
    """Return {roleDefinitionName: {principalType: count}} for a role summary.

    role_summary maps each principal to a list of assignment dicts that carry
    "roleDefinitionName" and "principalType" keys.
    """
    role_defs = {}
    for assignments in role_summary.values():
        for assignment in assignments:
            per_type = role_defs.setdefault(assignment["roleDefinitionName"], {})
            principal_type = assignment["principalType"]
            per_type[principal_type] = per_type.get(principal_type, 0) + 1
    return role_defs


def _report_assignments(task_settings):
    """Write per-subscription role assignment reports; task_settings is unused."""
    print("Performing ReportAssignments task")
    for sub_id in configuration.subscriptions:
        role_summary = az_identities.get_role_summary(sub_id)
        _write_json_report(
            "{}_categories.json".format(sub_id),
            _count_roles_by_principal_type(role_summary)
        )
        _write_json_report("{}_all.json".format(sub_id), role_summary)


def _clear_user_assignments(task_settings):
    """Report user assignments found at subscription scope.

    Raises:
        Exception: if task_settings is empty/null or lacks "deleteAssignments".
    """
    print("Clear user sub level assignments")

    # Validate once, before any subscription is queried. A null settings
    # value in the configuration arrives here as None.
    if not task_settings or "deleteAssignments" not in task_settings:
        raise Exception("Must have deleteAssignments in identity.active_tasks.ClearUserAssignments in configuration")

    # Deletion is forced when the configuration has a truthy "automation"
    # attribute; a missing attribute is treated as "not automated".
    perform_delete = task_settings["deleteAssignments"]
    if getattr(configuration, "automation", False):
        perform_delete = True

    stats = {"Found" : 0, "Removed" : 0, "Details" : {}}
    for sub_id in configuration.subscriptions:
        user_assignments = az_identities.get_users_sub_scope(sub_id)
        stats["Found"] += len(user_assignments)

        # Only subscriptions that actually have assignments get a Details
        # entry, so there is nothing to sort (or look up) for empty ones.
        if not user_assignments:
            continue

        details = stats["Details"].setdefault(sub_id, [])
        for user_assignment in user_assignments:
            details.append("{}-{}".format(
                user_assignment.principalName,
                user_assignment.roleDefinitionName
            ))
            if perform_delete:
                # NOTE(review): the delete call is intentionally still
                # disabled, so "Removed" counts assignments that would be
                # removed rather than ones that were.
                stats["Removed"] += 1
                # user_assignment.delete()
        details.sort()

    _write_json_report("sub_scoped_users.json", stats)


def _clear_invalid_principals(task_settings):
    """Clear invalid principals per subscription and record each result; task_settings is unused."""
    print("Clear invalid service principals")
    stats = {}
    for sub_id in configuration.subscriptions:
        stats[sub_id] = az_identities.clear_invalid_principals(sub_id)
    _write_json_report("invalid_principals.json", stats)


# Lower-cased task name -> handler. Names are spelled out here instead of
# relying on positions within allowed_tasks.
task_handlers = {
    "reportassignments": _report_assignments,
    "clearuserassignments": _clear_user_assignments,
    "clearinvalidprincipals": _clear_invalid_principals,
}

# Iterate tasks
for task_name, task_settings in configuration.identity["active_tasks"].items():
    task_key = task_name.lower()
    task_handler = task_handlers.get(task_key)
    if task_key not in allowed_tasks or task_handler is None:
        print("Unknown task {} skipping...".format(task_name))
        continue
    task_handler(task_settings)