Skip to content

Commit

Permalink
[core] Extract decompressBlock method in SortLookupStoreReader
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Dec 2, 2024
1 parent 3c82082 commit 039046a
Showing 1 changed file with 32 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public byte[] lookup(byte[] key) throws IOException {
return null;
}

private BlockIterator getNextBlock() throws IOException {
private BlockIterator getNextBlock() {
// index block handle, point to the key, value position.
MemorySlice blockHandle = indexBlockIterator.next().getValue();
BlockReader dataBlock =
Expand Down Expand Up @@ -134,42 +134,41 @@ private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
blockCache.getBlock(
blockHandle.offset(),
blockHandle.size(),
bytes -> {
MemorySegment block = MemorySegment.wrap(bytes);
int crc32cCode = crc32c(block, blockTrailer.getCompressionType());
checkArgument(
blockTrailer.getCrc32c() == crc32cCode,
String.format(
"Expected CRC32C(%d) but found CRC32C(%d) for file(%s)",
blockTrailer.getCrc32c(), crc32cCode, filePath));

// decompress data
BlockCompressionFactory compressionFactory =
BlockCompressionFactory.create(
blockTrailer.getCompressionType());
if (compressionFactory == null) {
return bytes;
} else {
MemorySliceInput compressedInput =
MemorySlice.wrap(block).toInput();
byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
BlockDecompressor decompressor =
compressionFactory.getDecompressor();
int uncompressedLength =
decompressor.decompress(
block.getHeapMemory(),
compressedInput.position(),
compressedInput.available(),
uncompressed,
0);
checkArgument(uncompressedLength == uncompressed.length);
return uncompressed;
}
},
bytes -> decompressBlock(bytes, blockTrailer),
index);
return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator);
}

private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer) {
MemorySegment compressed = MemorySegment.wrap(compressedBytes);
int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType());
checkArgument(
blockTrailer.getCrc32c() == crc32cCode,
String.format(
"Expected CRC32C(%d) but found CRC32C(%d) for file(%s)",
blockTrailer.getCrc32c(), crc32cCode, filePath));

// decompress data
BlockCompressionFactory compressionFactory =
BlockCompressionFactory.create(blockTrailer.getCompressionType());
if (compressionFactory == null) {
return compressedBytes;
} else {
MemorySliceInput compressedInput = MemorySlice.wrap(compressed).toInput();
byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
BlockDecompressor decompressor = compressionFactory.getDecompressor();
int uncompressedLength =
decompressor.decompress(
compressed.getHeapMemory(),
compressedInput.position(),
compressedInput.available(),
uncompressed,
0);
checkArgument(uncompressedLength == uncompressed.length);
return uncompressed;
}
}

@Override
public void close() throws IOException {
if (bloomFilter != null) {
Expand Down

0 comments on commit 039046a

Please sign in to comment.