hdf5_gz2zstd.py
"""
HDF5 storage GZ to ZSTD (custom dict) repack utility
"""
import gzip
import os
import time

import numpy as np
import pandas as pd

# To force the cffi backend, this must be set before importing zstandard:
# os.environ['PYTHON_ZSTANDARD_IMPORT_POLICY'] = "cffi"
import zstandard as zstd

import h5py
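
# How a dictionary file like "custom_zstd_dict" might have been produced: a
# hedged sketch only, since the training corpus and dictionary size are not
# shown in this script. `train_dict` and its defaults are illustrative, not
# part of the original utility.
def train_dict(samples, path=r"custom_zstd_dict", dict_size=112640):
    """Train a zstd dictionary from an iterable of raw byte samples."""
    zdict = zstd.train_dictionary(dict_size, list(samples))
    with open(path, "wb") as f:
        f.write(zdict.as_bytes())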


class Hdf5HtmlIter(object):
    """Iterate over a dataset of gzip-compressed records, yielding decompressed bytes."""

    def __init__(self, ds, max=None):
        self._ds = ds
        self._cur = 0
        self._max = max  # optional cap on the number of records yielded

    def __iter__(self):
        return self

    def __next__(self):
        if self._max is not None and self._cur >= self._max:
            raise StopIteration
        try:
            gz = self._ds[self._cur]
        except ValueError:
            # indexing past the end of the dataset ends the iteration
            raise StopIteration
        self._cur += 1
        return gzip.decompress(gz.tobytes())
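
# Hdf5HtmlIter is not called by main() below; a minimal usage sketch,
# assuming the same "html_gz.h5" layout that process() reads:
#
#   with h5py.File("html_gz.h5", "r") as f:
#       for html in Hdf5HtmlIter(f["/html_gz"], max=10):
#           print(len(html))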


def ds_append(ds, data, expand_by=100):
    """Append one record to a resizable dataset, growing it in blocks of expand_by."""
    try:
        recno = ds.attrs["lastrec"] + 1
    except KeyError:
        recno = 0  # attribute missing: nothing written yet
    dslen = ds.len()
    if recno >= dslen:
        # grow in blocks to avoid resizing on every append
        ds.resize([dslen + expand_by])
    ds[recno] = data
    ds.attrs["lastrec"] = recno
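
# ds_append() pairs with a resizable variable-length dataset; a minimal
# sketch (the "demo.h5"/"blobs" names here are illustrative only):
#
#   with h5py.File("demo.h5", "w") as f:
#       ds = f.create_dataset("blobs", (0,), maxshape=(None,),
#                             dtype=h5py.special_dtype(vlen=np.uint8))
#       ds_append(ds, np.frombuffer(b"payload", dtype=np.uint8))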


def main():
    dict_path = r"custom_zstd_dict"
    with open(dict_path, "rb") as f:
        zdict = zstd.ZstdCompressionDict(f.read())
    zc = zstd.ZstdCompressor(level=12, dict_data=zdict,
                             threads=6,
                             # write_content_size=False,
                             write_checksum=True,
                             write_dict_id=False)
    # Matching decompressor; unused here, but required to read the output back.
    zd = zstd.ZstdDecompressor(dict_data=zdict)

    def process(write=True, max=None):
        stats = []
        with h5py.File(r"html_gz.h5", "r") as rf, \
                h5py.File(r"html_zstd.h5", "w") as wf:
            rds = rf["/html_gz"]
            # sort of varbinary(), somewhat tricky but usable
            wds = wf.create_dataset('html_zstd', (0,), maxshape=(None,),
                                    dtype=h5py.special_dtype(vlen=np.uint8))
            t0 = time.perf_counter()
            records = 0
            for gz in rds:
                records += 1
                t1 = time.perf_counter()
                html = gzip.decompress(gz.tobytes())
                zs = np.frombuffer(zc.compress(html), dtype=np.uint8)
                hlen, glen, zlen = len(html), len(gz), len(zs)
                t2 = time.perf_counter()
                # raw KiB, gzip ratio, zstd ratio, per-record ms
                stats.append([hlen / 1024, hlen / glen, hlen / zlen, (t2 - t1) * 1000])
                if write:
                    ds_append(wds, zs)
                if records % 250 == 0:
                    # report means over the last 250 records
                    df = pd.DataFrame(columns=["html", "gz", "zstd", "ms"], data=stats)
                    df["gz/zstd"] = df["zstd"] / df["gz"]
                    df = df.agg("mean")
                    dt = time.perf_counter() - t0
                    rec_per_sec = records / dt
                    remain = rds.len() - records
                    print(f"html: {df.loc['html']:.1f}k,",
                          ", ".join(f"{k}: {df.loc[k]:.2f}" for k in ("gz", "zstd", "gz/zstd")))
                    print(f"{records} records processed in {dt / 60:.1f} min, "
                          f"{rec_per_sec:.0f} rec/s, {remain} remaining, "
                          f"ETA {remain / rec_per_sec / 60:.1f} min")
                    stats = []
                if max is not None and records > max:
                    break

    process()  # max=5000)
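

# Read-back check for the repacked file: a sketch, not wired into main().
# Because write_dict_id=False strips the dictionary id from each frame, the
# exact same dictionary must be supplied to decompress. `verify_sample` is
# an assumed helper name, not part of the original utility.
def verify_sample(n=5, dict_path=r"custom_zstd_dict"):
    with open(dict_path, "rb") as f:
        zd = zstd.ZstdDecompressor(dict_data=zstd.ZstdCompressionDict(f.read()))
    with h5py.File(r"html_zstd.h5", "r") as rf:
        ds = rf["/html_zstd"]
        # only entries up to "lastrec" hold data; the tail is growth slack
        for i in range(min(n, ds.attrs["lastrec"] + 1)):
            html = zd.decompress(ds[i].tobytes())
            print(i, len(html))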


if __name__ == '__main__':
    main()