This repository has been archived by the owner on Jan 26, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
225 lines (191 loc) · 8.22 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Define source field names.
import functools
import os
import pandas as pd
from covid.constants import PATH_TO_SERVICE_ACCOUNT_KEY
from covid.extract import DATE_SOURCE_FIELD
from covid.extract import extract_cdc_ili_data
from covid.extract import extract_covidtracking_historical_data
from covid.load import get_sheets_client
from covid.load import post_dataframe_to_google_sheets
from covid.load_utils import sleep_and_log
from covid.transform import CRITERIA_1_SUMMARY_COLUMNS
from covid.transform import CRITERIA_2_SUMMARY_COLUMNS
from covid.transform import CRITERIA_5_SUMMARY_COLUMNS
from covid.transform import CRITERIA_6_SUMMARY_COLUMNS
from covid.transform import CRITERIA_COMBINED_SUMMARY_COLUMNS
from covid.transform import LAST_RAN_FIELD
from covid.transform import LAST_UPDATED_FIELD
from covid.transform import STATE_FIELD
from covid.transform import transform_cdc_ili_data
from covid.transform import transform_covidtracking_data
from covid.transform_utils import calculate_state_summary
# Define the names of the tabs to upload to.
FOR_WEBSITE_TAB_NAME = "For Website"
ALL_STATE_DATA_TAB_NAME = "All State Data"
WORK_IN_PROGRESS_NY_ONLY_TAB_NAME = f"{ALL_STATE_DATA_TAB_NAME} (NY Only)"
STATE_SUMMARY_TAB_NAME = "State Summary"
CDC_CRITERIA_1_GOOGLE_WORKBOOK_KEY = "17L2TUH03_43YDoqBoDq13HMZG1V92kw49ESIuQNTHbU"
CDC_CRITERIA_2_GOOGLE_WORKBOOK_KEY = "1OrpOScYpPchM2Ug9fXdQRIl2Rlc_HCQ2KEdJm2fgh_s"
CDC_CRITERIA_3_GOOGLE_WORKBOOK_KEY = "10mBKVrDVL63vcBORo3tMBTEnR9DNW7xQ0XpEO29yG20"
CDC_CRITERIA_5_GOOGLE_WORKBOOK_KEY = "1Jqf6JAm03iM_tSZx6z3gEWOC1l27lri_hZgsJOiQ5Bw"
CDC_CRITERIA_6_GOOGLE_WORKBOOK_KEY = "11NX0rXhwTRahIJASMGUCqnaVH_FUtlyQuWrzgWEf4zc"
CDC_CRITERIA_SUMMARY_GOOGLE_WORKBOOK_KEY = (
"1Lprw-UYnr6DX0rgS1fh2-mxuZWFAXv_ezuDQ2L8sF60"
)
# Define custom workbook keys.
POLICY_VS_TREND_CHARTS_DATA_WORKBOOK_KEY = (
"1gRexnz4AJAIYJ6c5fQXR6ps2EDs7JPHu-CU7-5qE3EI"
)
# Note: if you'd like to run the full pipeline, you'll need to generate a service account keyfile for an account
# that has been given write access to the Google Sheet.
def extract_transform_and_load_covid_data(post_to_google_sheets=True):
"""Runs the entire pipeline to produce data for Covid Exit Strategy data sources.
Workbooks are found in: https://drive.google.com/drive/u/1/folders/15j1iyyJtJ8BmK3y-HO6cLp-7R7nAoSml.
Args:
post_to_google_sheets (bool): whether or not to attempt to post to google sheets; set to False for faster
debugging of data processing
"""
print("Starting to ETL...")
client, credentials = get_sheets_client(
credential_file_path=os.path.abspath(PATH_TO_SERVICE_ACCOUNT_KEY)
)
# TODO(lbrown): Un-comment these when we find a path forward for CDC bed data.
# cdc_beds_current_df = extract_cdc_beds_current_data()
# cdc_beds_historical_df = extract_cdc_beds_historical_data(credentials=credentials)
# transformed_cdc_beds_df = transform_cdc_beds_data(
# cdc_beds_current_df=cdc_beds_current_df,
# cdc_beds_historical_df=cdc_beds_historical_df,
# )
# Upload category 3A data.
# criteria_3_summary_df = calculate_state_summary(
# transformed_df=transformed_cdc_beds_df, columns=CRITERIA_3_SUMMARY_COLUMNS
# )
# post_dataframe_to_google_sheets(
# df=criteria_3_summary_df,
# workbook_key=CDC_CRITERIA_3_GOOGLE_WORKBOOK_KEY,
# tab_name=STATE_SUMMARY_TAB_NAME,
# credentials=credentials,
# )
#
# sleep_and_log()
#
# post_dataframe_to_google_sheets(
# df=transformed_cdc_beds_df,
# workbook_key=CDC_CRITERIA_3_GOOGLE_WORKBOOK_KEY,
# tab_name="Historical Data",
# credentials=credentials,
# )
covidtracking_df = extract_covidtracking_historical_data()
cdc_ili_df = extract_cdc_ili_data()
transformed_cdc_ili_df = transform_cdc_ili_data(ili_df=cdc_ili_df)
transformed_covidtracking_df = transform_covidtracking_data(
covidtracking_df=covidtracking_df
)
# Upload Criteria 1 workbook for all states.
criteria_1_summary_df = calculate_state_summary(
transformed_df=transformed_covidtracking_df, columns=CRITERIA_1_SUMMARY_COLUMNS
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=criteria_1_summary_df,
workbook_key=CDC_CRITERIA_1_GOOGLE_WORKBOOK_KEY,
tab_name=STATE_SUMMARY_TAB_NAME,
credentials=credentials,
)
sleep_and_log()
# Upload Criteria 2 workbook for all states.
criteria_2_summary_df = calculate_state_summary(
transformed_df=transformed_covidtracking_df, columns=CRITERIA_2_SUMMARY_COLUMNS
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=criteria_2_summary_df,
workbook_key=CDC_CRITERIA_2_GOOGLE_WORKBOOK_KEY,
tab_name=STATE_SUMMARY_TAB_NAME,
credentials=credentials,
)
sleep_and_log()
# Upload Criteria 5 workbook
# Upload all data tab for Criteria 5.
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=transformed_cdc_ili_df,
workbook_key=CDC_CRITERIA_5_GOOGLE_WORKBOOK_KEY,
tab_name=ALL_STATE_DATA_TAB_NAME,
credentials=credentials,
)
sleep_and_log()
# Upload state summary tab for Criteria 5.
criteria_5_summary_df = calculate_state_summary(
transformed_df=transformed_cdc_ili_df, columns=CRITERIA_5_SUMMARY_COLUMNS
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=criteria_5_summary_df,
workbook_key=CDC_CRITERIA_5_GOOGLE_WORKBOOK_KEY,
tab_name=STATE_SUMMARY_TAB_NAME,
credentials=credentials,
)
sleep_and_log()
# Upload state summary tab for Criteria 6.
criteria_6_summary_df = calculate_state_summary(
transformed_df=transformed_covidtracking_df, columns=CRITERIA_6_SUMMARY_COLUMNS
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=criteria_6_summary_df,
workbook_key=CDC_CRITERIA_6_GOOGLE_WORKBOOK_KEY,
tab_name=STATE_SUMMARY_TAB_NAME,
credentials=credentials,
)
sleep_and_log()
# Merge all the summary data frames so that we can create a single summary sheet.
combined_df = functools.reduce(
# Use an inner join so that you'll only get entities that are represented in all criteria.
# E.g., you'll not include Guam, NYC, etc which are reported separately in some of the sources.
lambda left, right: pd.merge(
left, right, on=[STATE_FIELD], how="inner", suffixes=["", "_y"]
),
[
criteria_1_summary_df,
criteria_2_summary_df,
# TODO(lbrown): Un-comment this when we find a path forward for CDC bed data.
# criteria_3_summary_df,
criteria_5_summary_df,
criteria_6_summary_df,
],
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=combined_df.loc[:, CRITERIA_COMBINED_SUMMARY_COLUMNS],
workbook_key=CDC_CRITERIA_SUMMARY_GOOGLE_WORKBOOK_KEY,
tab_name=STATE_SUMMARY_TAB_NAME,
credentials=credentials,
)
sleep_and_log()
# Calculate and upload state summary tab for Policy vs. Trend Charts.
policy_vs_trend_df = pd.concat(
[
transformed_covidtracking_df.loc[
:, [LAST_RAN_FIELD, LAST_UPDATED_FIELD, STATE_FIELD, DATE_SOURCE_FIELD]
],
transformed_covidtracking_df.filter(regex="pvt-*").rename(
lambda column: column.replace("pvt-", ""), axis="columns"
),
],
axis=1,
)
policy_vs_trend_summary_df = calculate_state_summary(
transformed_df=policy_vs_trend_df
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=policy_vs_trend_summary_df,
workbook_key=POLICY_VS_TREND_CHARTS_DATA_WORKBOOK_KEY,
tab_name=STATE_SUMMARY_TAB_NAME,
credentials=credentials,
)
if __name__ == "__main__":
# Note: for faster debugging during development, you can set `post_to_google_sheets` to `False`.
extract_transform_and_load_covid_data(post_to_google_sheets=True)