-
Notifications
You must be signed in to change notification settings - Fork 2
/
aws_auth.py
138 lines (121 loc) · 4.91 KB
/
aws_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import kopf
import yaml
import queue
import time
import threading
from kubernetes.client.rest import ApiException
from lib import (
AuthMappingList,
get_config_map,
get_protected_mapping,
update_config_map,
write_config_map,
write_protected_mapping,
write_last_handled_mapping,
get_last_handled_mapping,
get_result_message,
Worker,
Event,
EventType
)
from lib.constants import *
check_not_protected = lambda body, **_: body["metadata"]["name"] not in SYSTEM_MAPPINGS
cm_is_aws_auth = lambda body, **_: body["metadata"]["name"] == "aws-auth"
last_handled_filter = (
lambda body, **_: body["metadata"]["name"] == "aws-auth-last-handled"
)
# kopf.config.WatchersConfig.watcher_retry_delay = 1
@kopf.on.startup()
def startup(logger, settings: kopf.OperatorSettings, memo: kopf.Memo, **kwargs):
# set api watching delay to 1s
settings.watching.reconnect_backoff = 1
if os.getenv(USE_PROTECTED_MAPPING) == "true":
kopf.login_via_client(logger=logger, **kwargs)
pm = get_protected_mapping()
if pm is None:
# get current configmap and save values in protected mapping
auth_config_map = get_config_map()
role_mappings = AuthMappingList(data=auth_config_map.data)
logger.info(role_mappings)
write_protected_mapping(logger, role_mappings.get_values())
logger.info("Startup: {0}".format(pm))
memo.event_queue = queue.Queue()
memo.event_thread = Worker(memo.event_queue, logger)
memo.event_thread.start()
memo.event_queue.put("Starting Operator ...")
@kopf.on.cleanup()
def stop_background_worker(memo: kopf.Memo, **_):
memo.event_queue.put("Finishing background task ...")
memo.event_thread.shutdown_flag.set()
memo.event_thread.join()
@kopf.on.create(CRD_GROUP, CRD_VERSION, CRD_NAME, when=check_not_protected)
def create_fn(logger, spec, name, meta, memo: kopf.Memo, **kwargs):
if not spec or "mappings" not in spec:
return get_result_message(f"invalid schema {spec}")
mappings_new = AuthMappingList(spec["mappings"])
if overwrites_protected_mapping(logger, mappings_new):
return get_result_message("overwriting protected mapping not possible")
memo.event_queue.put(
Event(event_type=EventType.CREATE, object_name=name, mappings=mappings_new)
)
return get_result_message("Processing")
@kopf.on.update(CRD_GROUP, CRD_VERSION, CRD_NAME, when=check_not_protected)
def update_fn(logger, spec, old, new, diff, name, memo: kopf.Memo, **kwargs):
if not new or "spec" not in new:
return get_result_message(f"invalid schema {new}")
if "mappings" not in new["spec"]:
new_role_mappings = AuthMappingList()
else:
new_role_mappings = AuthMappingList(new["spec"]["mappings"])
if not old or "spec" not in old or "mappings" not in old["spec"]:
old_role_mappings = AuthMappingList()
else:
old_role_mappings = AuthMappingList(old["spec"]["mappings"])
if overwrites_protected_mapping(logger, new_role_mappings):
raise kopf.PermanentError("Overwriting protected mapping not possible!")
memo.event_queue.put(
Event(
event_type=EventType.UPDATE,
object_name=name,
mappings=new_role_mappings,
old_mappings=old_role_mappings,
)
)
return get_result_message("Processing")
@kopf.on.delete(CRD_GROUP, CRD_VERSION, CRD_NAME, when=check_not_protected)
def delete_fn(logger, spec, meta, name, memo: kopf.Memo, **kwarg):
if not spec or "mappings" not in spec:
return get_result_message(f"invalid schema {spec}")
mappings_delete = AuthMappingList(spec["mappings"])
if overwrites_protected_mapping(logger, mappings_delete):
raise kopf.PermanentError("Overwriting protected mapping not possible!")
memo.event_queue.put(
Event(event_type=EventType.DELETE, object_name=name, mappings=mappings_delete)
)
return get_result_message("Processing")
@kopf.on.event(
"",
"v1",
"configmaps",
when=cm_is_aws_auth,
)
def log_config_map_change(logger, body, **kwargs):
lm = get_last_handled_mapping()
if lm is not None:
old_mappings = AuthMappingList(lm["spec"]["mappings"])
new_mappings = AuthMappingList(data=body["data"])
change = list(old_mappings.diff(new_mappings))
logger.info(f"Change to aws-auth configmap: {change}")
else:
logger.error(f"last mapping not found: {body}")
def overwrites_protected_mapping(logger, check_mapping: AuthMappingList) -> bool:
if os.getenv(USE_PROTECTED_MAPPING) == "true":
pm = get_protected_mapping()
logger.info(f"Protected mapping: {pm}")
if pm is not None:
protected_mapping = AuthMappingList(pm["spec"]["mappings"])
if check_mapping in protected_mapping:
logger.error("Overiding protected Entries not allowed!")
return True
return False