diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java index 39997888ce92..6dbfe130e3bb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java @@ -106,7 +106,7 @@ public byte[] lookup(byte[] key) throws IOException { return null; } - private BlockIterator getNextBlock() throws IOException { + private BlockIterator getNextBlock() { // index block handle, point to the key, value position. MemorySlice blockHandle = indexBlockIterator.next().getValue(); BlockReader dataBlock = @@ -134,42 +134,41 @@ private BlockReader readBlock(BlockHandle blockHandle, boolean index) { blockCache.getBlock( blockHandle.offset(), blockHandle.size(), - bytes -> { - MemorySegment block = MemorySegment.wrap(bytes); - int crc32cCode = crc32c(block, blockTrailer.getCompressionType()); - checkArgument( - blockTrailer.getCrc32c() == crc32cCode, - String.format( - "Expected CRC32C(%d) but found CRC32C(%d) for file(%s)", - blockTrailer.getCrc32c(), crc32cCode, filePath)); - - // decompress data - BlockCompressionFactory compressionFactory = - BlockCompressionFactory.create( - blockTrailer.getCompressionType()); - if (compressionFactory == null) { - return bytes; - } else { - MemorySliceInput compressedInput = - MemorySlice.wrap(block).toInput(); - byte[] uncompressed = new byte[compressedInput.readVarLenInt()]; - BlockDecompressor decompressor = - compressionFactory.getDecompressor(); - int uncompressedLength = - decompressor.decompress( - block.getHeapMemory(), - compressedInput.position(), - compressedInput.available(), - uncompressed, - 0); - checkArgument(uncompressedLength == uncompressed.length); - return uncompressed; - } - }, + bytes -> decompressBlock(bytes, blockTrailer), index); return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator); } + private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer) { + MemorySegment compressed = MemorySegment.wrap(compressedBytes); + int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType()); + checkArgument( + blockTrailer.getCrc32c() == crc32cCode, + String.format( + "Expected CRC32C(%d) but found CRC32C(%d) for file(%s)", + blockTrailer.getCrc32c(), crc32cCode, filePath)); + + // decompress data + BlockCompressionFactory compressionFactory = + BlockCompressionFactory.create(blockTrailer.getCompressionType()); + if (compressionFactory == null) { + return compressedBytes; + } else { + MemorySliceInput compressedInput = MemorySlice.wrap(compressed).toInput(); + byte[] uncompressed = new byte[compressedInput.readVarLenInt()]; + BlockDecompressor decompressor = compressionFactory.getDecompressor(); + int uncompressedLength = + decompressor.decompress( + compressed.getHeapMemory(), + compressedInput.position(), + compressedInput.available(), + uncompressed, + 0); + checkArgument(uncompressedLength == uncompressed.length); + return uncompressed; + } + } + @Override public void close() throws IOException { if (bloomFilter != null) {