-
Notifications
You must be signed in to change notification settings - Fork 2
/
lambda2.py
84 lines (66 loc) · 2.41 KB
/
lambda2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import boto3
import json
# Emitted once, when this module is first imported/executed.
print('Loading function')
# DynamoDB client created once at module scope (not per call); lambda_handler
# passes it to whichever operation matches the request's HTTP method.
dynamo = boto3.client('dynamodb')
def respond(err, res=None):
    '''Build the HTTP-style response dict returned by the handler.

    Args:
        err: An exception describing the failure, or a falsy value
            (normally None) when the request succeeded.
        res: The result to send back; JSON-encoded into the body. Only
            used when ``err`` is falsy.

    Returns:
        A dict with 'statusCode' ('400' on error, '200' otherwise),
        'body' (the error text, or ``res`` encoded as JSON) and the
        fixed 'headers' (JSON content type, permissive CORS origin).
    '''
    if err:
        status_code = '400'
        # Python 3 exceptions have no `.message` attribute, so reading it
        # unconditionally raises AttributeError. Honour it when an
        # exception does provide one; otherwise use the standard text.
        message = getattr(err, 'message', None)
        body = str(err) if message is None else message
    else:
        status_code = '200'
        body = json.dumps(res)

    return {
        'statusCode': status_code,
        'body': body,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
        },
    }
def lambda_handler(event, context):
    '''Demonstrates a simple HTTP endpoint using API Gateway.

    The HTTP method of the request selects the DynamoDB call:

    * GET    -> scan, with the query string parameters as arguments
      (e.g. ``TableName``).
    * POST   -> put_item, with the JSON body as arguments.
    * PUT    -> update_item, with the JSON body as arguments.
    * DELETE -> delete_item, with the JSON body as arguments.

    Args:
        event: Request dict; reads 'httpMethod' plus either
            'queryStringParameters' (GET) or 'body' (other methods).
        context: Unused.

    Returns:
        The dict built by ``respond``: a '200' response carrying the
        DynamoDB result, or a '400' response when the method is not
        supported or the request carries no usable parameters.
    '''
    # print("Received event: " + json.dumps(event, indent=2))

    operations = {
        'DELETE': lambda dynamo, x: dynamo.delete_item(**x),
        'GET': lambda dynamo, x: dynamo.scan(**x),
        'POST': lambda dynamo, x: dynamo.put_item(**x),
        'PUT': lambda dynamo, x: dynamo.update_item(**x),
    }

    # .get() so an event without 'httpMethod' is reported as an
    # unsupported method instead of raising KeyError.
    operation = event.get('httpMethod')
    if operation not in operations:
        return respond(ValueError('Unsupported method "{}"'.format(operation)))

    if operation == 'GET':
        source = 'query string parameters'
        payload = event.get('queryStringParameters')
    else:
        source = 'JSON body'
        raw_body = event.get('body')
        try:
            # A missing/empty body is treated as "no payload" rather than
            # being handed to json.loads, which would raise.
            payload = json.loads(raw_body) if raw_body else None
        except (TypeError, ValueError) as exc:
            return respond(
                ValueError('Request body is not valid JSON: {}'.format(exc)))

    # The payload is expanded with ** into keyword arguments, so it has to
    # be a mapping; None or any other JSON type would raise TypeError.
    if not isinstance(payload, dict):
        return respond(ValueError(
            'Method "{}" requires {} holding the DynamoDB arguments'.format(
                operation, source)))

    return respond(None, operations[operation](dynamo, payload))
###### DO NOT COPY TO AWS LAMBDA CONSOLE FROM HERE
# Local smoke test: calls lambda_handler directly with hand-built events
# (one GET, one POST) and prints what it returns.


def _show_result(response):
    '''Print the whole handler response, then its body decoded from JSON.'''
    print('--------------------RESULT')
    print(json.dumps(response, indent=2))
    print('--------------------RESULT body')
    decoded_body = json.loads(response['body'])
    print(json.dumps(decoded_body, indent=2))


# GET: the table name travels in the query string parameters.
print('--------------------GET event test')
get_event = {
    'httpMethod': 'GET',
    'queryStringParameters': {'TableName': 'ccbda-example'},
}
result = lambda_handler(get_event, None)
_show_result(result)

# POST: the put_item arguments travel as a compact JSON string in the body.
print('--------------------POST event test')
myvar = {
    'TableName': 'ccbda-example',
    'Item': {'thingid': {'S': 'no idea'}},
}
post_event = {
    'httpMethod': 'POST',
    'body': json.dumps(myvar, separators=(',', ':')),
}
result = lambda_handler(post_event, None)
_show_result(result)