Skip to content

Commit

Permalink
[core] Manage the bloomfilter buffer in cache manager (apache#4403)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Oct 30, 2024
1 parent 1948716 commit 8e4de02
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ private byte[] getValue(long offset) throws IOException {

/**
 * Releases the resources held by this reader.
 *
 * <p>The bloom filter, when present, is closed first; the input view is then closed even if
 * closing the bloom filter failed, so the view cannot leak. Calling this method again after
 * the input view has been released does not touch the view a second time.
 *
 * @throws IOException if closing the bloom filter or the input view fails
 */
@Override
public void close() throws IOException {
    try {
        if (bloomFilter != null) {
            bloomFilter.close();
        }
    } finally {
        // inputView is nulled once closed; guard so a repeated close() does not throw a
        // NullPointerException.
        if (inputView != null) {
            inputView.close();
            inputView = null;
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public SortLookupStoreFactory(

/**
 * Creates a {@link SortLookupStoreReader} over the given file.
 *
 * @param file the lookup store file to read
 * @param context reader context; it is cast to {@link SortContext} unconditionally, so any
 *     other implementation fails with a {@link ClassCastException}
 * @return a new reader built from this factory's comparator, block size and cache manager
 * @throws IOException if the reader cannot be constructed
 */
@Override
public SortLookupStoreReader createReader(File file, Context context) throws IOException {
    // Single call site: the reader constructor takes the block size in addition to the
    // comparator, file, context and cache manager.
    return new SortLookupStoreReader(
            comparator, file, blockSize, (SortContext) context, cacheManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
import org.apache.paimon.io.PageFileInput;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.FileBasedBloomFilter;
import org.apache.paimon.utils.MurmurHashUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -53,13 +54,16 @@ public class SortLookupStoreReader implements LookupStoreReader {
private final FileChannel fileChannel;
private final String filePath;
private final long fileSize;
private final CacheManager cacheManager;

private final BlockIterator indexBlockIterator;
@Nullable private final BloomFilter bloomFilter;
@Nullable private FileBasedBloomFilter bloomFilter;
@Nullable private PageFileInput fileInput;

public SortLookupStoreReader(
Comparator<MemorySlice> comparator,
File file,
int blockSize,
SortContext context,
CacheManager cacheManager)
throws IOException {
Expand All @@ -68,21 +72,21 @@ public SortLookupStoreReader(
this.fileChannel = new FileInputStream(file).getChannel();
this.filePath = file.getAbsolutePath();
this.fileSize = context.fileSize();
this.cacheManager = cacheManager;

Footer footer = readFooter();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator();
this.bloomFilter = readBloomFilter(footer.getBloomFilterHandle());
}

private BloomFilter readBloomFilter(@Nullable BloomFilterHandle bloomFilterHandle)
throws IOException {
BloomFilter bloomFilter = null;
if (bloomFilterHandle != null) {
MemorySegment segment = read(bloomFilterHandle.offset(), bloomFilterHandle.size());
bloomFilter = new BloomFilter(bloomFilterHandle.expectedEntries(), segment.size());
bloomFilter.setMemorySegment(segment, 0);
BloomFilterHandle handle = footer.getBloomFilterHandle();
if (handle != null) {
this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null);
this.bloomFilter =
new FileBasedBloomFilter(
fileInput,
cacheManager,
handle.expectedEntries(),
handle.offset(),
handle.size());
}
return bloomFilter;
}

private Footer readFooter() throws IOException {
Expand Down Expand Up @@ -177,6 +181,10 @@ private BlockReader readBlock(BlockHandle blockHandle) throws IOException {
/**
 * Closes the file channel, the bloom filter (if one was loaded) and the page file input
 * backing it.
 *
 * <p>Each resource is closed in its own {@code finally} step, so a failure while closing one
 * of them does not prevent the remaining ones from being released. Closing the bloom filter
 * invalidates its page in the cache manager.
 *
 * @throws IOException if closing any of the underlying resources fails
 */
@Override
public void close() throws IOException {
    try {
        this.fileChannel.close();
    } finally {
        try {
            if (bloomFilter != null) {
                bloomFilter.close();
            }
        } finally {
            // fileInput is only created together with the bloom filter, hence nullable.
            if (fileInput != null) {
                fileInput.close();
            }
        }
    }
    // TODO clear cache too
    // NOTE(review): presumably refers to the cached data/index blocks; the bloom filter page
    // is already invalidated by bloomFilter.close() above - confirm.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,21 @@
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.MemorySegment;

import java.io.Closeable;
import java.io.IOException;

import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Util to apply a built bloom filter. */
public class FileBasedBloomFilter {
public class FileBasedBloomFilter implements Closeable {

private final PageFileInput input;
private final CacheManager cacheManager;
private final BloomFilter filter;
private final long readOffset;
private final int readLength;

private final CacheKey cacheKey;
private int accessCount;

public FileBasedBloomFilter(
Expand All @@ -52,6 +55,7 @@ public FileBasedBloomFilter(
this.readOffset = readOffset;
this.readLength = readLength;
this.accessCount = 0;
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength);
}

public boolean testHash(int hash) {
Expand All @@ -61,7 +65,7 @@ public boolean testHash(int hash) {
if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) {
MemorySegment segment =
cacheManager.getPage(
CacheKey.forPosition(input.file(), readOffset, readLength),
cacheKey,
key -> input.readPosition(readOffset, readLength),
new BloomFilterCallBack(filter));
filter.setMemorySegment(segment, 0);
Expand All @@ -75,6 +79,11 @@ BloomFilter bloomFilter() {
return filter;
}

/**
 * Invalidates the page registered under this filter's cache key in the cache manager.
 *
 * @throws IOException declared by {@link Closeable#close()}
 */
@Override
public void close() throws IOException {
    this.cacheManager.invalidPage(this.cacheKey);
}

/** Call back for cache manager. */
private static class BloomFilterCallBack implements CacheCallback {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void testProbe() throws IOException {

Arrays.stream(inputs)
.forEach(i -> Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue());
cacheManager.cache().invalidateAll();
filter.close();
Assertions.assertThat(cacheManager.cache().asMap()).isEmpty();
Assertions.assertThat(filter.bloomFilter().getMemorySegment()).isNull();
}

Expand Down

0 comments on commit 8e4de02

Please sign in to comment.