This repository has been archived by the owner on Oct 28, 2022. It is now read-only.
forked from TierMobility/aws-auth-operator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws_auth.py
154 lines (138 loc) · 6.28 KB
/
aws_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import os
import kopf
import yaml
from kubernetes.client.rest import ApiException
from lib import (
AuthMappingList,
get_config_map,
get_protected_mapping,
update_config_map,
write_config_map,
write_protected_mapping,
write_last_handled_mapping,
get_last_handled_mapping,
get_result_message,
)
from lib.constants import *
def check_not_protected(body, **_):
    """Return True when the resource's name is not listed in SYSTEM_MAPPINGS.

    Passed as the ``when=`` filter to the create/update/delete handlers in
    this module. Extra keyword arguments are accepted and ignored.
    """
    return body["metadata"]["name"] not in SYSTEM_MAPPINGS


def cm_is_aws_auth(body, **_):
    """Return True when the object's metadata name is exactly ``aws-auth``.

    Passed as the ``when=`` filter to the ConfigMap event handler in this
    module. Extra keyword arguments are accepted and ignored.
    """
    return body["metadata"]["name"] == "aws-auth"
# kopf.config.WatchersConfig.watcher_retry_delay = 1
@kopf.on.startup()
def startup(logger, settings: kopf.OperatorSettings, **kwargs):
    """Configure the operator at startup and seed the protected mapping.

    Sets ``settings.watching.reconnect_backoff`` to 1. When the environment
    variable named by ``USE_PROTECTED_MAPPING`` equals ``"true"``, logs in via
    the Kubernetes client and, if ``get_protected_mapping()`` returns None,
    stores the values of the ConfigMap returned by ``get_config_map()`` as the
    protected mapping.
    """
    # Set the API watching delay to 1s.
    settings.watching.reconnect_backoff = 1

    if os.getenv(USE_PROTECTED_MAPPING) != "true":
        return

    kopf.login_via_client(logger=logger, **kwargs)
    pm = get_protected_mapping()
    if pm is None:
        # No protected mapping yet: snapshot the current ConfigMap into it.
        live_config_map = get_config_map()
        snapshot = AuthMappingList(data=live_config_map.data)
        logger.info(snapshot)
        write_protected_mapping(logger, snapshot.get_values())
    # NOTE(review): the original indentation was lost; this log is kept inside
    # the protected-mapping branch because `pm` is only bound here. It logs the
    # value read *before* any write above, so it may print None on first start.
    logger.info("Startup: {0}".format(pm))
@kopf.on.create(CRD_GROUP, CRD_VERSION, CRD_NAME, when=check_not_protected)
def create_fn(logger, spec, meta, **kwargs):
    """Merge the mappings of a newly created resource into the ConfigMap.

    Returns a result message (via ``get_result_message``) describing the
    outcome. An empty spec, or one without a ``"mappings"`` key, and a request
    that overlaps the protected mapping are both reported through the result
    message rather than raised.

    Raises:
        kopf.PermanentError: if the written ConfigMap does not contain the
            requested mappings, or if the Kubernetes API call raised an
            ``ApiException``.
    """
    logger.info(f"And here we are! Creating: {spec}")

    if not (spec and "mappings" in spec):
        return get_result_message(f"invalid schema {spec}")

    requested = AuthMappingList(spec["mappings"])
    if overwrites_protected_mapping(logger, requested):
        return get_result_message("overwriting protected mapping not possible")

    try:
        config_map = get_config_map()
        merged = AuthMappingList(data=config_map.data)
        # Record the pre-change state before anything is modified.
        write_last_handled_mapping(logger, merged.get_values())
        # Add the new roles on top of what is already there.
        merged.merge_mappings(requested)
        config_map = update_config_map(config_map, merged.get_data())
        written = write_config_map(config_map)
        # Verify against what the API returned, not what we intended to send.
        if requested not in AuthMappingList(data=written.data):
            raise kopf.PermanentError("Add Roles failed")
    except ApiException as e:
        raise kopf.PermanentError(f"Exception: {e}")

    return get_result_message("All good")
@kopf.on.update(CRD_GROUP, CRD_VERSION, CRD_NAME, when=check_not_protected)
def update_fn(logger, spec, old, new, diff, **kwargs):
    """Replace a resource's previous mappings in the ConfigMap with its new ones.

    The mappings from ``old`` are removed first and the mappings from ``new``
    are merged afterwards. A missing ``"mappings"`` key on either side is
    treated as an empty ``AuthMappingList``.

    Returns a result message (via ``get_result_message``). An invalid ``new``
    object and an overlap with the protected mapping are reported through the
    result message rather than raised.

    Raises:
        kopf.PermanentError: if the new mappings are non-empty and missing
            from the written ConfigMap, or if the Kubernetes API call raised
            an ``ApiException``.
    """
    if not new or "spec" not in new:
        return get_result_message(f"invalid schema {new}")

    new_role_mappings = (
        AuthMappingList(new["spec"]["mappings"])
        if "mappings" in new["spec"]
        else AuthMappingList()
    )
    old_role_mappings = (
        AuthMappingList(old["spec"]["mappings"])
        if old and "spec" in old and "mappings" in old["spec"]
        else AuthMappingList()
    )

    if overwrites_protected_mapping(logger, new_role_mappings):
        return get_result_message("overwriting protected mapping not possible")

    try:
        config_map = get_config_map()
        working = AuthMappingList(data=config_map.data)
        # Record the pre-change state before anything is modified.
        write_last_handled_mapping(logger, working.get_values())
        # Order matters: drop the old entries first, then add the new ones.
        working.remove_mappings(old_role_mappings)
        working.merge_mappings(new_role_mappings)
        config_map = update_config_map(config_map, working.get_data())
        written = write_config_map(config_map)
        written_mappings = AuthMappingList(data=written.data)
        # An empty new mapping list has nothing to verify.
        if len(new_role_mappings) > 0 and new_role_mappings not in written_mappings:
            raise kopf.PermanentError("Update Roles failed")
    except ApiException as e:
        raise kopf.PermanentError(f"Exception: {e}")

    return get_result_message("All good")
@kopf.on.delete(CRD_GROUP, CRD_VERSION, CRD_NAME, when=check_not_protected)
def delete_fn(logger, spec, meta, **kwarg):
    """Remove a deleted resource's mappings from the ConfigMap.

    Returns a result message (via ``get_result_message``). An empty spec, or
    one without a ``"mappings"`` key, is reported through the result message.

    Raises:
        kopf.PermanentError: if the mappings overlap the protected mapping,
            if the mappings are still present in the written ConfigMap, or if
            the Kubernetes API call raised an ``ApiException``.
    """
    logger.info(f"DELETING: {spec}")
    if not spec or "mappings" not in spec:
        return get_result_message(f"invalid schema {spec}")

    mappings_delete = AuthMappingList(spec["mappings"])
    if overwrites_protected_mapping(logger, mappings_delete):
        # The error used to be constructed but never raised, so the handler
        # fell through and removed protected entries from the ConfigMap.
        raise kopf.PermanentError("Overwriting protected mapping not possible!")

    try:
        auth_config_map = get_config_map()
        current_config_mapping = AuthMappingList(data=auth_config_map.data)
        # Save the current config before changing it.
        write_last_handled_mapping(logger, current_config_mapping.get_values())
        # Remove the roles that belonged to the deleted resource.
        current_config_mapping.remove_mappings(mappings_delete)
        auth_config_map = update_config_map(
            auth_config_map, current_config_mapping.get_data()
        )
        response = write_config_map(auth_config_map)
        response_data = AuthMappingList(data=response.data)
        # Verify against the API response that the entries are really gone.
        if mappings_delete in response_data:
            raise kopf.PermanentError("Delete Roles failed")
    except ApiException as e:
        raise kopf.PermanentError(f"Exception: {e}")
    return get_result_message("All good")
@kopf.on.event("", "v1", "configmaps", when=cm_is_aws_auth)
def log_config_map_change(logger, body, **kwargs):
    """Log how the ConfigMap in the event differs from the last handled mapping.

    Compares the mappings stored by ``get_last_handled_mapping()`` with the
    event body's ``"data"`` and logs the result of ``diff``. If no last
    handled mapping is available, logs an error with the event body instead.
    """
    last_handled = get_last_handled_mapping()
    if last_handled is None:
        logger.error(f"last mapping not found: {body}")
        return

    previous = AuthMappingList(last_handled["spec"]["mappings"])
    current = AuthMappingList(data=body["data"])
    # Materialize the diff so the log shows its items.
    change = list(previous.diff(current))
    logger.info(f"Change to aws-auth configmap: {change}")
def overwrites_protected_mapping(logger, check_mapping: AuthMappingList) -> bool:
    """Report whether ``check_mapping`` is contained in the protected mapping.

    Always returns False unless the environment variable named by
    ``USE_PROTECTED_MAPPING`` equals ``"true"`` and a protected mapping exists.
    Logs the protected mapping when the feature is enabled, and logs an error
    when an overlap is found.
    """
    if os.getenv(USE_PROTECTED_MAPPING) != "true":
        return False

    pm = get_protected_mapping()
    logger.info(f"Protected mapping: {pm}")
    if pm is None:
        return False

    protected = AuthMappingList(pm["spec"]["mappings"])
    if check_mapping not in protected:
        return False

    logger.error("Overiding protected Entries not allowed!")
    return True