-
Notifications
You must be signed in to change notification settings - Fork 6
/
tasks.py
151 lines (131 loc) · 5.12 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from invoke import task, exceptions
import dwetl.database_credentials
import pdb
@task
def sheets_to_json(c):
"""
Generates the JSON files in the table_config/ subdirectory from Google Sheets.
"""
c.run('cd scripts; python sheetstojson.py', pty=True)
@task
def database_reset(c):
"""
Resets the application database. WARNING: This will DESTORY any existing data in the database.
"""
db_settings = dwetl.database_credentials.db_settings()
db_user = db_settings['DB_USER']
db_name = db_settings['DB_NAME']
db_host = db_settings['DB_HOST_NAME']
db_port = db_settings['DB_PORT']
db_password = db_settings['DB_PASSWORD']
reset_database(c, db_host, db_name, db_port, db_user, db_password)
@task
def test_database_reset(c):
"""
Resets the test database.
WARNING: This task will DESTROY any existing data in the test
database.
"""
test_db_settings = dwetl.database_credentials.test_db_settings()
test_db_user = test_db_settings['TEST_DB_USER']
test_db_name = test_db_settings['TEST_DB_NAME']
test_db_host = test_db_settings['TEST_DB_HOST_NAME']
test_db_port = test_db_settings['TEST_DB_PORT']
test_db_password = test_db_settings['TEST_DB_PASSWORD']
reset_database(c, test_db_host, test_db_name, test_db_port, test_db_user, test_db_password)
@task
def run_migration(c, sql_file):
"""
runs sql file for db migration
"""
db_settings = dwetl.database_credentials.db_settings()
db_user = db_settings['DB_USER']
db_host = db_settings['DB_HOST_NAME']
db_port = db_settings['DB_PORT']
db_password = db_settings['DB_PASSWORD']
db_name = db_settings['DB_NAME']
pg_password=f'PGPASSWORD={db_password} '
psql_cmd = f'psql -U {db_user} -h {db_host} -p {db_port} -d {db_name} -f {sql_file}'
if c.run(pg_password + psql_cmd):
print('-----------')
print('SQL migration successful.')
else:
raise Exception()
print('An error has occurred')
@task
def run_test_migration(c, sql_file):
"""
runs sql file for test db migration
"""
test_db_settings = dwetl.database_credentials.test_db_settings()
db_user = test_db_settings['TEST_DB_USER']
db_name = test_db_settings['TEST_DB_NAME']
db_host = test_db_settings['TEST_DB_HOST_NAME']
db_port = test_db_settings['TEST_DB_PORT']
db_password = test_db_settings['TEST_DB_PASSWORD']
pg_password=f'PGPASSWORD={db_password} '
psql_cmd = f'psql -U {db_user} -h {db_host} -p {db_port} -d {db_name} -f {sql_file}'
if c.run(pg_password + psql_cmd):
print('-----------')
print('SQL migration successful.')
else:
raise Exception()
print('An error has occurred')
@task
def update_db_ddl(c):
"""
Creates a ddl from the entire db in usmai_dw_etl.sql
"""
db_settings = dwetl.database_credentials.db_settings()
db_user = db_settings['DB_USER']
db_host = db_settings['DB_HOST_NAME']
db_port = db_settings['DB_PORT']
db_password = db_settings['DB_PASSWORD']
db_name = db_settings['DB_NAME']
pg_password=f'PGPASSWORD={db_password} '
psql_cmd = f'pg_dump -U {db_user} -h {db_host} -p {db_port} {db_name} -Fp --create --clean --schema-only -f ddl/usmai_dw_etl.sql'
if c.run(pg_password + psql_cmd):
print('-----------')
print('Updated usmai_dw_etl.sql')
else:
raise Exception()
print('An error has occurred')
# Task helper function
def reset_database(context, db_host, db_name, db_port, db_user, db_password):
pg_password=f'PGPASSWORD={db_password} '
psql_cmd = f'psql -U {db_user} -d postgres --host={db_host} --port={db_port}'
ask_for_confirmation = True
if (db_host == 'localhost' or db_host == '127.0.0.1') and db_port == '5432':
# Assume localhost:5432 and 127.0.0.1:5432 are local Postgres
ask_for_confirmation = True
if ask_for_confirmation:
confirm = input(f"Are you sure you want to reset {db_name} {db_host}:{db_port}? (y/n) ")
confirm = confirm.lower()
if not((confirm == 'y') or (confirm == 'yes')):
print("Database has not been reset.")
exit()
print(f'Resetting {db_name} database at {db_host}:{db_port}')
print('-----------')
print('Terminating sessions and dropping database')
if db_name == 'usmai_dw_etl_test':
terminate_sessions = psql_cmd + ' -f ddl/drop_db_test.sql'
else:
terminate_sessions = psql_cmd + ' -f ddl/drop_db.sql'
print('\t' + terminate_sessions)
if context.run(pg_password + terminate_sessions):
print('-----------')
print('Sessions terminated')
else:
raise Exception()
print('An error has occurred')
# If resetting the test dabase,
# create a test db DDL from the usmai_dw_etl.sql to keep it up to date.
if db_name == 'usmai_dw_etl_test':
print('Creating database from usmai_dw_etl_test.sql DDL')
load_ddl = psql_cmd + ' -f ddl/usmai_dw_etl_test.sql'
else:
load_ddl = psql_cmd + ' -f ddl/usmai_dw_etl.sql'
print(load_ddl)
if context.run(pg_password + load_ddl):
print('-----------')
print('Database reset')