-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsuperquicktest.py
39 lines (34 loc) · 1.96 KB
/
superquicktest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# This script can be run to pull a small, hard-coded window of parking data
# through the process_data script. If the call to process_data.main below is
# given push_to_CKAN = True (it currently passes push_to_CKAN = False), the
# resulting data is pushed to the CKAN instance specified by the modules and
# settings called by process_data.
# The main() function returns a Boolean indicating whether this operation
# succeeded or failed. In this way, this function can still be called by
# some kind of pipeline/job manager that can send out notifications if a
# particular ETL job fails.
import sys, pytz
from datetime import datetime, timedelta
import process_data
def main(use_cache):
    """Run process_data.main over a short, fixed time window as a quick test.

    The window is hard-coded: it starts at 2013-01-23 11:45:18 (localized
    to US/Eastern) and ends 10 seconds later. The call requests CSV output
    (output_to_csv = True), does not push to CKAN (push_to_CKAN = False),
    and uses a 1-second timechunk.

    Args:
        use_cache: Passed straight through to process_data.main as its
            use_cache argument.

    Returns:
        Whatever process_data.main returns (described in this file's header
        comment as a Boolean success flag).
    """
    pgh = pytz.timezone('US/Eastern')

    # Only one test window is defined here. Previously several alternative
    # windows were assigned one after another and immediately overwritten,
    # so only this last one ever reached process_data.main.
    slot_start = pgh.localize(datetime(2013, 1, 23, 11, 45, 18))
    # A 1-second window was noted as catching zero transactions (probably
    # because of a change in reference field), so 10 seconds are processed.
    halting_time = slot_start + timedelta(seconds=10)

    script_start = datetime.now()
    print("Started processing at {}.".format(script_start))
    success = process_data.main(
        use_cache=use_cache,
        output_to_csv=True,
        push_to_CKAN=False,
        slot_start=slot_start,
        halting_time=halting_time,
        threshold_for_uploading=100,
        timechunk=timedelta(seconds=1),
    )
    print("Started processing at {} and finished at {}.".format(script_start, datetime.now()))
    return success
if __name__ == '__main__':
    # Caching is opt-in: it is enabled only when the first command-line
    # argument is exactly 'use_cache'; otherwise use_cache stays False.
    use_cache = len(sys.argv) > 1 and sys.argv[1] == 'use_cache'
    main(use_cache=use_cache)