# forked from DataDog/datadog-serverless-functions
# aws/rds_enhanced_monitoring/lambda_function.py (377 lines, 12.5 KB)
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.
import gzip
import json
import os
import re
import time
import base64
import random
from io import BufferedReader, BytesIO
from urllib.error import HTTPError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
import botocore
import boto3
# Datadog site to submit metrics to, e.g. "datadoghq.com" or "datadoghq.eu".
DD_SITE = os.getenv("DD_SITE", default="datadoghq.com")
def _datadog_keys():
    """Resolve Datadog credentials from the environment.

    Sources are tried in priority order:
      1. ``kmsEncryptedKeys``      - KMS-encrypted JSON blob of keys
      2. ``DD_API_KEY_SECRET_ARN`` - AWS Secrets Manager secret
      3. ``DD_API_KEY_SSM_NAME``   - SSM SecureString parameter
      4. ``DD_KMS_API_KEY``        - KMS-encrypted API key
      5. ``DD_API_KEY``            - plaintext environment variable

    Returns:
        dict: at least ``{"api_key": ...}`` (the KMS JSON blob may carry more).

    Raises:
        ValueError: when no key source is configured.
    """
    if "kmsEncryptedKeys" in os.environ:
        kms = boto3.client("kms")
        # kmsEncryptedKeys should be created through the Lambda's encryption
        # helpers and as such will have the EncryptionContext
        return json.loads(
            kms.decrypt(
                CiphertextBlob=base64.b64decode(os.environ["kmsEncryptedKeys"]),
                EncryptionContext={
                    "LambdaFunctionName": os.environ["AWS_LAMBDA_FUNCTION_NAME"]
                },
            )["Plaintext"]
        )

    if "DD_API_KEY_SECRET_ARN" in os.environ:
        api_key = boto3.client("secretsmanager").get_secret_value(
            SecretId=os.environ["DD_API_KEY_SECRET_ARN"]
        )["SecretString"]
        return {"api_key": api_key}

    if "DD_API_KEY_SSM_NAME" in os.environ:
        api_key = boto3.client("ssm").get_parameter(
            Name=os.environ["DD_API_KEY_SSM_NAME"], WithDecryption=True
        )["Parameter"]["Value"]
        return {"api_key": api_key}

    if "DD_KMS_API_KEY" in os.environ:
        encrypted = os.environ["DD_KMS_API_KEY"]
        # For interop with other DD Lambdas taking in DD_KMS_API_KEY, we'll
        # optionally try the EncryptionContext associated with this Lambda.
        try:
            api_key = boto3.client("kms").decrypt(
                CiphertextBlob=base64.b64decode(encrypted),
                EncryptionContext={
                    "LambdaFunctionName": os.environ["AWS_LAMBDA_FUNCTION_NAME"]
                },
            )["Plaintext"]
        except botocore.exceptions.ClientError:
            # Blob was encrypted without this Lambda's context; retry bare.
            api_key = boto3.client("kms").decrypt(
                CiphertextBlob=base64.b64decode(encrypted),
            )["Plaintext"]
        # isinstance() is the idiomatic type check (was: type(...) is bytes).
        if isinstance(api_key, bytes):
            # If the CiphertextBlob was encrypted with AWS CLI, we
            # need to re-encode this in base64
            try:
                api_key = api_key.decode("utf-8")
            except UnicodeDecodeError as e:
                print(
                    "INFO DD_KMS_API_KEY: Could not decode key in utf-8, encoding in b64. Exception:",
                    e,
                )
                api_key = base64.b64encode(api_key).decode("utf-8")
            except Exception as e:
                # Best-effort: keep the undecoded value instead of failing startup.
                print("ERROR DD_KMS_API_KEY Unknown exception decoding key:", e)
        return {"api_key": api_key}

    if "DD_API_KEY" in os.environ:
        return {"api_key": os.environ["DD_API_KEY"]}

    raise ValueError(
        "Datadog API key is not defined, see documentation for environment variable options"
    )
# Preload the keys so we can bail out early if they're misconfigured
# (a bad configuration fails the Lambda init instead of every invocation).
datadog_keys = _datadog_keys()
print("INFO Lambda function initialized, ready to send metrics")
def _gauge_tagged_entries(entries, metric_prefix, tag_keys, ts, base_tags, host_id):
    """Emit one gauge per remaining key of each dict in *entries*.

    For every entry, any key listed in *tag_keys* is popped and turned into
    a "key:value" tag; every other key/value pair becomes a gauge named
    "<metric_prefix>.<key>".
    """
    for entry in entries:
        extra_tags = [
            "%s:%s" % (tag_key, entry.pop(tag_key))
            for tag_key in tag_keys
            if tag_key in entry
        ]
        for key, value in entry.items():
            stats.gauge(
                "%s.%s" % (metric_prefix, key),
                value,
                timestamp=ts,
                tags=base_tags + extra_tags,
                host=host_id,
            )


def _process_rds_enhanced_monitoring_message(ts, message, account, region):
    """Translate one RDS enhanced-monitoring JSON message into Datadog gauges.

    Args:
        ts: event timestamp in seconds.
        message: decoded enhanced-monitoring payload.
        account: AWS account id (tagged on every metric).
        region: AWS region (currently unused; kept for interface stability).
    """
    instance_id = message["instanceID"]
    host_id = message["instanceResourceID"]
    tags = [
        "dbinstanceidentifier:%s" % instance_id,
        "aws_account:%s" % account,
        "engine:%s" % message["engine"],
    ]

    # metrics generation

    # uptime: "54 days, 1:53:04" to be converted into seconds
    uptime = 0
    uptime_msg = re.split(" days?, ", message["uptime"])  # edge case "1 day 1:53:04"
    if len(uptime_msg) == 2:
        uptime += 24 * 3600 * int(uptime_msg[0])
    hours, minutes, seconds = uptime_msg[-1].split(":")
    uptime += 3600 * int(hours) + 60 * int(minutes) + int(seconds)
    stats.gauge("aws.rds.uptime", uptime, timestamp=ts, tags=tags, host=host_id)

    stats.gauge(
        "aws.rds.virtual_cpus",
        message["numVCPUs"],
        timestamp=ts,
        tags=tags,
        host=host_id,
    )

    if "loadAverageMinute" in message:
        for suffix, window in (("1", "one"), ("5", "five"), ("15", "fifteen")):
            stats.gauge(
                "aws.rds.load.%s" % suffix,
                message["loadAverageMinute"][window],
                timestamp=ts,
                tags=tags,
                host=host_id,
            )

    for namespace in ["cpuUtilization", "memory", "tasks", "swap"]:
        for key, value in message.get(namespace, {}).items():
            stats.gauge(
                "aws.rds.%s.%s" % (namespace.lower(), key),
                value,
                timestamp=ts,
                tags=tags,
                host=host_id,
            )

    # Each of these sections is a list of dicts where some keys identify the
    # device/process (hoisted into tags) and the rest are metric values.
    _gauge_tagged_entries(
        message.get("network", []), "aws.rds.network", ["interface"], ts, tags, host_id
    )
    _gauge_tagged_entries(
        message.get("diskIO", []), "aws.rds.diskio", ["device"], ts, tags, host_id
    )
    _gauge_tagged_entries(
        message.get("fileSys", []),
        "aws.rds.filesystem",
        ["name", "mountPoint"],
        ts,
        tags,
        host_id,
    )
    _gauge_tagged_entries(
        message.get("processList", []),
        "aws.rds.process",
        ["name", "id"],
        ts,
        tags,
        host_id,
    )
    _gauge_tagged_entries(
        message.get("physicalDeviceIO", []),
        "aws.rds.physicaldeviceio",
        ["device"],
        ts,
        tags,
        host_id,
    )
    _gauge_tagged_entries(
        message.get("disks", []), "aws.rds.disks", ["name"], ts, tags, host_id
    )

    if "system" in message:
        for key, value in message["system"].items():
            stats.gauge(
                "aws.rds.system.%s" % key,
                value,
                timestamp=ts,
                tags=tags,
                host=host_id,
            )
def extract_json_objects(input_string):
    """Split a string of concatenated JSON objects into individual strings.

    Used when log_event["message"] is not properly formatted, e.g.:
        {"a":2}{"b":{"c":3}}

    Braces inside JSON strings are ignored; a quote toggles string state
    only when it is preceded by an EVEN number of backslashes (an odd run
    means the quote itself is escaped). The original check looked only at
    the single previous character, so a string ending in an escaped
    backslash (e.g. {"p":"x\\\\"}) left the parser stuck "inside" a string
    and silently dropped every following object.

    Returns:
        list[str]: one substring per balanced top-level JSON object.
    """
    in_string = False
    depth = 0
    json_objects = []
    start = 0
    for idx, char in enumerate(input_string):
        if char == '"':
            # Count the run of backslashes immediately before the quote;
            # the quote is escaped only when that run has odd length.
            backslashes = 0
            j = idx - 1
            while j >= 0 and input_string[j] == "\\":
                backslashes += 1
                j -= 1
            if backslashes % 2 == 0:
                in_string = not in_string
        elif char == "{" and not in_string:
            depth += 1
        elif char == "}" and not in_string:
            depth -= 1
            if depth == 0:
                json_objects.append(input_string[start : idx + 1])
                start = idx + 1
    return json_objects
def lambda_handler(event, context):
    """Process a RDS enhanced monitoring DATA_MESSAGE,
    coming from CLOUDWATCH LOGS

    The event payload is a base64-encoded, gzipped CloudWatch Logs message.
    Each log event's message is parsed as (possibly concatenated) JSON and
    forwarded to _process_rds_enhanced_monitoring_message; metrics are
    flushed to Datadog once at the end.
    """
    # event is a dict containing a base64 string gzipped
    with gzip.GzipFile(
        fileobj=BytesIO(base64.b64decode(event["awslogs"]["data"]))
    ) as decompress_stream:
        data = b"".join(BufferedReader(decompress_stream))
    event = json.loads(data)

    account = event["owner"]
    # ARN format: arn:partition:service:region:... -> region is field 3.
    region = context.invoked_function_arn.split(":", 4)[3]
    log_events = event["logEvents"]

    for log_event in log_events:
        ts = log_event["timestamp"] / 1000  # ms -> s
        # Try to parse all objects as JSON before going into processing.
        # In case one of the json.loads operations fails, revert to parsing
        # the whole message as a single JSON document.
        json_objects = []
        try:
            for json_object in extract_json_objects(log_event["message"]):
                json_objects += [json.loads(json_object)]
        except Exception:
            # Was a bare "except:"; narrowed so SystemExit/KeyboardInterrupt
            # are not swallowed. Fallback behavior is unchanged.
            json_objects += [json.loads(log_event["message"])]

        for message in json_objects:
            _process_rds_enhanced_monitoring_message(ts, message, account, region)

    stats.flush()
    return {"Status": "OK"}
# Helpers to send data to Datadog, inspired from https://github.com/DataDog/datadogpy
class Stats(object):
    """Buffers gauge points and submits them to the Datadog v1 series API.

    Inspired by https://github.com/DataDog/datadogpy. Transient HTTP errors
    (500/502/503/504) are retried with capped exponential backoff plus full
    jitter; other errors drop the batch.
    """

    def __init__(self, base=2, cap=30, max_attempts=5):
        # base / cap: backoff scale and ceiling, in seconds.
        # max_attempts: total submission attempts per flush.
        self.series = []
        self.base = base
        self.cap = cap
        self.max_attempts = max_attempts

    def _backoff(self, n):
        """Return a full-jitter sleep: uniform over [0, min(cap, base * 2**n)]."""
        v = min(self.cap, pow(2, n) * self.base)
        return random.uniform(0, v)

    def gauge(self, metric, value, timestamp=None, tags=None, host=None):
        """Buffer one gauge point; it is sent on the next flush().

        timestamp defaults to now; it is truncated to whole seconds.
        """
        base_dict = {
            "metric": metric,
            "points": [(int(timestamp or time.time()), value)],
            "type": "gauge",
            "tags": tags,
        }
        if host:
            base_dict.update({"host": host})
        self.series.append(base_dict)

    def flush(self):
        """POST all buffered series to Datadog and clear the buffer.

        The buffer is cleared up front, so a total failure drops the batch
        rather than retrying it on the next flush.
        """
        metrics_dict = {
            "series": self.series,
        }
        self.series = []

        creds = urlencode(datadog_keys)
        data = json.dumps(metrics_dict).encode("ascii")
        url = "%s?%s" % (
            datadog_keys.get("api_host", "https://app.%s/api/v1/series" % DD_SITE),
            creds,
        )
        req = Request(url, data, {"Content-Type": "application/json"})
        for attempt in range(1, self.max_attempts + 1):
            try:
                with urlopen(req) as response:
                    print(
                        "INFO Submitted data with status: {}".format(response.getcode())
                    )
                # BUG FIX: stop after a successful submission. Without this
                # break the loop re-POSTed the same payload max_attempts
                # times, duplicating every metric batch.
                break
            except HTTPError as e:
                if e.code in (500, 502, 503, 504):
                    if attempt == self.max_attempts:
                        print(
                            "ERROR Exceeded max number of retries, dropping data: {}".format(
                                e.read()
                            )
                        )
                        break
                    t = self._backoff(attempt)
                    print("ERROR {}. Retrying in {} seconds...".format(e.read(), t))
                    time.sleep(t)
                else:
                    # Non-retryable HTTP status (e.g. 403 bad API key).
                    print(
                        "ERROR {}, not retrying with status {}".format(e.read(), e.code)
                    )
                    break
            except Exception as e:
                print("ERROR Dropping data: {}".format(e))
                break
# Module-level singleton shared by the handler; persists across warm invocations.
stats = Stats()