diff --git a/asdf/_block/io.py b/asdf/_block/io.py index 3a42bed77..e38c4377d 100644 --- a/asdf/_block/io.py +++ b/asdf/_block/io.py @@ -105,7 +105,7 @@ def read_block_header(fd, offset=None): return validate_block_header(header) -def read_block_data(fd, header, offset=None, memmap=False): +def read_block_data(fd, header, offset=None, memmap=False, out=None): """ Read (or memory map) data for an ASDF block. @@ -127,6 +127,9 @@ def read_block_data(fd, header, offset=None, memmap=False): does not support memmapping the data will not be memmapped (and no error will be raised). + out : ndarray, optional + Destination array for read block data. + Returns ------- data : ndarray or memmap @@ -148,13 +151,14 @@ def read_block_data(fd, header, offset=None, memmap=False): compression = mcompression.validate(header["compression"]) if compression: # compressed data will not be memmapped - data = mcompression.decompress(fd, used_size, header["data_size"], compression) + data = mcompression.decompress(fd, used_size, header["data_size"], compression, out=out) fd.fast_forward(header["allocated_size"] - header["used_size"]) else: if memmap and fd.can_memmap(): data = fd.memmap_array(offset, used_size) ff_bytes = header["allocated_size"] else: + # TODO update to read into out data = fd.read_into_array(used_size) ff_bytes = header["allocated_size"] - header["used_size"] if (header["flags"] & constants.BLOCK_FLAG_STREAMED) and fd.seekable(): @@ -164,7 +168,7 @@ def read_block_data(fd, header, offset=None, memmap=False): return data -def read_block(fd, offset=None, memmap=False, lazy_load=False): +def read_block(fd, offset=None, memmap=False, lazy_load=False, out=None): """ Read a block (header and data) from an ASDF file. @@ -187,6 +191,9 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False): Return a callable that when called will read the block data. This option is ignored for a non-seekable file. + out : ndarray, optional + Destination array for read block data. + Returns ------- offset : int @@ -216,13 +223,15 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False): # setup a callback to later load the data fd_ref = weakref.ref(fd) - def callback(): + def callback(out=None): + # out here can be different since this callback might be called + # at a later point fd = fd_ref() if fd is None or fd.is_closed(): msg = "ASDF file has already been closed. Can not get the data." raise OSError(msg) position = fd.tell() - data = read_block_data(fd, header, offset=data_offset, memmap=memmap) + data = read_block_data(fd, header, offset=data_offset, memmap=memmap, out=out) fd.seek(position) return data @@ -232,7 +241,7 @@ def callback(): else: fd.fast_forward(header["allocated_size"]) else: - data = read_block_data(fd, header, offset=None, memmap=memmap) + data = read_block_data(fd, header, offset=None, memmap=memmap, out=out) return offset, header, data_offset, data diff --git a/asdf/_block/reader.py b/asdf/_block/reader.py index 60abd9d34..c1a97ae0f 100644 --- a/asdf/_block/reader.py +++ b/asdf/_block/reader.py @@ -33,24 +33,37 @@ def close(self): def loaded(self): return self._data is not None - def load(self): + def load(self, out=None): """ Load the block data (if it is not already loaded). + Parameters + ---------- + out : ndarray, optional + Destination array for read block data. + Raises ------ OSError If attempting to load from a closed file. """ - if self.loaded: + if self.loaded and out is None: return + if out is not None: + # FIXME document this + lazy_load = False + memmap = False + else: + lazy_load = self.lazy_load + memmap = self.memmap fd = self._fd() if fd is None or fd.is_closed(): msg = "Attempt to load block from closed file" raise OSError(msg) position = fd.tell() + # FIXME this re-reads the header every time _, self._header, self.data_offset, self._data = bio.read_block( - fd, offset=self.offset, memmap=self.memmap, lazy_load=self.lazy_load + fd, offset=self.offset, memmap=memmap, lazy_load=lazy_load, out=out ) fd.seek(position) diff --git a/asdf/_compression.py b/asdf/_compression.py index 89076e313..91b3b1893 100644 --- a/asdf/_compression.py +++ b/asdf/_compression.py @@ -235,7 +235,7 @@ def to_compression_header(compression): return compression -def decompress(fd, used_size, data_size, compression, config=None): +def decompress(fd, used_size, data_size, compression, config=None, out=None): """ Decompress binary data in a file @@ -257,12 +257,16 @@ def decompress(fd, used_size, data_size, compression, config=None): Any kwarg parameters to pass to the underlying decompression function + out : numpy.array, optional + Array in which to store decompressed data + Returns ------- array : numpy.array A flat uint8 containing the decompressed data. """ - buffer = np.empty((data_size,), np.uint8) + if out is None: + out = np.empty((data_size,), np.uint8) compression = validate(compression) decoder = _get_compressor(compression) @@ -270,13 +274,13 @@ def decompress(fd, used_size, data_size, compression, config=None): config = {} blocks = fd.read_blocks(used_size) # data is a generator - len_decoded = decoder.decompress(blocks, out=buffer.data, **config) + len_decoded = decoder.decompress(blocks, out=out.data, **config) if len_decoded != data_size: msg = "Decompressed data wrong size" raise ValueError(msg) - return buffer + return out def compress(fd, data, compression, config=None):