# xtract_tabular_main.py

import pandas as pd
import os
import xlrd
import csv
import math
import argparse
import time
import multiprocessing as mp

# Minimum number of rows to analyze.
MIN_ROWS = 5
# Number of mode values to include for non-numeric metadata.
MODE_COUNT = 10

def execute_extractor(filename, chunksize=10000, parallel=False):
    """Get metadata from .csv (and .xls/.xlsx) files.
    Scans the file's physical layout (preamble, header, row counts), then
    pages through it in chunks and aggregates per-column numeric and
    non-numeric summary statistics.
    Parameters:
    filename (file path): Path to .csv file.
    chunksize (int): Number of rows per dataframe chunk.
    parallel (bool): Whether to extract chunk metadata in parallel.
    Returns:
    meta (dict): "tabular" metadata (physical, numeric, non-numeric) plus
    the extraction time.
    """
t0 = time.time()
grand_mdata = {"physical": {}, "numeric": {}, "nonnumeric": {}}
if filename.endswith(".xls") or filename.endswith(".xlsx"):
filename = excel_to_csv(filename)
# TODO: Try catching everything through line_count and excepting w/ utf-8 open?
    with open(filename, 'r') as data2:
        # Step 1. Quick scan for number of lines in file.
        line_count = 0
        try:
            for _ in data2:  # TODO: unicode error?
                line_count += 1
        except UnicodeDecodeError as e:
            return {'tabular': {'error': str(e)},
                    'extract_time': time.time() - t0}
        # Step 2. Determine the delimiter.
        try:
            delimiter = get_delimiter(filename, line_count)
        except (TypeError, IndexError) as e:
            meta = {'tabular': {'error': str(e)},
                    'extract_time': time.time() - t0}
            return meta
# Step 3. Isolate the header data.
header_info = get_header_info(data2, delim=delimiter)
freetext_offset = header_info[0] # TODO: see below. Return 'None'?
header_col_labels = header_info[1]
# TODO: Added left half of if-statement to quell following error:
# "TypeError '>' not supported b/t instances of NoneType and Int
if freetext_offset is not None and freetext_offset > 0:
data2.seek(0)
grand_mdata["physical"]["preamble"] = data2.readlines()[
:freetext_offset]
        # header_col_labels is already None when no header row was found.
        grand_mdata["physical"]["headers"] = header_col_labels
        if freetext_offset is None:
            freetext_offset = 0
grand_mdata["physical"]["data_rows"] = line_count - freetext_offset
grand_mdata["physical"]["total_rows"] = line_count
# Step 4. Create dataframes from structured data.
dataframes = get_dataframes(filename, chunksize=chunksize,
delim=delimiter,
skip_rows=freetext_offset + 1)
data2.close()
# Extract values from dataframe in parallel
if parallel:
df_metadata = parallel_df_extraction(dataframes, header_col_labels)
else:
df_metadata = []
for chunk in dataframes:
df_metadata.append(extract_dataframe_metadata(chunk, header_col_labels))
# Now REDUCE metadata by iterating over dataframes.
# Numeric grand aggregates
g_means = {}
g_three_max = {}
g_three_min = {}
g_num_rows = {}
# Nonnumeric grand aggregates
g_modes = {}
for md_piece in df_metadata:
# First, we want to aggregate our individual pieces of
# NUMERIC metadata
if "numeric" in md_piece:
col_list = md_piece["numeric"]
            # For every column-level dict of numeric data...
            for col in col_list:
                # TODO: Create rolling update of mean, maxs, mins,
                # row_counts. Should, in theory, be faster.
                cid = col["col_id"]
                md = col["metadata"]
                # Weight each chunk's mean by its row count so that the
                # grand mean computed below is correct across chunks.
                if cid not in g_means:
                    g_means[cid] = md["mean"] * md["num_rows"]
                    g_three_max[cid] = {"max_n": md["max_n"]}
                    g_three_min[cid] = {"min_n": md["min_n"]}
                    g_num_rows[cid] = {"num_rows": md["num_rows"]}
                else:
                    g_means[cid] += md["mean"] * md["num_rows"]
                    g_three_max[cid]["max_n"].extend(md["max_n"])
                    g_three_min[cid]["min_n"].extend(md["min_n"])
                    g_num_rows[cid]["num_rows"] += md["num_rows"]
if "nonnumeric" in md_piece:
col_list = md_piece["nonnumeric"]
# Do the 'reduce' part of mapreduce.
for col in col_list: # 0 b/c held as single-elem list.
for k in col:
col_modes = col[k]['top3_modes']
if k not in g_modes:
g_modes[k] = {'topn_modes': {}}
for mode_key in col_modes:
pass
if mode_key not in g_modes:
g_modes[k]['topn_modes'][mode_key] = {}
g_modes[k]['topn_modes'][mode_key] = col_modes[
mode_key]
else:
try:
g_modes[k]['topn_modes'][mode_key] += col_modes[mode_key]
except:
pass
nonnum_count = len(g_modes)
num_count = len(g_means)
grand_mdata["physical"]["total_cols"] = nonnum_count + num_count
    # Use g_means' keys here, since every numeric column must appear in all
    # of the numeric summary stats anyway.
for col_key in g_means:
grand_mdata["numeric"][col_key] = {}
grand_mdata["numeric"][col_key]["mean"] = float(
g_means[col_key] / g_num_rows[col_key]["num_rows"])
grand_mdata["numeric"][col_key]["num_rows"] = g_num_rows[col_key][
"num_rows"]
sorted_max = sorted(g_three_max[col_key]["max_n"], reverse=True)
sorted_min = sorted(g_three_min[col_key]["min_n"], reverse=False)
grand_mdata["numeric"][col_key]["max_n"] = sorted_max[:3]
grand_mdata["numeric"][col_key]["min_n"] = sorted_min[:3]
for col_key in g_modes:
all_modes = g_modes[col_key]['topn_modes']
top_modes = sorted(all_modes, key=all_modes.get, reverse=True)[
:MODE_COUNT]
grand_mdata["nonnumeric"][col_key] = {}
grand_mdata["nonnumeric"][col_key]['topn_modes'] = top_modes
meta = {"tabular": grand_mdata}
t1 = time.time()
meta.update({"extract time": (t1 - t0)})
return meta
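
# Illustrative use of execute_extractor (the path below is hypothetical).
# The returned dictionary has roughly this shape:
#   meta = execute_extractor("data/example.csv", chunksize=10000)
#   meta["tabular"]["physical"]    # preamble, headers, data_rows, total_rows, total_cols
#   meta["tabular"]["numeric"]     # per-column mean, min_n, max_n, num_rows
#   meta["tabular"]["nonnumeric"]  # per-column topn_modes
#   meta["extract_time"]           # seconds spent extracting
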
def extract_dataframe_metadata(df, header):
    """Extracts metadata from a pandas DataFrame chunk.
    Extracts the number of rows, three largest values, three smallest
    values, and mean of each numeric column of the DataFrame, plus the
    most frequent values (modes) of each non-numeric column.
    Parameters:
    df (pandas DataFrame): DataFrame chunk of the .csv file.
    header (list(str)): List of fields of column headers, or None.
    Returns:
    df_metadata (dict): Dictionary with "numeric" and "nonnumeric" lists
    of per-column metadata.
    """
# Get only the numeric columns in data frame.
ndf = df._get_numeric_data()
# Get only the string columns in data frame.
sdf = df.select_dtypes(include=[object])
ndf_tuples = []
for col in ndf:
largest = df.nlargest(3, columns=col, keep='first')
smallest = df.nsmallest(3, columns=col, keep='first')
the_mean = ndf[col].mean()
col_maxs = largest[col]
col_mins = smallest[col]
maxn = []
minn = []
for maxnum in col_maxs:
maxn.append(maxnum)
for minnum in col_mins:
minn.append(minnum)
if header is not None:
ndf_tuple = {"col_id": header[col],
"metadata": {"num_rows": len(ndf), "min_n": minn,
"max_n": maxn, "mean": the_mean}}
else:
ndf_tuple = {"col_id": "__{}__".format(col),
"metadata": {"num_rows": len(ndf), "min_n": minn,
"max_n": maxn, "mean": the_mean}}
ndf_tuples.append(ndf_tuple)
# TODO: Repeated column names? They would just overwrite.
    # Now get the nonnumeric data tags.
    top_modes = {}
    for col in sdf:
        # Mode tags represent the three most prevalent values from each
        # paged dataframe.
        nonnumeric_top_3_df = sdf[col].value_counts().head(3)
        col_modes = {}
        for value, count in nonnumeric_top_3_df.items():
            col_modes[value] = count
        if header is not None:
            top_modes[header[col]] = {"top3_modes": col_modes}
        else:
            top_modes["__{}__".format(col)] = {"top3_modes": col_modes}
    # Append once: top_modes already holds every non-numeric column.
    nonnumeric_metadata = [top_modes]
    df_metadata = {"numeric": ndf_tuples, "nonnumeric": nonnumeric_metadata}
    return df_metadata
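
# Illustrative shape of a single chunk's metadata (column ids and values
# below are hypothetical):
#   {"numeric": [{"col_id": "__0__",
#                 "metadata": {"num_rows": 10000, "min_n": [...],
#                              "max_n": [...], "mean": 4.2}}],
#    "nonnumeric": [{"__1__": {"top3_modes": {"foo": 120, "bar": 98}}}]}
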
def parallel_df_extraction(df, header, parallel=mp.cpu_count()):
    """Extracts dataframe metadata in parallel.
    Parameters:
    df (iterator of pandas DataFrames): Chunked dataframes to process.
    header (list(str)): List of fields in header of data columns.
    parallel (int): Number of processes to create to process dataframes.
    It is recommended that parallel <= number of CPU cores.
    Returns:
    combined_df_metadata (list(dict)): Per-chunk dictionaries of numeric
    and non-numeric metadata.
    """
    pools = mp.Pool(processes=parallel)
    df_metadata = []
    for chunk in df:
        # Append each AsyncResult; reassigning the list here would drop
        # results for all earlier chunks.
        df_metadata.append(pools.apply_async(extract_dataframe_metadata,
                                             args=(chunk, header)))
    combined_df_metadata = [p.get() for p in df_metadata]
    pools.close()
    pools.join()
    return combined_df_metadata
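
# Illustrative: extract chunk metadata with four worker processes
# (`chunks` would be the iterator returned by get_dataframes):
#   per_chunk = parallel_df_extraction(chunks, header_col_labels, parallel=4)
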
def excel_to_csv(excel_file):
    """Convert the first sheet of an Excel workbook to a .csv file under
    ./excel_files and return the path of the new file."""
    # NOTE: xlrd 2.x only reads .xls; .xlsx files may require openpyxl.
    wb = xlrd.open_workbook(excel_file)
    sh = wb.sheet_by_index(0)
    # Create the output directory if it does not already exist.
    os.makedirs(os.path.join(os.getcwd(), 'excel_files'), exist_ok=True)
    csv_file_name = os.path.join(os.path.join(os.getcwd(), 'excel_files'),
                                 os.path.splitext(
                                     os.path.basename(excel_file))[0] + ".csv")
    with open(csv_file_name, 'w') as csv_file:
        wr = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
        for rownum in range(sh.nrows):
            wr.writerow(sh.row_values(rownum))
    return csv_file_name
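
# Illustrative: convert a hypothetical workbook before running the extractor:
#   csv_path = excel_to_csv("data/example.xls")  # writes ./excel_files/example.csv
#   meta = execute_extractor(csv_path)
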
def get_delimiter(filename, numlines):
"""Finds delimiter in .csv file.
Parameters:
filename (file path): File path to .csv file.
numlines (int): Number of lines in .csv file.
Returns:
delims[0] (str): Delimiter of .csv file.
Raises:
Non-Uniform Delimiter: Raises if delimiter within file is not
constant.
No columns to parse from file: Raises if unable to turn .csv file
into Panda dataframe.
"""
# Step 1: Check whether filename can be converted to Panda dataframe
    try:
        # NOTE: error_bad_lines was removed in pandas 2.0; newer pandas
        # versions use on_bad_lines='skip' instead.
        pd.read_csv(filename, skiprows=numlines - MIN_ROWS,
                    error_bad_lines=False)
except pd.errors.EmptyDataError:
raise TypeError("No columns to parse from file")
# Step 2: Get the delimiter of the last n lines.
s = csv.Sniffer()
with open(filename, 'r') as fil:
i = 1
delims = []
for line in fil:
if numlines - MIN_ROWS > i > numlines - (MIN_ROWS + 3) and ('=' not in line):
if line.count('\t') > line.count(','):
return '\t'
else:
delims.append(s.sniff(line).delimiter)
i += 1
if delims.count(delims[0]) == len(delims):
return delims[0]
else:
raise TypeError("Non-Uniform Delimiter")
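
# Illustrative: sniff the delimiter of a hypothetical 100-line file:
#   delim = get_delimiter("data/example.csv", 100)   # e.g. ',' or '\t'
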
# TODO: Check why header and dataframe_size are called then hardcoded
def get_dataframes(filename, delim, chunksize, skip_rows=0):
    """Creates an iterator of pandas DataFrame chunks from a .csv file.
    Parameters:
    filename (file path): File path to .csv file.
    delim (str): Delimiter of .csv file.
    chunksize (int): Number of rows in each dataframe chunk.
    skip_rows (int): Number of rows to skip over in .csv file to avoid
    preamble.
    Returns:
    iter_csv (pandas TextFileReader): Iterator of DataFrame chunks.
    """
iter_csv = pd.read_csv(filename, sep=delim, chunksize=chunksize,
header=None, skiprows=skip_rows,
error_bad_lines=False, iterator=True)
return iter_csv
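
# Illustrative: page through a hypothetical comma-delimited file in
# 10,000-row chunks, skipping a 3-line preamble plus header row:
#   for chunk in get_dataframes("data/example.csv", delim=",",
#                               chunksize=10000, skip_rows=4):
#       print(chunk.shape)
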
def count_fields(dataframe):
""" Return the number of columns in our dataframe"""
return dataframe.shape[1]
# Currently assuming short freetext headers.
def get_header_info(data, delim):
"""Retrieves length of preamble and headers of data columns in .csv
file.
    Parameters:
    data (file object): Open file handle for the .csv file.
    delim (str): Delimiter of the .csv file.
Returns:
preamble_length (int): Length of preamble.
header (list(str)): List of fields in header of data columns.
"""
data.seek(0)
# TODO: Why are we getting line count a second time?
    line_count = 0
    for _ in data:
        line_count += 1
    preamble_length = None
    header = None
    # Figure out the length of the preamble via binary search.
    if line_count >= 5:  # Set min value or bin-search not useful.
# A. Get the length of the preamble.
preamble_length = _get_preamble(data, delim)
# B. Determine whether the next line is a freetext header
data.seek(0)
header = None
for i, line in enumerate(data):
if preamble_length is None:
header = None
break
if i == preamble_length:
has_header = is_header_row(fields(line, delim))
if has_header: # == True
header = fields(line, delim)
else:
header = None
elif i > preamble_length:
break
return preamble_length, header
def is_header_row(row):
"""Determines whether a row is a header row by checking whether all
fields are non-numeric.
Parameters:
row (list(str)): List of fields (str) in a row.
Returns:
(bool): Whether row is a header row.
"""
for field in row:
if is_number(field):
return False
return True
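
# Illustrative:
#   is_header_row(["name", "age", "city"])     -> True
#   is_header_row(["alice", "34", "chicago"])  -> False (a numeric field)
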
def _get_preamble(data, delim):
"""Finds the line number of the last line of free-text preamble of
.csv file.
Parameters:
data (.csv file): .csv file with preamble.
delim (str): Delimiter of data parameter (.csv file).
Return:
last_preamble_line_num (int): Line number of the last line of
preamble.
Restrictions:
Currently can only find last line of preamble when delimiter is a
comma or tab.
"""
data.seek(0)
max_nonzero_row = None
max_nonzero_line_count = None
last_preamble_line_num = None
    # Record the number of delimited fields in each line, tracking the last
    # nonempty row and its field count.
    delim_counts = {}
for i, line in enumerate(data):
cur_line_field_count = len(line.split(delim))
if cur_line_field_count != 0:
delim_counts[i] = cur_line_field_count
max_nonzero_row = i
max_nonzero_line_count = cur_line_field_count
    # If three rows near the end of the file all share the same field count...
if (delim_counts[max_nonzero_row - MIN_ROWS] == delim_counts[max_nonzero_row - MIN_ROWS + 1]
== delim_counts[max_nonzero_row - MIN_ROWS + 2]):
# Now binary-search from the end to find the last row with that
# number of columns.
starting_row = math.floor((max_nonzero_row - 2)/2)
last_preamble_line_num = _last_preamble_line_bin_search(
delim_counts,
max_nonzero_line_count,
starting_row,
upper_bd=0,
lower_bd=max_nonzero_row - 2) # TODO: why -2? Guess: avoid EOF \n character.
return last_preamble_line_num
def _last_preamble_line_bin_search(field_cnt_dict, target_field_num, cur_row,
upper_bd=None, lower_bd=None):
"""Performs binary search to find the last line number of preamble.
Performs a binary search on dictionary to find the last line number
of preamble. Preamble is differentiated from data by comparing
the number of delimiters in each line.
Parameters:
    field_cnt_dict (dictionary(int : int)): Dictionary mapping line number
    to the number of delimited fields in that line.
target_field_num (int): Number of delimiters in non-preamble lines
(lines of data).
cur_row (float or int): Current line number in binary search.
upper_bd (int): Upper boundary of binary search.
lower_bd (int): Lower boundary of binary search.
Returns:
(int): Line number of last line of preamble.
Restrictions:
Currently can only perform a binary search when delimiter is a comma
or tab.
"""
cur_row = math.floor(cur_row)
    # NOTE: To debug any preamble/header issues, start here and print the inputs.
    # BAND-AID: terminate when both bounds have converged on the current row,
    # since they can otherwise both end up below it.
    if abs(cur_row - upper_bd) < 1 and abs(cur_row - lower_bd) < 1:
        print(f"Termination condition! Current row: {cur_row}")
        return cur_row + 1
# Check current row and next two to see if they are all the target value.
if field_cnt_dict[cur_row] == field_cnt_dict[cur_row + 1] == field_cnt_dict[cur_row + 2] == target_field_num:
# If so, then we want to move up in the file.
new_cur_row = cur_row - math.floor((cur_row - upper_bd) / 2)
# If we're in the first row, we should return here.
if cur_row == 1 and field_cnt_dict[cur_row - 1] \
== field_cnt_dict[cur_row] \
== target_field_num:
return 0
elif cur_row == 1 and field_cnt_dict[cur_row - 1] != target_field_num:
return 1
else:
recurse = _last_preamble_line_bin_search(field_cnt_dict,
target_field_num,
new_cur_row,
upper_bd=upper_bd,
lower_bd=cur_row)
return recurse
elif field_cnt_dict[cur_row] == field_cnt_dict[cur_row + 1] == target_field_num:
return cur_row + 1
# If not, then we want to move down in the file.
else:
new_cur_row = cur_row + math.floor((lower_bd - cur_row) / 2)
if cur_row == new_cur_row:
return cur_row + 1
recurse = _last_preamble_line_bin_search(field_cnt_dict,
target_field_num, new_cur_row,
upper_bd=cur_row,
lower_bd=lower_bd)
return recurse
def fields(line, delim):
"""Splits a line along the delimiters into a list of fields.
Parameters:
line (str): Line from a .csv file.
delim (str): Delimiter of .csv file.
Returns:
fields (list(str)): List of individual fields.
"""
column_fields = [field.strip(' \n\r\t') for field in line.split(delim)]
return column_fields
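
# Illustrative:
#   fields("a, b, c\n", ",")   ->  ['a', 'b', 'c']
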
def is_number(field):
"""Determines whether a string is numeric by attempting to cast it
as a float.
Parameters:
field (str): Field from a row of .csv file.
Returns:
(boolean): Whether field can be cast to a float.
# """
try:
float(field)
return True
except ValueError:
return False
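
# Illustrative:
#   is_number("3.14")   -> True
#   is_number("N/A")    -> False
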
if __name__ == "__main__":
    """Takes a file path from the command line and prints its metadata.
    Arguments:
    --path (file path): File path of .csv file.
    --chunksize (int): Number of rows to process at once (default 10000).
    --multiprocess: Extract chunk metadata in parallel.
    Prints:
    meta (dict): Metadata of the .csv file, including the extraction time.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--path', help='File system path to file.',
                        required=True)
    parser.add_argument('--chunksize',
                        help='Number of rows to process at once.',
                        required=False, type=int, default=10000)
    parser.add_argument('--multiprocess', action='store_true',
                        required=False, default=False,
                        help='Extract chunk metadata in parallel.')
args = parser.parse_args()
meta = execute_extractor(args.path, chunksize=args.chunksize, parallel=args.multiprocess)
print(meta)
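
# Example invocation (illustrative path):
#   python xtract_tabular_main.py --path data/example.csv --chunksize 5000 --multiprocess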