-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathONSPD_ETL.py
92 lines (80 loc) · 2.75 KB
/
ONSPD_ETL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#
# Open addresses ETL Common Library
# Open addresses ONSPD ETL tool
#
#
# Version 1.0 (Python) in progress
# Author John Murray
# Licence MIT
#
# Purpose Extract elements from ONSPD table
#
import ConfigParser
import csv
import glob
import MySQLdb
import string
from bulkinsert import *
# Load the MySQL connection settings from the [database] section of the
# ETL configuration file, then open a connection and a cursor on it.
config = ConfigParser.ConfigParser()
config.read("oa_alpha_etl.cnf")
db_section = 'database'
username = config.get(db_section, 'username')
password = config.get(db_section, 'password')
hostname = config.get(db_section, 'hostname')
database = config.get(db_section, 'database')
dbConn = MySQLdb.connect(
    host=hostname,
    user=username,
    passwd=password,
    db=database
)
# Single cursor used for every statement issued below.
cur = dbConn.cursor()
# Column lists handed to the bulk inserters for each target table.
ONSPD_fields = ["pcds", "usertype", "EA", "NO", "osgrdind", "ctry", "current"]
change_fields = ["curr_pcds", "term_pcds"]

# Postcodes that carry a grid reference, split by whether the row has a
# termination date (ret_pcds) or not (cur_pcds).  Each entry is a
# (pcds, oseast1m, osnrth1m) tuple appended by the CSV load loop.
ret_pcds = []
cur_pcds = []

# Empty each target table and then attach a bulk inserter to it, in the
# same order as before: ONSPD first, ONSPD_Changes second.
cur.execute("TRUNCATE TABLE `ONSPD`;")
ONSPD_bi = BulkInsert(cur, "ONSPD", ONSPD_fields)

cur.execute("TRUNCATE TABLE `ONSPD_Changes`;")
change_bi = BulkInsert(cur, "ONSPD_Changes", change_fields)
# Load every ONSPD CSV extract found in the working directory into the
# `ONSPD` table, remembering the grid references of terminated and current
# postcodes along the way.
#
# NOTE(review): the nesting below was reconstructed from the data flow
# (`current` is only assigned when the 'oseast1m' column is present, so the
# row-building code must sit inside that test) - confirm against the
# original layout.
for csv_path in glob.glob("ONSPD*.csv"):
    print(csv_path)
    nrecs = 0
    # 'rb' is the mode the Python 2 csv module expects.  The with-block
    # guarantees the handle is released even if a row raises.
    with open(csv_path, 'rb') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=',', quotechar='"')
        # Header names may carry stray whitespace; an empty file has no
        # header at all (fieldnames is None), so fall back to an empty list.
        reader.fieldnames = [field.strip() for field in (reader.fieldnames or [])]
        for row in reader:
            nrecs += 1
            if (nrecs % 10000) == 0:
                print("Records read: " + str(nrecs))
            # Files without the easting column are counted but not loaded.
            if 'oseast1m' not in row:
                continue

            terminated = bool(row['doterm'])
            has_grid_ref = bool(row['oseast1m'])
            current = '0' if terminated else '1'

            if has_grid_ref:
                easting = row['oseast1m']
                northing = row['osnrth1m']
                located = (row['pcds'], easting, northing)
                # cur_pcds is populated for parity with ret_pcds; nothing in
                # this script reads it back.
                if terminated:
                    ret_pcds.append(located)
                else:
                    cur_pcds.append(located)
            else:
                # presumably BulkInsert emits the literal string "NULL" as
                # an SQL NULL - verify against bulkinsert.
                easting = northing = "NULL"

            # Only the first character of the country code is stored; the
            # slice (rather than [0]) tolerates a blank country field.
            country = (row['ctry'] or '')[:1]
            ONSPD_bi.addRow([row['pcds'], row['usertype'], easting, northing,
                             row['osgrdind'], country, current])
    print("Records read: " + str(nrecs))

# Flush any rows still buffered in the bulk inserter and make them durable
# before the change-detection queries run against the table.
# NOTE(review): placed after the file loop so the inserter is closed exactly
# once however many files match - confirm this matches the intended layout.
ONSPD_bi.close()
dbConn.commit()
# For every terminated postcode that has a grid reference, look for a
# current postcode at exactly the same easting/northing.  When there is
# exactly one such match, record the (current, terminated) pair in
# `ONSPD_Changes`.
print("Writing changes to database")
nrecs = 0
nwrit = 0
# Parameterized so the CSV-derived coordinates are escaped by the driver
# instead of being spliced into the SQL text; the statement is the same for
# every lookup, so it is built once.
query = ("SELECT `pcds` FROM `ONSPD` "
         "WHERE `current` = '1' AND `EA` = %s AND `NO` = %s;")
for term_pc in ret_pcds:
    # term_pc is (pcds, oseast1m, osnrth1m) as collected during the load.
    nrecs += 1
    cur.execute(query, (term_pc[1], term_pc[2]))
    if (nrecs % 10000) == 0:
        print("Changes read: " + str(nrecs) + " written: " + str(nwrit))
    # Ambiguous matches (zero or several current postcodes at the same
    # point) are skipped.
    # NOTE(review): the fetch loop is taken to belong inside this branch,
    # which is what the `nwrit` counter implies - confirm.
    if cur.rowcount == 1:
        nwrit += 1
        for row in cur.fetchall():
            change_bi.addRow([row[0], term_pc[0]])

# Flush buffered change rows, persist them, and release the database
# resources.
change_bi.close()
dbConn.commit()
cur.close()
dbConn.close()