diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java index c467deaf9114..5dec3c06ebcc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java @@ -25,6 +25,8 @@ import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySlice; import org.apache.paimon.memory.MemorySliceInput; +import org.apache.paimon.utils.BloomFilter; +import org.apache.paimon.utils.MurmurHashUtils; import javax.annotation.Nullable; @@ -53,6 +55,7 @@ public class SortLookupStoreReader implements LookupStoreReader { private final long fileSize; private final BlockIterator indexBlockIterator; + @Nullable private final BloomFilter bloomFilter; public SortLookupStoreReader( Comparator comparator, @@ -68,7 +71,18 @@ public SortLookupStoreReader( Footer footer = readFooter(); this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator(); - // TODO read bloom filter block + this.bloomFilter = readBloomFilter(footer.getBloomFilterHandle()); + } + + private BloomFilter readBloomFilter(@Nullable BloomFilterHandle bloomFilterHandle) + throws IOException { + BloomFilter bloomFilter = null; + if (bloomFilterHandle != null) { + MemorySegment segment = read(bloomFilterHandle.offset(), bloomFilterHandle.size()); + bloomFilter = new BloomFilter(bloomFilterHandle.expectedEntries(), segment.size()); + bloomFilter.setMemorySegment(segment, 0); + } + return bloomFilter; } private Footer readFooter() throws IOException { @@ -79,6 +93,10 @@ private Footer readFooter() throws IOException { @Nullable @Override public byte[] lookup(byte[] key) throws IOException { + if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) { + return null; + } + MemorySlice keySlice = MemorySlice.wrap(key); // seek the index to the block containing the key indexBlockIterator.seekTo(keySlice); diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java index c222877c8602..a2299c68fe99 100644 --- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java @@ -24,34 +24,60 @@ import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.lookup.LookupStoreFactory.Context; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.testutils.junit.parameterized.Parameters; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BloomFilter; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.Arrays; import java.util.Comparator; +import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link SortLookupStoreFactory}. */ +@ExtendWith(ParameterizedTestExtension.class) public class SortLookupStoreFactoryTest { private static final int VALUE_COUNT = 10_000_000; private static final int QUERY_COUNT = 10_000; private final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + private final boolean bloomFilterEnabled; + private final CompressOptions compress; @TempDir Path tempDir; private File file; + public SortLookupStoreFactoryTest(List var) { + this.bloomFilterEnabled = (Boolean) var.get(0); + this.compress = new CompressOptions((String) var.get(1), 1); + } + + @SuppressWarnings("unused") + @Parameters(name = "enableBf&compress-{0}") + public static List> getVarSeg() { + return Arrays.asList( + Arrays.asList(true, "none"), + Arrays.asList(false, "none"), + Arrays.asList(false, "lz4"), + Arrays.asList(true, "lz4"), + Arrays.asList(false, "zstd"), + Arrays.asList(true, "zstd")); + } + @BeforeEach public void before() throws Exception { file = new File(tempDir.toFile(), UUID.randomUUID().toString()); @@ -60,16 +86,17 @@ public void before() throws Exception { } } - @Test + @TestTemplate public void testNormal() throws IOException { SortLookupStoreFactory factory = new SortLookupStoreFactory( Comparator.naturalOrder(), new CacheManager(MemorySize.ofMebiBytes(1)), 1024, - CompressOptions.defaultOptions()); + compress); - SortLookupStoreWriter writer = factory.createWriter(file, null); + SortLookupStoreWriter writer = + factory.createWriter(file, createBloomFiler(bloomFilterEnabled)); for (int i = 0; i < VALUE_COUNT; i++) { byte[] bytes = toBytes(i); writer.put(bytes, bytes); @@ -82,10 +109,13 @@ public void testNormal() throws IOException { byte[] bytes = toBytes(query); assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(query); } + + assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull(); + reader.close(); } - @Test + @TestTemplate public void testIntKey() throws IOException { RowCompactedSerializer keySerializer = new RowCompactedSerializer(RowType.of(new IntType())); @@ -95,8 +125,9 @@ public void testIntKey() throws IOException { keySerializer.createSliceComparator(), new CacheManager(MemorySize.ofMebiBytes(1)), 64 * 1024, - CompressOptions.defaultOptions()); - SortLookupStoreWriter writer = factory.createWriter(file, null); + compress); + SortLookupStoreWriter writer = + factory.createWriter(file, createBloomFiler(bloomFilterEnabled)); for (int i = 0; i < VALUE_COUNT; i++) { byte[] bytes = toBytes(keySerializer, row, i); writer.put(bytes, toBytes(i)); @@ -114,9 +145,19 @@ public void testIntKey() throws IOException { throw new RuntimeException(e); } } + + assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull(); + reader.close(); } + private BloomFilter.Builder createBloomFiler(boolean enabled) { + if (!enabled) { + return null; + } + return BloomFilter.builder(100, 0.01); + } + private byte[] toBytes(RowCompactedSerializer serializer, GenericRow row, int i) { row.setField(0, i); return serializer.serializeToBytes(row);