geo_filter.py
# noinspection PyPackageRequirements
"""Filter bzip2-compressed Twitter stream dumps down to geo-tagged tweets.

Walks ./untarred for *.json.bz2 files, keeps only tweets whose
place.country_code is set, and writes each result as a JSON array to the
mirrored path under ./processed.
"""
import bz2
import json
import logging
import multiprocessing
import os

import coloredlogs


def worker(q):
    """Daemon worker: pull (path, file_name) tasks off the queue forever."""
    while True:
        path, file_name = q.get()
        process_files(path, file_name)
        q.task_done()


def process_files(path, file_name):
    """Decompress one .json.bz2 dump and write out only the geo-tagged tweets."""
    file_name_no_extension, _ = os.path.splitext(file_name)  # strip the .bz2
    out_file_name = os.path.join('processed', os.path.relpath(path, start='untarred'), file_name_no_extension)
    os.makedirs(os.path.dirname(out_file_name), exist_ok=True)
    with open(out_file_name, 'w') as out_file:
        with open(os.path.join(path, file_name), 'rb') as file:
            decompressed_string = bz2.decompress(file.read())
        lines = decompressed_string.decode('utf-8').split('\n')
        tweets = []
        for line in lines:
            try:
                if not line.strip():
                    continue
                tweet = json.loads(line)
                try:
                    # Keep only tweets that carry a place with a country code.
                    if tweet['place']['country_code']:
                        tweets.append(tweet)
                except (KeyError, TypeError):
                    # Tweets without a place (or with place == null) are dropped.
                    pass
            except Exception as e:
                logging.exception(e)
        out_file.write(json.dumps(tweets))
    logging.info('finished ' + os.path.join(path, file_name))


if __name__ == '__main__':
    # coloredlogs.install() attaches its own root handler, so the format and
    # level are passed here rather than through a separate logging.basicConfig().
    coloredlogs.install(fmt='%(asctime)s %(message)s', level=logging.INFO)

    q = multiprocessing.JoinableQueue()
    os.makedirs('processed', exist_ok=True)

    # Leave one core for the main process, but always start at least one worker.
    for _ in range(max(1, (os.cpu_count() or 2) - 1)):
        p = multiprocessing.Process(target=worker, args=(q,), daemon=True)
        p.start()

    # Enqueue every compressed dump found under ./untarred.
    for path, dirs, files in os.walk('untarred'):
        for file_name in files:
            if file_name.endswith('.json.bz2'):
                q.put((path, file_name))

    q.join()
    print('done, safe to ctrl-c if it does not exit automatically')
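
# Usage sketch
# ------------
# The script takes no arguments: it reads every *.json.bz2 file under
# ./untarred and writes a filtered JSON array to the mirrored path under
# ./processed. The nested layout shown below is only an assumed example;
# the code itself fixes nothing beyond the two root directory names.
#
#   untarred/2021/01/01/00/00.json.bz2  ->  processed/2021/01/01/00/00.json
#
#   $ python geo_filter.py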