-
Notifications
You must be signed in to change notification settings - Fork 1
/
pyunlocode.py
247 lines (217 loc) · 7.86 KB
/
pyunlocode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#!/usr/bin/env python
#
# Created on : 2015/04/19
#
# @author : Falldog
#
import os
import parser
import sqlite3
from os.path import join
# Absolute path of the directory that contains this module; the data files
# below are resolved relative to it rather than to the working directory.
CURDIR = os.path.dirname(os.path.abspath(__file__))
# Directory scanned by PyUnLocode.gen_from_csv() for '*.csv' input files.
CSVDIR = os.path.join(CURDIR, 'csv')
# SQLite database file used by PyUnLocode.init() when no db_path is given.
DB_PATH = os.path.join(CURDIR, 'unlocode.db')
class PyUnLocode(object):
    """
    SQLite-backed store and lookup helper for UN/LOCODE data.

    Download from : http://www.unece.org/cefact/codesfortrade/codes_index.html
    Column Spec : http://www.unece.org/fileadmin/DAM/cefact/locode/Service/LocodeColumn.htm

    Usage: create an instance, call init() to open the database, use the
    get_* / search_* methods, then call close(). Query methods require
    init() to have been called first (self.conn is None until then).
    """

    def __init__(self):
        # No I/O here; the connection is opened lazily by init().
        self.conn = None

    def init(self, db_path=None):
        """
        Open the SQLite database and create the tables/index if missing.

        db_path -- path of the database file; falls back to DB_PATH when
                   empty or None.
        """
        if not db_path:
            db_path = DB_PATH
        # Calling init() twice must not leak the previous connection.
        self.close()
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row  # access query result as dict
        c = self.conn.cursor()
        try:
            c.executescript('''
                CREATE TABLE IF NOT EXISTS country (
                    code text,
                    name text,
                    PRIMARY KEY (code)
                );
                CREATE TABLE IF NOT EXISTS subdivision (
                    country_code text,
                    subdivision_code text,
                    name text,
                    PRIMARY KEY (country_code, subdivision_code)
                );
                CREATE TABLE IF NOT EXISTS location (
                    country_code text,
                    location_code text,
                    name text,
                    subdivision text,
                    status text,
                    iata text,
                    coordinate text,
                    remark text,
                    is_port int,
                    is_airport int,
                    is_road_terminal int,
                    is_rail_terminal int,
                    is_postal_exchange_office int,
                    is_border_cross int,
                    PRIMARY KEY (country_code, location_code)
                );
                CREATE UNIQUE INDEX IF NOT EXISTS location_index ON location(
                    country_code,
                    location_code,
                    name,
                    is_port,
                    is_airport
                );
            ''')
        finally:
            c.close()
        self.conn.commit()

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.conn:
            self.conn.close()
            self.conn = None

    # ---- private query helpers -------------------------------------------

    def _fetch_all(self, sql, params=()):
        """Run a parameterized query and return every row as a list."""
        c = self.conn.cursor()
        try:
            c.execute(sql, params)
            return c.fetchall()
        finally:
            # Release the cursor even when execute() raises.
            c.close()

    def _fetch_one(self, sql, params=()):
        """Run a parameterized query and return the first row, or None."""
        c = self.conn.cursor()
        try:
            c.execute(sql, params)
            return c.fetchone()
        finally:
            c.close()

    def _count(self, sql, params=()):
        """Return the single integer produced by a SELECT COUNT(*) query."""
        return self._fetch_one(sql, params)[0]

    # ---- full table dumps ------------------------------------------------

    def get_all_country(self):
        """Return every row of the country table."""
        return self._fetch_all('SELECT * FROM country')

    def get_all_subdivision(self):
        """Return every row of the subdivision table."""
        return self._fetch_all('SELECT * FROM subdivision')

    def get_all_location(self):
        """Return every row of the location table."""
        return self._fetch_all('SELECT * FROM location')

    # ---- exact lookups ---------------------------------------------------

    def get_country_name(self, code):
        """Return the country name for `code`, or None if it is not found."""
        r = self._fetch_one('SELECT name FROM country WHERE code = ?', (code,))
        return r[0] if r else None

    def get_iata_location(self, code):
        """
        Return (country_code, location_code, name) rows for a 3-letter code.

        A row matches when it is flagged is_airport=1 and either its
        location_code or its iata column equals `code`.
        reference : https://en.wikipedia.org/wiki/UN/LOCODE

        Raises ValueError when `code` is not exactly 3 characters long.

        NOTE(review): the original docstring said "IATA location may not be
        defined as airport", yet both branches of the query require
        is_airport=1 -- confirm which behavior is intended.
        """
        if len(code) != 3:
            raise ValueError('IATA code must be exactly 3 characters: %r' % (code,))
        return self._fetch_all(
            """
            SELECT country_code, location_code, name
            FROM location
            WHERE is_airport=1 AND (location_code=? OR iata=?)
            """,
            (code, code))

    def get_location_name(self, country_code, location_code):
        """Return the location name for the code pair, or None if not found."""
        r = self._fetch_one(
            'SELECT name FROM location WHERE country_code = ? AND location_code = ?',
            (country_code, location_code))
        return r[0] if r else None

    # ---- substring searches ----------------------------------------------
    # The search term is bound as a parameter (never spliced into the SQL
    # text), so quotes in `name` are matched literally. '%' and '_' inside
    # `name` still act as LIKE wildcards, as they did before.

    def search_country_name_like(self, name):
        """Return country rows whose name contains `name`; [] if none."""
        return self._fetch_all(
            'SELECT * FROM country WHERE name LIKE ?',
            ('%%%s%%' % (name,),))

    def search_location_name_like(self, name):
        """Return location rows whose name contains `name`; [] if none."""
        return self._fetch_all(
            'SELECT * FROM location WHERE name LIKE ?',
            ('%%%s%%' % (name,),))

    def search_port_name_like(self, name):
        """
        Return rows (country_code, location_code, name, subdivision, is_port)
        for locations flagged is_port=1 whose name contains `name`; [] if none.
        """
        return self._fetch_all(
            """
            SELECT country_code, location_code, name, subdivision, is_port
            FROM location
            WHERE name LIKE ? AND is_port=1
            """,
            ('%%%s%%' % (name,),))

    # ---- import ----------------------------------------------------------

    def gen_from_csv(self):
        """
        Feed every '*.csv' file found in CSVDIR to the matching parser.

        Files whose name contains 'UNLOCODE' go to parser.CodeParser, files
        whose name contains 'Subdivision' go to parser.SubdivisionParser,
        anything else is reported and skipped. The transaction is committed
        once after all files were handled; nothing is committed if a parser
        raises.

        NOTE(review): `parser` is presumably the project's own parser module
        whose parse(cursor, path) inserts rows -- it is not visible here.
        """
        c = self.conn.cursor()
        try:
            p_code = parser.CodeParser()
            p_sub = parser.SubdivisionParser()
            for filename in os.listdir(CSVDIR):
                if os.path.splitext(filename)[1] != '.csv':
                    continue
                path = join(CSVDIR, filename)
                if 'UNLOCODE' in filename:
                    p_code.parse(c, path)
                elif 'Subdivision' in filename:
                    p_sub.parse(c, path)
                else:
                    print('skip unknow file : %s' % filename)
            self.conn.commit()
        finally:
            c.close()

    # ---- statistics ------------------------------------------------------

    def analytics(self, country=None):
        """
        Print row counts for the database to stdout.

        country -- optional country code; when given, every count except
                   the country total is restricted to that country_code.
        """
        if country:
            # The country code is bound as a parameter, not formatted in.
            where_country = ' WHERE country_code=?'
            and_country = ' AND country_code=?'
            params = (country,)
        else:
            where_country = ''
            and_country = ''
            params = ()

        country_count = self._count('SELECT COUNT(*) FROM country')
        subdivision_count = self._count(
            'SELECT COUNT(*) FROM subdivision' + where_country, params)
        location_count = self._count(
            'SELECT COUNT(*) FROM location' + where_country, params)

        # Column names below are fixed constants, so building the SQL text
        # from them is safe.
        flag_counts = {}
        for column in ('is_airport', 'is_port', 'is_road_terminal',
                       'is_rail_terminal', 'is_postal_exchange_office',
                       'is_border_cross'):
            flag_counts[column] = self._count(
                'SELECT COUNT(*) FROM location WHERE %s=1' % column + and_country,
                params)

        print('============= BEGIN =============')
        print('country count = %d' % country_count)
        if country:
            print('*** search country : "%s" ***' % country)
        print('subdivision count = %d' % subdivision_count)
        print('location count = %d' % location_count)
        print('port count = %d' % flag_counts['is_port'])
        print('airport count = %d' % flag_counts['is_airport'])
        print('road terminal count = %d' % flag_counts['is_road_terminal'])
        print('rail terminal count = %d' % flag_counts['is_rail_terminal'])
        print('postal exchange office count = %d' % flag_counts['is_postal_exchange_office'])
        print('border cross count = %d' % flag_counts['is_border_cross'])
        print('============= END =============')
def main():
    """
    Demo entry point: build the database from the CSV files, print the
    statistics (overall and for 'TW'), then run a few sample lookups.

    Any error is reported as a traceback on stderr instead of propagating,
    and the database connection is closed in every case.
    """
    u = PyUnLocode()
    try:
        u.init()
        u.gen_from_csv()
        u.analytics()
        u.analytics('TW')
        print(u.get_country_name('US'))
        print(u.get_location_name('TW', 'TPE'))
        for row in u.search_location_name_like('LOS ANGELES'):
            print("code:%s%s name:%s" % (row['country_code'], row['location_code'], row['name']))
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit still stop
        # the script.
        import traceback
        traceback.print_exc()
    finally:
        # Release the connection even when one of the steps above failed.
        u.close()


if __name__ == '__main__':
    main()