diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java index c984dafdd5c3..742cc1a60bbe 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java @@ -169,6 +169,9 @@ private byte[] getValue(long offset) throws IOException { @Override public void close() throws IOException { + if (bloomFilter != null) { + bloomFilter.close(); + } inputView.close(); inputView = null; } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java index 62c4131e541b..244d7f9dd7e4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java @@ -52,7 +52,8 @@ public SortLookupStoreFactory( @Override public SortLookupStoreReader createReader(File file, Context context) throws IOException { - return new SortLookupStoreReader(comparator, file, (SortContext) context, cacheManager); + return new SortLookupStoreReader( + comparator, file, blockSize, (SortContext) context, cacheManager); } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java index 5dec3c06ebcc..04c9d287c779 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java @@ -20,12 +20,13 @@ import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.compression.BlockDecompressor; +import org.apache.paimon.io.PageFileInput; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.lookup.LookupStoreReader; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySlice; import org.apache.paimon.memory.MemorySliceInput; -import org.apache.paimon.utils.BloomFilter; +import org.apache.paimon.utils.FileBasedBloomFilter; import org.apache.paimon.utils.MurmurHashUtils; import javax.annotation.Nullable; @@ -53,13 +54,16 @@ public class SortLookupStoreReader implements LookupStoreReader { private final FileChannel fileChannel; private final String filePath; private final long fileSize; + private final CacheManager cacheManager; private final BlockIterator indexBlockIterator; - @Nullable private final BloomFilter bloomFilter; + @Nullable private FileBasedBloomFilter bloomFilter; + @Nullable private PageFileInput fileInput; public SortLookupStoreReader( Comparator comparator, File file, + int blockSize, SortContext context, CacheManager cacheManager) throws IOException { @@ -68,21 +72,21 @@ public SortLookupStoreReader( this.fileChannel = new FileInputStream(file).getChannel(); this.filePath = file.getAbsolutePath(); this.fileSize = context.fileSize(); + this.cacheManager = cacheManager; Footer footer = readFooter(); this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator(); - this.bloomFilter = readBloomFilter(footer.getBloomFilterHandle()); - } - - private BloomFilter readBloomFilter(@Nullable BloomFilterHandle bloomFilterHandle) - throws IOException { - BloomFilter bloomFilter = null; - if (bloomFilterHandle != null) { - MemorySegment segment = read(bloomFilterHandle.offset(), bloomFilterHandle.size()); - bloomFilter = new BloomFilter(bloomFilterHandle.expectedEntries(), segment.size()); - bloomFilter.setMemorySegment(segment, 0); + BloomFilterHandle handle = footer.getBloomFilterHandle(); + if (handle != null) { + this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null); + this.bloomFilter = + new FileBasedBloomFilter( + fileInput, + cacheManager, + handle.expectedEntries(), + handle.offset(), + handle.size()); } - return bloomFilter; } private Footer readFooter() throws IOException { @@ -177,6 +181,10 @@ private BlockReader readBlock(BlockHandle blockHandle) throws IOException { @Override public void close() throws IOException { this.fileChannel.close(); + if (bloomFilter != null) { + bloomFilter.close(); + fileInput.close(); + } // TODO clear cache too } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java index 7b885a5e6f7a..3d8751774cd1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java @@ -25,18 +25,21 @@ import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.memory.MemorySegment; +import java.io.Closeable; +import java.io.IOException; + import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Util to apply a built bloom filter . */ -public class FileBasedBloomFilter { +public class FileBasedBloomFilter implements Closeable { private final PageFileInput input; private final CacheManager cacheManager; private final BloomFilter filter; private final long readOffset; private final int readLength; - + private final CacheKey cacheKey; private int accessCount; public FileBasedBloomFilter( @@ -52,6 +55,7 @@ public FileBasedBloomFilter( this.readOffset = readOffset; this.readLength = readLength; this.accessCount = 0; + this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength); } public boolean testHash(int hash) { @@ -61,7 +65,7 @@ public boolean testHash(int hash) { if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) { MemorySegment segment = cacheManager.getPage( - CacheKey.forPosition(input.file(), readOffset, readLength), + cacheKey, key -> input.readPosition(readOffset, readLength), new BloomFilterCallBack(filter)); filter.setMemorySegment(segment, 0); @@ -75,6 +79,11 @@ BloomFilter bloomFilter() { return filter; } + @Override + public void close() throws IOException { + cacheManager.invalidPage(cacheKey); + } + /** Call back for cache manager. */ private static class BloomFilterCallBack implements CacheCallback { diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java index 970219e7b4f7..8bda49005c79 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java @@ -59,7 +59,8 @@ public void testProbe() throws IOException { Arrays.stream(inputs) .forEach(i -> Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue()); - cacheManager.cache().invalidateAll(); + filter.close(); + Assertions.assertThat(cacheManager.cache().asMap()).isEmpty(); Assertions.assertThat(filter.bloomFilter().getMemorySegment()).isNull(); }