Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for bitshuffle #286

Merged
merged 4 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions hsds/util/dsetUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
("H5Z_FILTER_SNAPPY", 32003, "snappy"),
("H5Z_FILTER_LZ4", 32004, "lz4"),
("H5Z_FILTER_LZ4HC", 32005, "lz4hc"),
("H5Z_FILTER_BITSHUFFLE", 32008, "bitshuffle"),
("H5Z_FILTER_ZSTD", 32015, "zstd"),
)

Expand Down Expand Up @@ -78,6 +79,7 @@ def getFilterItem(key):
"""
Return filter code, id, and name, based on an id, a name or a code.
"""

if key == "deflate":
key = "gzip" # use gzip as equivalent
for item in FILTER_DEFS:
Expand Down Expand Up @@ -123,14 +125,15 @@ def getCompressionFilter(dset_json):
def getShuffleFilter(dset_json):
"""Return shuffle filter, or None"""
filters = getFilters(dset_json)
FILTER_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE")
for filter in filters:
try:
if filter["class"] == "H5Z_FILTER_SHUFFLE":
log.debug(f"Shuffle filter is used: {filter}")
return filter
except KeyError:
if "class" not in filter:
log.warn(f"filter option: {filter} with no class key")
continue
filter_class = filter["class"]
if filter_class in FILTER_CLASSES:
return filter

log.debug("Shuffle filter not used")
return None

Expand All @@ -149,17 +152,21 @@ def getFilterOps(app, dset_json, item_size):
filter_ops = {}

shuffleFilter = getShuffleFilter(dset_json)
if shuffleFilter:
filter_ops["use_shuffle"] = True
if shuffleFilter and item_size != "H5T_VARIABLE":
shuffle_name = shuffleFilter["name"]
if shuffle_name == "shuffle":
filter_ops["shuffle"] = 1 # use regular shuffle
elif shuffle_name == "bitshuffle":
filter_ops["shuffle"] = 2 # use bitshuffle
else:
log.warn(f"unexpected shuffleFilter: {shuffle_name}")
filter_ops["shuffle"] = 0 # no shuffle
else:
filter_ops["shuffle"] = 0 # no shuffle

if compressionFilter:
if compressionFilter["class"] == "H5Z_FILTER_DEFLATE":
filter_ops["compressor"] = "zlib" # blosc compressor
if shuffleFilter:
filter_ops["use_shuffle"] = True
else:
# for HDF5-style compression, use shuffle only if it is turned on
filter_ops["use_shuffle"] = False
else:
if "name" in compressionFilter:
filter_ops["compressor"] = compressionFilter["name"]
Expand All @@ -170,10 +177,7 @@ def getFilterOps(app, dset_json, item_size):
else:
filter_ops["level"] = int(compressionFilter["level"])

if filter_ops:
filter_ops["item_size"] = item_size
if item_size == "H5T_VARIABLE":
filter_ops["use_shuffle"] = False
log.debug(f"save filter ops: {filter_ops} for {dset_id}")
filter_map[dset_id] = filter_ops # save
return filter_ops
Expand Down
206 changes: 128 additions & 78 deletions hsds/util/storUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
import time
import json
import zlib
import numpy as np
import numcodecs as codecs
from bitshuffle import bitshuffle, bitunshuffle
from aiohttp.web_exceptions import HTTPInternalServerError


from .. import hsds_logger as log
from .s3Client import S3Client

Expand All @@ -44,6 +45,9 @@ def FileClient(app):

from .. import config

BYTE_SHUFFLE = 1
BIT_SHUFFLE = 2


def getCompressors():
"""return available compressors"""
Expand All @@ -65,6 +69,7 @@ def getCompressors():
def getSupportedFilters(include_compressors=True):
"""return list of other supported filters"""
filters = [
"bitshuffle",
"shuffle",
"fletcher32",
]
Expand All @@ -85,18 +90,118 @@ def getBloscThreads():
return nthreads


def _shuffle(element_size, chunk):
shuffler = codecs.Shuffle(element_size)
arr = shuffler.encode(chunk)
def _shuffle(codec, data, item_size=4):
    """Shuffle the given data using the requested shuffle codec.

    codec: BYTE_SHUFFLE (1) to byte-shuffle with numcodecs, or
        BIT_SHUFFLE (2) to bit-shuffle with the bitshuffle package
    data: bytes (or numpy array) to be shuffled
    item_size: element size in bytes (used by the byte shuffle only)

    Returns the shuffled data as bytes.
    Raises ValueError for an unrecognized codec.
    """
    if codec == BYTE_SHUFFLE:
        # byte shuffle, use numcodecs Shuffle
        shuffler = codecs.Shuffle(item_size)
        arr = shuffler.encode(data)
    elif codec == BIT_SHUFFLE:
        # bit shuffle, use bitshuffle package
        if isinstance(data, bytes):
            # bitshuffle is expecting a numpy array
            data = np.frombuffer(data, dtype=np.dtype("uint8"))
        arr = bitshuffle(data)
    else:
        msg = f"Unexpected codec: {codec} for _shuffle"
        log.error(msg)
        raise ValueError(msg)
    return arr.tobytes()


def _unshuffle(element_size, chunk):
shuffler = codecs.Shuffle(element_size)
arr = shuffler.decode(chunk)
def _unshuffle(codec, data, item_size=4):
    """Reverse the shuffle applied by _shuffle for the given codec.

    codec: BYTE_SHUFFLE (1) to byte-unshuffle with numcodecs, or
        BIT_SHUFFLE (2) to bit-unshuffle with the bitshuffle package
    data: bytes (or numpy array) to be unshuffled
    item_size: element size in bytes (used by the byte shuffle only)

    Returns the unshuffled data as bytes.
    Raises ValueError for an unrecognized codec.
    """
    if codec == BYTE_SHUFFLE:
        # byte shuffle, use numcodecs Shuffle
        shuffler = codecs.Shuffle(item_size)
        arr = shuffler.decode(data)
    elif codec == BIT_SHUFFLE:
        # bit shuffle, use bitshuffle package
        if isinstance(data, bytes):
            # bitshuffle is expecting a numpy array
            data = np.frombuffer(data, dtype=np.dtype("uint8"))
        arr = bitunshuffle(data)
    else:
        # original fell through here and raised NameError on arr;
        # fail explicitly instead, mirroring _shuffle
        msg = f"Unexpected codec: {codec} for _unshuffle"
        log.error(msg)
        raise ValueError(msg)
    return arr.tobytes()


def _uncompress(data, compressor=None, shuffle=0, item_size=4):
    """ Uncompress the provided data using compressor and/or shuffle.

    data: compressed (and possibly shuffled) bytes
    compressor: compressor name ("gzip"/"deflate" are treated as zlib), or None
    shuffle: 0 for no shuffle, BYTE_SHUFFLE (1), or BIT_SHUFFLE (2)
    item_size: element size in bytes, passed through to the unshuffle step

    Returns the uncompressed bytes.
    Raises HTTPInternalServerError when decompression fails or the
    compressor is unrecognized.
    """
    log.debug(f"_uncompress(compressor={compressor}, shuffle={shuffle})")
    if compressor:
        if compressor in ("gzip", "deflate"):
            # blosc refers to this as zlib
            compressor = "zlib"
        # first check if this was compressed with blosc
        # returns typesize, isshuffle, and memcopied
        blosc_metainfo = codecs.blosc.cbuffer_metainfo(data)
        if blosc_metainfo[0] > 0:
            # non-zero typesize indicates a blosc-framed buffer
            log.info(f"blosc compressed data for {len(data)} bytes")
            try:
                blosc = codecs.Blosc()
                udata = blosc.decode(data)
                log.info(f"blosc uncompressed to {len(udata)} bytes")
                data = udata
                if shuffle == BYTE_SHUFFLE:
                    shuffle = 0  # blosc will unshuffle the bytes for us
            except Exception as e:
                msg = f"got exception: {e} using blosc decompression"
                log.error(msg)
                raise HTTPInternalServerError()
        elif compressor == "zlib":
            # data may have been compressed without blosc,
            # try using zlib directly
            log.info(f"using zlib to decompress {len(data)} bytes")
            try:
                udata = zlib.decompress(data)
                log.info(f"uncompressed to {len(udata)} bytes")
                data = udata
            except zlib.error as zlib_error:
                log.info(f"zlib_err: {zlib_error}")
                log.error("unable to uncompress data with zlib")
                raise HTTPInternalServerError()
        else:
            msg = f"don't know how to decompress data in {compressor} "
            log.error(msg)
            raise HTTPInternalServerError()
    if shuffle:
        # bit shuffle (or byte shuffle when blosc didn't handle it) still
        # needs to be undone after decompression
        start_time = time.time()
        data = _unshuffle(shuffle, data, item_size=item_size)
        finish_time = time.time()
        elapsed = finish_time - start_time
        msg = f"unshuffled {len(data)} bytes, {(elapsed):.2f} elapsed"
        log.debug(msg)

    return data


def _compress(data, compressor=None, clevel=5, shuffle=0, item_size=4):
    """ Compress the provided data using compressor and/or shuffle.

    data: bytes to be compressed
    compressor: compressor name ("gzip"/"deflate" are treated as zlib), or None
    clevel: compression level passed to blosc
    shuffle: 0 for no shuffle, BYTE_SHUFFLE (1, handled by blosc), or
        BIT_SHUFFLE (2, applied here before compression)
    item_size: element size in bytes, passed through to the shuffle step

    Returns the compressed bytes (or the original data if no compressor).
    Raises HTTPInternalServerError when blosc encoding fails.
    """
    # original logged "_uncompress" here - corrected to the actual function
    log.debug(f"_compress(compressor={compressor}, shuffle={shuffle})")
    if shuffle == BIT_SHUFFLE:
        # bit shuffle the data before applying the compressor
        # (blosc only performs byte shuffle natively)
        log.debug("bitshuffling data")
        data = _shuffle(shuffle, data, item_size=item_size)
        shuffle = 0  # don't do any blosc shuffling

    if compressor:
        if compressor in ("gzip", "deflate"):
            # blosc refers to this as zlib
            compressor = "zlib"
        cdata = None
        # try with blosc compressor
        try:
            blosc = codecs.Blosc(cname=compressor, clevel=clevel, shuffle=shuffle)
            cdata = blosc.encode(data)
            msg = f"compressed from {len(data)} bytes to {len(cdata)} bytes "
            msg += f"using filter: {blosc.cname} with level: {blosc.clevel}"
            log.info(msg)
        except Exception as e:
            log.error(f"got exception using blosc encoding: {e}")
            raise HTTPInternalServerError()

        if cdata is not None:
            data = cdata  # use the compressed data

    return data


def _getStorageClient(app):
"""get storage client s3 or azure blob"""

Expand Down Expand Up @@ -217,58 +322,6 @@ async def getStorJSONObj(app, key, bucket=None):
return json_dict


def _uncompress(data, compressor=None, shuffle=0):
    """ Uncompress the provided data using compressor and/or shuffle.

    data: compressed (and possibly shuffled) bytes
    compressor: compressor name, or None for uncompressed data
    shuffle: element size for the byte unshuffle step, or 0 for no shuffle

    Returns the uncompressed bytes.
    Raises HTTPInternalServerError when decompression fails or the
    compressor is unrecognized.
    """
    if compressor:
        # compressed chunk data...

        # first check if this was compressed with blosc
        # returns typesize, isshuffle, and memcopied
        blosc_metainfo = codecs.blosc.cbuffer_metainfo(data)
        if blosc_metainfo[0] > 0:
            # non-zero typesize indicates a blosc-framed buffer
            log.info(f"blosc compressed data for {len(data)} bytes")
            try:
                blosc = codecs.Blosc()
                udata = blosc.decode(data)
                log.info(f"uncompressed to {len(udata)} bytes")
                data = udata
                shuffle = 0  # blosc will unshuffle the bytes for us
            except Exception as e:
                msg = f"got exception: {e} using blosc decompression"
                log.error(msg)
                raise HTTPInternalServerError()
        elif compressor == "zlib":
            # data may have been compressed without blosc,
            # try using zlib directly
            log.info(f"using zlib to decompress {len(data)} bytes")
            try:
                udata = zlib.decompress(data)
                log.info(f"uncompressed to {len(udata)} bytes")
                data = udata
            except zlib.error as zlib_error:
                log.info(f"zlib_err: {zlib_error}")
                log.error("unable to uncompress data with zlib")
                raise HTTPInternalServerError()
        else:
            msg = f"don't know how to decompress data in {compressor} "
            log.error(msg)
            raise HTTPInternalServerError()

    if shuffle > 0:
        # undo the byte shuffle after decompression
        log.debug(f"shuffle is {shuffle}")
        start_time = time.time()
        unshuffled = _unshuffle(shuffle, data)
        if unshuffled is not None:
            log.debug(f"unshuffled to {len(unshuffled)} bytes")
            data = unshuffled
        finish_time = time.time()
        elapsed = finish_time - start_time
        msg = f"unshuffled {len(data)} bytes, {(elapsed):.2f} elapsed"
        log.debug(msg)

    return data


async def getStorBytes(app,
key,
filter_ops=None,
Expand All @@ -294,15 +347,20 @@ async def getStorBytes(app,
log.info(msg)

shuffle = 0
item_size = 4
compressor = None
if filter_ops:
log.debug(f"getStorBytes for {key} with filter_ops: {filter_ops}")
if "use_shuffle" in filter_ops and filter_ops["use_shuffle"]:
if filter_ops.get("shuffle") == "shuffle":
shuffle = filter_ops["item_size"]
log.debug("using shuffle filter")
elif filter_ops.get("shuffle") == "bitshuffle":
shuffle = 2
log.debug("using bitshuffle filter")
if "compressor" in filter_ops:
compressor = filter_ops["compressor"]
log.debug(f"using compressor: {compressor}")
item_size = filter_ops["item_size"]

kwargs = {"bucket": bucket, "key": key, "offset": offset, "length": length}

Expand Down Expand Up @@ -336,7 +394,8 @@ async def getStorBytes(app,
m = n + chunk_location.length
log.debug(f"getStorBytes - extracting chunk from data[{n}:{m}]")
h5_bytes = data[n:m]
h5_bytes = _uncompress(h5_bytes, compressor=compressor, shuffle=shuffle)
kwargs = {"compressor": compressor, "shuffle": shuffle, "item_size": item_size}
h5_bytes = _uncompress(h5_bytes, **kwargs)
if len(h5_bytes) != h5_size:
msg = f"expected chunk index: {chunk_location.index} to have size: "
msg += f"{h5_size} but got: {len(h5_bytes)}"
Expand All @@ -353,7 +412,7 @@ async def getStorBytes(app,
# chunk_bytes got updated, so just return None
return None
else:
data = _uncompress(data, compressor=compressor, shuffle=shuffle)
data = _uncompress(data, compressor=compressor, shuffle=shuffle, item_size=item_size)
return data


Expand All @@ -365,32 +424,23 @@ async def putStorBytes(app, key, data, filter_ops=None, bucket=None):
bucket = app["bucket_name"]
if key[0] == "/":
key = key[1:] # no leading slash
shuffle = -1 # auto-shuffle
shuffle = 0
clevel = 5
cname = None # compressor name
item_size = 4
if filter_ops:
if "compressor" in filter_ops:
cname = filter_ops["compressor"]
if "use_shuffle" in filter_ops and not filter_ops["use_shuffle"]:
shuffle = 0 # client indicates to turn off shuffling
if "shuffle" in filter_ops:
shuffle = filter_ops["shuffle"]
if "level" in filter_ops:
clevel = filter_ops["level"]
item_size = filter_ops["item_size"]
msg = f"putStorBytes({bucket}/{key}), {len(data)} bytes shuffle: {shuffle}"
msg += f" compressor: {cname} level: {clevel}"
msg += f" compressor: {cname} level: {clevel}, item_size: {item_size}"
log.info(msg)

if cname:
try:
blosc = codecs.Blosc(cname=cname, clevel=clevel, shuffle=shuffle)
cdata = blosc.encode(data)
# TBD: add cname in blosc constructor
msg = f"compressed from {len(data)} bytes to {len(cdata)} bytes "
msg += f"using filter: {blosc.cname} with level: {blosc.clevel}"
log.info(msg)
data = cdata
except Exception as e:
log.error(f"got exception using blosc encoding: {e}")
raise HTTPInternalServerError()
data = _compress(data, compressor=cname, clevel=clevel, shuffle=shuffle, item_size=item_size)

rsp = await client.put_object(key, data, bucket=bucket)

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"aiohttp_cors",
"aiofiles",
"azure-storage-blob",
"bitshuffle",
"botocore",
"cryptography",
"numcodecs",
Expand Down
2 changes: 1 addition & 1 deletion testall.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

PYTHON_CMD = "python" # change to "python3" if "python" invokes python version 2.x

unit_tests = ('array_util_test', 'chunk_util_test', 'domain_util_test',
unit_tests = ('array_util_test', 'chunk_util_test', 'compression_test', 'domain_util_test',
'dset_util_test', 'hdf5_dtype_test', 'id_util_test', 'lru_cache_test',
'shuffle_test', 'rangeget_util_test')

Expand Down
Loading
Loading