-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapplication.py
147 lines (120 loc) · 5.38 KB
/
application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import flask
import neomodel
from flask import Flask, jsonify, request
from flask_api import status
import os
import json
import sys
import uuid
import redis
import datetime
from neomodel import config
from data_models import Person, Place, WentToPlaceRel, ContactWithRel
from create_user import create_new_user, ingestPushNotificationAndCreateID
from healthcheck import health_check
from push_notification import send_push_message
from utilities import make_response, unobfuscate, check_for_values_in_request, send_message, admin_protected
from report_sickness import reportSicknessSum, get_reported_symptoms
from redis_place_interactions import ingest_location_update, get_contacted_ids
from invite_and_signup import invite_new_user, sign_up
from database_access_helpers import (report_contact_between_individuals,
report_visited_place, retrieve_or_create_protouser_from_number,
retrieve_or_create_person_from_identifier, does_proto_user_exist, get_is_at_risk)
from redis_purge_place import purge_place,purge_all
# Flask application object; every route decorator below registers on it.
application = Flask(__name__)

# Shared Redis client handed to the request handlers. Host, port and password
# are read from the environment once, when this module is imported.
r = redis.Redis(
    host=os.getenv('REDIS_HOST'),
    port=os.getenv('REDIS_PORT'),
    password=os.getenv('REDIS_PWD'),
    db=0,
)

# neomodel takes its database connection URL from this config attribute.
config.DATABASE_URL = os.getenv('NEO_DATABASE_URL')
@application.route('/', methods=['GET', 'POST'])
def get_health():
    """Answer GET/POST on the root URL with the result of ``health_check()``."""
    health_response = health_check()
    return health_response
# Expects a request object whose push_token key holds the client's push
# notification key. The token is ingested and associated with a unique
# identifier alias, and that alias is returned to the client in an object.
@application.route('/request-identifier', methods=["POST"])
def ingestPushNotificationToken():
    """Delegate to ``ingestPushNotificationAndCreateID`` with the shared Redis client."""
    identifier_response = ingestPushNotificationAndCreateID(r)
    return identifier_response
# Create a new user.
# Expects a request with a JSON body containing a phoneNumber parameter.
@application.route('/create_new_user', methods=['POST'])
def createNewUser():
    """Delegate to ``create_new_user`` and return its response unchanged."""
    creation_response = create_new_user()
    return creation_response
# The client application reports that a person has tested positive for
# coronavirus. From the client we receive a list of obfuscated IDs, the list
# of places the person has been to, and the diagnosis. That data is used to
# notify the contacted individuals and to record the necessary relations in
# our database.
@application.route('/report_sickness_or_positive_test', methods=['POST'])
def reportSickness():
    """Delegate to ``reportSicknessSum`` with the shared Redis client."""
    report_response = reportSicknessSum(r)
    return report_response
# Intended risk levels:
#   Confirmed case exposed               -> At Risk
#   Exposed to N people with symptoms    -> Cautious
#   Not exposed                          -> Normal
# Only the NORMAL and AT_RISK outcomes are produced by the handler below.
@application.route('/get_exposure_risk', methods=['POST'])
def getExposureRisk():
    """Report the exposure risk for the identifier given in the JSON body.

    The request body must contain an ``identifier`` key. When the request
    check fails, a 400 response naming the missing parameter is returned.
    Otherwise ``get_is_at_risk`` decides between the ``NORMAL`` and
    ``AT_RISK`` payloads, both sent with HTTP 200.
    """
    body = request.get_json()
    validation = check_for_values_in_request(['identifier'], body)

    # Explicit comparison with False is kept (rather than ``not``) so that
    # any non-boolean first element is treated exactly as before.
    if validation[0] == False:
        error_text = (
            'bad request, please try again and specify the '
            + str(validation[1])
            + ' parameter in the JSON request body.'
        )
        return make_response({'response': error_text}, status.HTTP_400_BAD_REQUEST)

    at_risk = get_is_at_risk(body['identifier'])

    # Same reasoning as above: only a value equal to False means "normal".
    if at_risk == False:
        return make_response(
            {
                "exposure_risk": "NORMAL",
                "rationale": "We have not detected that you have been in contact with a carrier of COVID-19.",
                "guidance": "Continue to Comply with local quarantine and lockdown guidelines.",
            },
            http_status=status.HTTP_200_OK,
        )

    return make_response(
        {
            "exposure_risk": "AT_RISK",
            "rationale": "We have detected that you have been in contact with a carrier of COVID-19, or someone who has reported symptoms.",
            "guidance": "Continue to Comply with local quarantine and lockdown guidelines, but seek medical help if you begin to experience symptoms yourself.",
        },
        http_status=status.HTTP_200_OK,
    )
@application.route('/invite-new-user', methods=['POST'])
def inviteNewUser():
    """Delegate to ``invite_new_user`` with the shared Redis client."""
    invite_response = invite_new_user(r)
    return invite_response
@application.route('/signup', methods=['POST'])
def signUp():
    """Delegate to ``sign_up`` with the shared Redis client."""
    signup_response = sign_up(r)
    return signup_response
@application.route('/get-symptoms', methods=['POST'])
def getReportedSymptoms():
    """Delegate to ``get_reported_symptoms`` with the shared Redis client."""
    symptoms_response = get_reported_symptoms(r)
    return symptoms_response
# Should be a POST whose data is an object with the keys {
#   new_place_id,
#   old_place_id,
#   identifier
# }
# The identifier is moved from the old_place_id bucket to the new_place_id
# bucket. If there is no old_place_id, send "None".
@application.route('/update-location', methods=["POST"])
def ingestLocationUpdate():
    """Delegate to ``ingest_location_update`` with the shared Redis client."""
    update_response = ingest_location_update(r)
    return update_response
# Takes a place_id and returns the list of all user identifiers currently in
# that place_id bucket, as an array under the key 'tokens'.
@application.route('/get-contacted-ids', methods=["POST"])
def getContactedIds():
    """Delegate to ``get_contacted_ids`` with the shared Redis client."""
    contacted_response = get_contacted_ids(r)
    return contacted_response
@application.route('/get-infected-places', methods=["POST"])
def get_infected_locations():
    """Return the coordinates of every ``Place`` node as JSON.

    The response body has the form
    ``{'locations': [{'latitude': ..., 'longitude': ...}, ...]}`` and is sent
    with HTTP 200.

    NOTE(review): all Place nodes are returned; no "infected" filter is
    applied here -- confirm that every stored Place is meant to be exposed.
    """
    coordinates = [
        {
            "latitude": place.gpsLAT,
            "longitude": place.gpsLONG,
        }
        for place in Place.nodes.all()
    ]
    return make_response({'locations': coordinates}, status.HTTP_200_OK)
# Takes place_id and admin_token (sent as "token" in the body) and removes
# all members of that place.
@application.route('/admin/purge-place', methods=['POST'])
@admin_protected
def purge_places(content):
    """Delegate to ``purge_place`` with the shared Redis client and ``content``.

    ``content`` is supplied by the ``admin_protected`` decorator --
    presumably the parsed request body; verify against ``utilities``.
    """
    purge_response = purge_place(r, content)
    return purge_response
# Takes admin_token (sent as "token" in the body) and removes all sets from
# the db.
@application.route('/admin/purge-all', methods=['POST'])
@admin_protected
def purge(content):
    """Delegate to ``purge_all`` with the shared Redis client and ``content``.

    ``content`` is supplied by the ``admin_protected`` decorator --
    presumably the parsed request body; verify against ``utilities``.
    """
    purge_response = purge_all(r, content)
    return purge_response
# Start Flask's built-in server only when this file is executed directly;
# debug mode is explicitly off.
if __name__ == '__main__':
    application.run(debug=False)