-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathcsv2parquet
executable file
·338 lines (308 loc) · 12.7 KB
/
csv2parquet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#!/usr/bin/env python3
import argparse
import atexit
import csv
import os
import shutil
import string
import subprocess
import sys
import tempfile
HELP='''
csv_input is a CSV file, whose first line defines the column names.
parquet_output is the Parquet output (i.e., directory in which one or
more Parquet files are written.)
When used, --column-map must be followed by an even number of
strings, constituting the key-value pairs mapping CSV to Parquet
column names:
csv2parquet data.csv data.parquet --column-map "CSV Column Name" "Parquet Column Name"
To provide types for columns, use --types:
csv2parquet data.csv data.parquet --types "CSV Column Name" "INT"
See documentation (README.md in the source repo) for more information.
'''.strip()
# True iff we are to preserve temporary files.
# Globally set to true in debug mode.
global preserve
preserve = False
DRILL_OVERRIDE_TEMPLATE = '''
drill.exec: {
sys.store.provider: {
# The following section is only required by LocalPStoreProvider
local: {
path: "${local_base}",
write: true
}
},
tmp: {
directories: ["${local_base}"],
filesystem: "drill-local:///"
},
sort: {
purge.threshold : 100,
external: {
batch.size : 4000,
spill: {
batch.size : 4000,
group.size : 100,
threshold : 200,
directories : [ "${local_base}/spill" ],
fs : "file:///"
}
}
},
}
'''
# exceptions
class CsvSourceError(Exception):
def __init__(self, message):
super().__init__(message)
self.message = message
class DrillScriptError(CsvSourceError):
def __init__(self, returncode):
super().__init__(returncode)
self.returncode = returncode
class InvalidColumnNames(CsvSourceError):
pass
# classes
class Column:
def __init__(self, csv, parquet, type):
self.csv = csv
self.parquet = parquet
self.type = type
def __eq__(self, other):
return \
self.csv == other.csv and \
self.parquet == other.parquet and \
self.type == other.type
def line(self, index):
if self.type is None:
return 'columns[{}] as `{}`'.format(index, self.parquet)
# In Drill, if a SELECT query has both an OFFSET with a CAST,
# Drill will apply that cast even to columns that are
# skipped. For a headerless CSV file, we could just use
# something like:
#
# CAST(columns[{index}] as {type}) as `{parquet_name}`
#
# But if the header line is present, this causes the entire
# conversion to fail, because Drill attempts to cast the
# header (e.g., "Price") to the type (e.g., INT), triggering a
# fatal error. So instead we must do:
#
# CASE when columns[{index}]='{csv_name}' then CAST(NULL AS {type}) else ...
#
# I really don't like this, because it makes it possible for
# data corruption to hide. If a cell should contain a number,
# but instead contains a non-numeric string, that should be a
# loud, noisy error which is impossible to ignore. However, if
# that happens here, and you are so unlucky that the corrupted
# value happens to equal the CSV column name, then it is
# silently nulled out. This is admittedly very unlikely, but
# that's not the same as impossible. If you are reading this
# and have an idea for a better solution, please contact the
# author (see README.md).
return "CASE when columns[{index}]='{csv_name}' then CAST(NULL AS {type}) else CAST(columns[{index}] as {type}) end as `{parquet_name}`".format(
index=index, type=self.type, parquet_name=self.parquet, csv_name=self.csv)
class Columns:
def __init__(self, csv_columns: list, name_map: dict, type_map: dict):
self.csv_columns = csv_columns
self.name_map = name_map
self.type_map = type_map
self.items = []
invalid_names = []
for csv_name in self.csv_columns:
parquet_name = self.name_map.get(csv_name, csv_name)
type_name = self.type_map.get(csv_name, None)
if not is_valid_parquet_column_name(parquet_name):
invalid_names.append(parquet_name)
self.items.append(Column(csv_name, parquet_name, type_name))
if len(invalid_names) > 0:
raise InvalidColumnNames(invalid_names)
def __iter__(self):
return iter(self.items)
class CsvSource:
def __init__(self, path: str, name_map: dict = None, type_map: dict = None):
if name_map is None:
name_map = {}
if type_map is None:
type_map = {}
self.path = os.path.realpath(path)
self.headers = self._init_headers()
self.columns = Columns(self.headers, name_map, type_map)
def _init_headers(self):
with open(self.path, newline='') as handle:
csv_data = csv.reader(handle)
return next(csv_data)
class TempLocation:
_tempdir = None
def __init__(self):
drive, path = os.path.splitdrive(self.tempdir)
assert drive == '', 'Windows support not provided yet'
assert path.startswith('/tmp/'), self.tempdir
self.dfs_tmp_base = path[len('/tmp'):]
def dfs_tmp_path(self, path: str):
return os.path.join(self.dfs_tmp_base, path)
def full_path(self, path: str):
return os.path.join(self.tempdir, path)
@property
def tempdir(self):
if self._tempdir is None:
self._tempdir = tempfile.mkdtemp(prefix='/tmp/')
if preserve:
print('Preserving logs and intermediate files: ' + self._tempdir)
else:
atexit.register(shutil.rmtree, self._tempdir)
return self._tempdir
class DrillInstallation:
'''Create a temporary, custom Drill installation
Even in embedded mode, Drill runs in a stateful fashion, storing
its state in (by default) /tmp/drill. This poses a few problems
for running csv2parquet in ways that are (a) robust, and (b)
certain not to affect any other Drill users, human or machine.
This would be an easy problem to solve if I could create a custom
conf/drill-overrides.conf, and pass it to drill-embedded as a
command line option. However, as of Drill 1.4, the only way to do
such customization is to modify the actual drill-overrides.conf
file itself, before starting drill-embedded.
This class gets around this through an admittedly unorthodox hack:
creating a whole parallel Drill installation under a temporary
directory, with a customized conf/ directory, but reusing (via
symlinks) everything else in the main installation. This lets us
safely construct our own drill configuration, using its own
separate equivalent of /tmp/drill (etc.), and which is all cleaned
up after the script exits.
(Feature request to anyone on the Drill team reading this: Please
give drill-embedded a --with-override-file option!)
'''
def __init__(self, reference_executable:str=None):
self.location = TempLocation()
if reference_executable is None:
reference_executable = shutil.which('drill-embedded')
assert reference_executable is not None
self.reference_executable = reference_executable
self.reference_base, self.bindir = os.path.split(os.path.dirname(reference_executable))
self.install()
@property
def base(self):
return os.path.join(self.location.tempdir, 'drill')
@property
def local_base(self):
return os.path.join(self.location.tempdir, 'drill-local-base')
@property
def executable(self):
return os.path.join(self.base, self.bindir, 'drill-embedded')
def install(self):
# create required subdirs
for dirname in (self.base, self.local_base):
os.makedirs(dirname)
# link to reference
for item in os.scandir(self.reference_base):
if item.name == 'conf':
assert item.is_dir(), os.path.realpath(item)
continue
os.symlink(item.path, os.path.join(self.base, item.name))
# install config
conf_dir = os.path.join(self.base, 'conf')
os.makedirs(conf_dir)
with open(os.path.join(conf_dir, 'drill-override.conf'), 'w') as handle:
handle.write(string.Template(DRILL_OVERRIDE_TEMPLATE).substitute(
local_base=self.local_base))
def build_script(self, csv_source: CsvSource, parquet_output: str):
return DrillScript(self, csv_source, parquet_output)
class DrillScript:
def __init__(self, drill: DrillInstallation, csv_source: CsvSource, parquet_output: str):
self.drill = drill
self.csv_source = csv_source
self.parquet_output = parquet_output
def render(self):
return render_drill_script(
self.csv_source.columns,
self.drill.location.dfs_tmp_path('parquet_tmp_output'),
self.csv_source.path,
)
def run(self):
# execute drill script
script_path = os.path.join(self.drill.location.tempdir, 'script')
script_stdout = os.path.join(self.drill.location.tempdir, 'script_stdout')
script_stderr = os.path.join(self.drill.location.tempdir, 'script_stderr')
cmd = [
self.drill.executable,
'--run={}'.format(script_path),
]
with open(script_path, 'w') as handle:
handle.write(self.render())
with open(script_stdout, 'w') as stdout, open(script_stderr, 'w') as stderr:
proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
proc.wait()
if proc.returncode != 0:
raise DrillScriptError(proc.returncode)
# publish resulting output parquet file
shutil.move(self.drill.location.full_path('parquet_tmp_output'), self.parquet_output)
# helper functions
def get_args():
parser = argparse.ArgumentParser(
description='',
epilog=HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument('csv_input',
help='Path to input CSV file')
parser.add_argument('parquet_output',
help='Path to Parquet output')
parser.add_argument('--debug', default=False, action='store_true',
help='Preserve intermediate files and logs')
parser.add_argument('--column-map', nargs='*',
help='Map CSV header names to Parquet column names')
parser.add_argument('--types', nargs='*',
help='Map CSV header names to Parquet types')
args = parser.parse_args()
try:
args.column_map = list2dict(args.column_map)
except ValueError:
parser.error('--column-map requires an even number of arguments, as key-value pairs')
try:
args.types = list2dict(args.types)
except ValueError:
parser.error('--types requires an even number of arguments, as key-value pairs')
return args
def list2dict(items):
'''convert [a, b, c, d] to {a:b, c:d}'''
if items is None:
return {}
if len(items) % 2 != 0:
raise ValueError
return dict( (items[n], items[n+1])
for n in range(0, len(items)-1, 2) )
def is_valid_parquet_column_name(val):
return '.' not in val
def render_drill_script(columns: Columns, parquet_output: str, csv_input: str):
script = '''alter session set `store.format`='parquet';
CREATE TABLE dfs.tmp.`{}` AS
SELECT
'''.format(parquet_output)
column_lines = [column.line(n) for n, column in enumerate(columns)]
script += ',\n'.join(column_lines) + '\n'
script += 'FROM dfs.`{}`\n'.format(csv_input)
script += 'OFFSET 1\n'
return script
if __name__ == "__main__":
args = get_args()
if args.debug:
preserve = True
# Quick pre-check whether destination exists, so user doesn't have
# to wait long before we abort with a write error. There's a race
# condition because it can still be created between now and when
# we eventually try to write it, but this will catch the common case.
if os.path.exists(args.parquet_output):
sys.stderr.write('Output location "{}" already exists. Rename or delete before running again.\n'.format(args.parquet_output))
sys.exit(1)
csv_source = CsvSource(args.csv_input, args.column_map, args.types)
drill = DrillInstallation()
drill_script = drill.build_script(csv_source, args.parquet_output)
try:
drill_script.run()
except DrillScriptError as err:
sys.stderr.write('''FATAL: Drill script failed with error code {}. To troubleshoot, run
with --debug and inspect files script, script_stderr and script_stdout.
'''.format(err.returncode))
sys.exit(2)