forked from DataTalksClub/mlops-zoomcamp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lambda_function.py
72 lines (50 loc) · 1.78 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
import json
import boto3
import base64
import mlflow
# Module-level setup: everything below runs once at import time, not per invocation.
# Kinesis client that lambda_handler uses to publish prediction events.
kinesis_client = boto3.client('kinesis')
# Name of the output stream, read from the environment with 'ride_predictions' as fallback.
PREDICTIONS_STREAM_NAME = os.getenv('PREDICTIONS_STREAM_NAME', 'ride_predictions')
# Run identifier interpolated into the model artifact path below.
# NOTE(review): os.getenv returns None when RUN_ID is unset, so the path would then
# contain the literal text "None" -- confirm RUN_ID is always configured.
RUN_ID = os.getenv('RUN_ID')
logged_model = f's3://mlflow-models-alexey/1/{RUN_ID}/artifacts/model'
# Alternative model location, kept for reference (a runs:/ URI instead of the S3 path):
# logged_model = f'runs:/{RUN_ID}/model'
model = mlflow.pyfunc.load_model(logged_model)
# True only when the TEST_RUN environment variable is exactly the string 'True';
# lambda_handler skips the Kinesis put_record call when this flag is set.
TEST_RUN = os.getenv('TEST_RUN', 'False') == 'True'
def prepare_features(ride):
    """Build the model's feature dict from a single ride record.

    Args:
        ride: Mapping with 'PULocationID', 'DOLocationID' and
            'trip_distance' keys.

    Returns:
        A dict with two entries: 'PU_DO', the two location ids joined by an
        underscore, and 'trip_distance', passed through unchanged.

    Raises:
        KeyError: If any of the three keys is missing from ``ride``.
    """
    return {
        'PU_DO': f"{ride['PULocationID']}_{ride['DOLocationID']}",
        'trip_distance': ride['trip_distance'],
    }
def predict(features):
    """Run the module-level model on ``features`` and return one float.

    Only the first element of the model's output is used; it is converted
    with ``float`` before being returned.
    """
    return float(model.predict(features)[0])
def lambda_handler(event, context):
    """Score every record in ``event`` and return the prediction events.

    Each entry of ``event['Records']`` carries a base64-encoded, UTF-8 JSON
    payload under ``record['kinesis']['data']``. The payload must contain a
    'ride' object (passed to ``prepare_features``) and a 'ride_id'.

    Unless ``TEST_RUN`` is set, each prediction event is also published to
    the ``PREDICTIONS_STREAM_NAME`` stream, partitioned by the ride id.

    Args:
        event: Invocation payload containing a 'Records' list.
        context: Invocation context; not used.

    Returns:
        ``{'predictions': [...]}`` with one prediction event per input
        record, in input order.
    """
    results = []

    for rec in event['Records']:
        # Payload arrives base64-encoded; decode to text before parsing JSON.
        payload = base64.b64decode(rec['kinesis']['data']).decode('utf-8')
        message = json.loads(payload)

        ride = message['ride']
        ride_id = message['ride_id']

        duration = predict(prepare_features(ride))

        output = {
            'model': 'ride_duration_prediction_model',
            'version': '123',
            'prediction': {
                'ride_duration': duration,
                'ride_id': ride_id,
            },
        }

        # Publishing is skipped in test runs so no stream call is made.
        if not TEST_RUN:
            kinesis_client.put_record(
                StreamName=PREDICTIONS_STREAM_NAME,
                Data=json.dumps(output),
                PartitionKey=str(ride_id),
            )

        results.append(output)

    return {'predictions': results}