Skip to content

Commit

Permalink
WIP: decompress into array
Browse files Browse the repository at this point in the history
  • Loading branch information
braingram committed Oct 10, 2024
1 parent d8f3dcb commit 63a8c89
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
21 changes: 15 additions & 6 deletions asdf/_block/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def read_block_header(fd, offset=None):
return validate_block_header(header)


def read_block_data(fd, header, offset=None, memmap=False):
def read_block_data(fd, header, offset=None, memmap=False, out=None):
"""
Read (or memory map) data for an ASDF block.
Expand All @@ -127,6 +127,9 @@ def read_block_data(fd, header, offset=None, memmap=False):
does not support memmapping the data will not be memmapped (and
no error will be raised).
out : ndarray, optional
Destination array for read block data.
Returns
-------
data : ndarray or memmap
Expand All @@ -148,13 +151,14 @@ def read_block_data(fd, header, offset=None, memmap=False):
compression = mcompression.validate(header["compression"])
if compression:
# compressed data will not be memmapped
data = mcompression.decompress(fd, used_size, header["data_size"], compression)
data = mcompression.decompress(fd, used_size, header["data_size"], compression, out=out)
fd.fast_forward(header["allocated_size"] - header["used_size"])
else:
if memmap and fd.can_memmap():
data = fd.memmap_array(offset, used_size)
ff_bytes = header["allocated_size"]
else:
# TODO update to read into out
data = fd.read_into_array(used_size)
ff_bytes = header["allocated_size"] - header["used_size"]
if (header["flags"] & constants.BLOCK_FLAG_STREAMED) and fd.seekable():
Expand All @@ -164,7 +168,7 @@ def read_block_data(fd, header, offset=None, memmap=False):
return data


def read_block(fd, offset=None, memmap=False, lazy_load=False):
def read_block(fd, offset=None, memmap=False, lazy_load=False, out=None):
"""
Read a block (header and data) from an ASDF file.
Expand All @@ -187,6 +191,9 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False):
Return a callable that when called will read the block data. This
option is ignored for a non-seekable file.
out : ndarray, optional
Destination array for read block data.
Returns
-------
offset : int
Expand Down Expand Up @@ -216,13 +223,15 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False):
# setup a callback to later load the data
fd_ref = weakref.ref(fd)

def callback():
def callback(out=None):
# out here can be different since this callback might be called
# at a later point
fd = fd_ref()
if fd is None or fd.is_closed():
msg = "ASDF file has already been closed. Can not get the data."
raise OSError(msg)
position = fd.tell()
data = read_block_data(fd, header, offset=data_offset, memmap=memmap)
data = read_block_data(fd, header, offset=data_offset, memmap=memmap, out=out)
fd.seek(position)
return data

Expand All @@ -232,7 +241,7 @@ def callback():
else:
fd.fast_forward(header["allocated_size"])
else:
data = read_block_data(fd, header, offset=None, memmap=memmap)
data = read_block_data(fd, header, offset=None, memmap=memmap, out=out)
return offset, header, data_offset, data


Expand Down
19 changes: 16 additions & 3 deletions asdf/_block/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,37 @@ def close(self):
def loaded(self):
return self._data is not None

def load(self, out=None):
    """
    Load the block data (if it is not already loaded).

    Parameters
    ----------
    out : ndarray, optional
        Destination array for read block data. When provided, the block
        is always (re)read from the file directly into ``out`` — even if
        the data was previously loaded — and lazy loading and memory
        mapping are disabled for this call, since both are incompatible
        with filling a caller-supplied buffer.

    Raises
    ------
    OSError
        If attempting to load from a closed file.
    """
    # Skip the read only when no destination buffer was supplied; with
    # ``out`` we must read again so the caller's array gets filled.
    if self.loaded and out is None:
        return
    if out is not None:
        # A caller-provided destination cannot be satisfied by a
        # deferred-read callable or a memmap of the file, so force an
        # eager read into ``out``.
        lazy_load = False
        memmap = False
    else:
        lazy_load = self.lazy_load
        memmap = self.memmap
    fd = self._fd()
    if fd is None or fd.is_closed():
        msg = "Attempt to load block from closed file"
        raise OSError(msg)
    # read_block seeks to the block offset; restore the file position
    # afterwards so callers see no change in the stream state.
    position = fd.tell()
    # FIXME this re-reads the header every time
    _, self._header, self.data_offset, self._data = bio.read_block(
        fd, offset=self.offset, memmap=memmap, lazy_load=lazy_load, out=out
    )
    fd.seek(position)

Expand Down
12 changes: 8 additions & 4 deletions asdf/_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def to_compression_header(compression):
return compression


def decompress(fd, used_size, data_size, compression, config=None):
def decompress(fd, used_size, data_size, compression, config=None, out=None):
"""
Decompress binary data in a file
Expand All @@ -257,26 +257,30 @@ def decompress(fd, used_size, data_size, compression, config=None):
Any kwarg parameters to pass to the underlying decompression
function
out : numpy.ndarray, optional
Array in which to store decompressed data
Returns
-------
array : numpy.ndarray
A flat uint8 array containing the decompressed data.
"""
buffer = np.empty((data_size,), np.uint8)
if out is None:
out = np.empty((data_size,), np.uint8)

compression = validate(compression)
decoder = _get_compressor(compression)
if config is None:
config = {}

blocks = fd.read_blocks(used_size) # data is a generator
len_decoded = decoder.decompress(blocks, out=buffer.data, **config)
len_decoded = decoder.decompress(blocks, out=out.data, **config)

if len_decoded != data_size:
msg = "Decompressed data wrong size"
raise ValueError(msg)

return buffer
return out


def compress(fd, data, compression, config=None):
Expand Down

0 comments on commit 63a8c89

Please sign in to comment.