diff --git a/docs/content/concepts/spec/fileindex.md b/docs/content/concepts/spec/fileindex.md
index 7134e021f71a..215a1c226f5d 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -94,6 +94,17 @@ Content of bloom filter index is simple:
This class uses a 64-bit long hash. Only the number of hash functions (one integer) and the bit set bytes are stored. Bytes types
(like varchar, binary, etc.) are hashed with xx hash; numeric types are hashed by [specified number hash](http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm).
+## Column Index Bytes: DynamicBloomFilter
+
+Define `'file-index.dynamic-bloom-filter.columns'`.
+
+Content of dynamic bloom filter index is simple (a decoding sketch follows):
+- version: 1 byte
+- numBloomFilters: 4-byte int
+- bloomFilterVectorSize (bit set size in bits): 4-byte int
+- numHashFunctions: 4-byte int
+- bit set bytes of each bloom filter, in order
+
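+The per-column options mirror the plain bloom filter index: `items` (expected entries per
+inner filter, default `1000000`), `fpp` (target false positive probability, default `0.1`)
+and `max_items` (total capacity across all inner filters, default `10000000`), set as
+`'file-index.dynamic-bloom-filter.<column_name>.items'` and so on.
+
+Below is a minimal decoding sketch of the layout above. It is illustrative only, not Paimon
+API; it assumes the input stream is already positioned at the start of the index bytes:
+
+```java
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+final class DynamicBloomFilterLayoutSketch {
+
+    /** Reads the header and returns the raw bit set bytes of every inner bloom filter. */
+    static byte[][] readBitSets(InputStream stream) throws IOException {
+        DataInputStream in = new DataInputStream(stream);
+        byte version = in.readByte(); // currently always 1
+        if (version != 1) {
+            throw new IOException("Unsupported dynamic bloom filter index version: " + version);
+        }
+        int numBloomFilters = in.readInt(); // how many inner filters were written
+        int vectorSizeInBits = in.readInt(); // bit set size, identical for every filter
+        int numHashFunctions = in.readInt(); // also identical for every filter; unused here
+        byte[][] bitSets = new byte[numBloomFilters][];
+        for (int i = 0; i < numBloomFilters; i++) {
+            bitSets[i] = new byte[vectorSizeInBits / Byte.SIZE];
+            in.readFully(bitSets[i]); // raw bit set bytes, no per-filter header
+        }
+        return bitSets;
+    }
+}
+```
+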
## Column Index Bytes: Bitmap
Define `'file-index.bitmap.columns'`.
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndex.java
new file mode 100644
index 000000000000..435da407fa72
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndex.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.dynamicbloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fileindex.bloomfilter.FastHash;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.BloomFilter64;
+import org.apache.paimon.utils.DynamicBloomFilter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
+import static org.apache.paimon.fileindex.FileIndexResult.SKIP;
+
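+/**
+ * File index based on a {@link DynamicBloomFilter} that adds inner bloom filters as more keys
+ * are written, bounded by the {@code max_items} option.
+ */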
+public class DynamicBloomFilterFileIndex implements FileIndexer {
+
+ public static final int VERSION_1 = 1;
+
+ private static final int DEFAULT_ITEMS = 1_000_000;
+ private static final double DEFAULT_FPP = 0.1;
+ private static final int DEFAULT_MAX_ITEMS = 10_000_000;
+
+ private static final String ITEMS = "items";
+ private static final String FPP = "fpp";
+ private static final String MAX_ITEMS = "max_items";
+
+ private final DataType dataType;
+ private final int items;
+ private final double fpp;
+
+ private final int maxItems;
+
+ public DynamicBloomFilterFileIndex(DataType dataType, Options options) {
+ this.dataType = dataType;
+ this.items = options.getInteger(ITEMS, DEFAULT_ITEMS);
+ this.fpp = options.getDouble(FPP, DEFAULT_FPP);
+ this.maxItems = options.getInteger(MAX_ITEMS, DEFAULT_MAX_ITEMS);
+ }
+
+ @Override
+ public FileIndexWriter createWriter() {
+ return new DynamicBloomFilterFileIndex.Writer(dataType, items, fpp, maxItems);
+ }
+
+ @Override
+ public FileIndexReader createReader(SeekableInputStream inputStream, int start, int length) {
+ try {
+ inputStream.seek(start);
+ DataInput input = new DataInputStream(inputStream);
+ byte version = input.readByte();
+ if (version > VERSION_1) {
+                throw new RuntimeException(
+                        String.format(
+                                "Failed to read the dynamic bloom filter index: file version %d "
+                                        + "is higher than the supported version %d; please "
+                                        + "upgrade your plugin.",
+                                version, VERSION_1));
+ }
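+            // Header layout, as written by Writer#serializedBytes: number of inner
+            // filters, shared bit set size (in bits), shared number of hash functions.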
+ int numBloomFilters = input.readInt();
+ int vectorSize = input.readInt();
+ int numHashFunctions = input.readInt();
+
+ BloomFilter64[] bloomFilter64s = new BloomFilter64[numBloomFilters];
+ for (int i = 0; i < numBloomFilters; i++) {
+ byte[] serializedBytes = new byte[vectorSize / Byte.SIZE];
+ input.readFully(serializedBytes);
+ BloomFilter64.BitSet bitSet = new BloomFilter64.BitSet(serializedBytes, 0);
+ bloomFilter64s[i] = new BloomFilter64(numHashFunctions, bitSet);
+ }
+ return new DynamicBloomFilterFileIndex.Reader(dataType, bloomFilter64s);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static class Reader extends FileIndexReader {
+
+ private final DynamicBloomFilter dynamicBloomFilter;
+ private final FastHash hashFunction;
+
+ public Reader(DataType type, BloomFilter64[] bloomFilter64s) {
+ this.dynamicBloomFilter = new DynamicBloomFilter(bloomFilter64s);
+ this.hashFunction = FastHash.getHashFunction(type);
+ }
+
+ @Override
+ public FileIndexResult visitEqual(FieldRef fieldRef, Object key) {
+ return key == null || dynamicBloomFilter.testHash(hashFunction.hash(key))
+ ? REMAIN
+ : SKIP;
+ }
+ }
+
+ private static class Writer extends FileIndexWriter {
+
+ private final DynamicBloomFilter filter;
+ private final FastHash hashFunction;
+
+ public Writer(DataType type, int items, double fpp, int maxItems) {
+ this.filter = new DynamicBloomFilter(items, fpp, maxItems);
+ this.hashFunction = FastHash.getHashFunction(type);
+ }
+
+ @Override
+ public void write(Object key) {
+ if (key != null) {
+ filter.addHash(hashFunction.hash(key));
+ }
+ }
+
+ @Override
+ public byte[] serializedBytes() {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(bos);
+ BloomFilter64[] bloomFilterMatrix = filter.matrix();
+
+ // 1. write meta
+ out.writeByte(VERSION_1);
+ out.writeInt(bloomFilterMatrix.length);
+            // every bloom filter shares the same number of hash functions and bit set size
+ out.writeInt(bloomFilterMatrix[0].getBitSet().bitSize());
+ out.writeInt(bloomFilterMatrix[0].getNumHashFunctions());
+
+ // 2. write each filter's bitset
+ for (BloomFilter64 filterMatrix : bloomFilterMatrix) {
+ byte[] serialized = new byte[filterMatrix.getBitSet().bitSize() / Byte.SIZE];
+ filterMatrix.getBitSet().toByteArray(serialized, 0, serialized.length);
+ out.write(serialized);
+ }
+ return bos.toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndexFactory.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndexFactory.java
new file mode 100644
index 000000000000..69dc7579d6fc
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndexFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.dynamicbloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fileindex.FileIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+
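+/** Factory to create {@link DynamicBloomFilterFileIndex}. */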
+public class DynamicBloomFilterFileIndexFactory implements FileIndexerFactory {
+
+ public static final String DYNAMIC_BLOOM_FILTER = "dynamic-bloom-filter";
+
+ @Override
+ public String identifier() {
+ return DYNAMIC_BLOOM_FILTER;
+ }
+
+ @Override
+ public FileIndexer create(DataType dataType, Options options) {
+ return new DynamicBloomFilterFileIndex(dataType, options);
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/DynamicBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/DynamicBloomFilter.java
new file mode 100644
index 000000000000..2dfbe3884dd9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DynamicBloomFilter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+/* This file is based on source code from the hudi Project (http://hudi.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Dynamic Bloom Filter. This is largely based on
+ * org.apache.hudi.common.bloom.InternalDynamicBloomFilter.
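+ *
+ * <p>It starts with a single {@link BloomFilter64} sized for {@code items} keys at the given
+ * fpp, appends another filter whenever the active one fills up, and stops growing once total
+ * capacity reaches {@code maxItems}; after that, keys are spread round-robin over the
+ * existing filters.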
+ */
+public class DynamicBloomFilter {
+
+    /** Threshold for the maximum number of keys to record in a dynamic Bloom filter row. */
+ private int items;
+
+ /** The number of keys recorded in the current standard active Bloom filter. */
+ private int currentNbRecord;
+
+ private int maxItems;
+ private boolean reachedMax = false;
+ private int curMatrixIndex = 0;
+ private double fpp;
+
+    /** The matrix of Bloom filters. */
+ private BloomFilter64[] matrix;
+
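+    /** Wraps already-built filters; used on the read path after deserialization. */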
+ public DynamicBloomFilter(BloomFilter64[] bloomFilter64s) {
+ this.matrix = bloomFilter64s;
+ }
+
+ public DynamicBloomFilter(int items, double fpp, int maxItems) {
+ this.items = items;
+ this.currentNbRecord = 0;
+ this.maxItems = maxItems;
+ this.fpp = fpp;
+ matrix = new BloomFilter64[1];
+ matrix[0] = new BloomFilter64(items, fpp);
+ }
+
+ public void addHash(long hash64) {
+ BloomFilter64 bf = getActiveStandardBF();
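+        // A null active filter means the current one is full and maxItems has not
+        // been reached yet, so append a new row and write into it.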
+ if (bf == null) {
+ addRow();
+ bf = matrix[matrix.length - 1];
+ currentNbRecord = 0;
+ }
+ bf.addHash(hash64);
+ currentNbRecord++;
+ }
+
+ /** Adds a new row to this dynamic Bloom filter. */
+ private void addRow() {
+ BloomFilter64[] tmp = new BloomFilter64[matrix.length + 1];
+ System.arraycopy(matrix, 0, tmp, 0, matrix.length);
+ tmp[tmp.length - 1] = new BloomFilter64(items, fpp);
+ matrix = tmp;
+ }
+
+ public boolean testHash(long key) {
+ for (BloomFilter64 bloomFilter : matrix) {
+ if (bloomFilter.testHash(key)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns the active standard Bloom filter in this dynamic Bloom filter.
+ *
+     * @return the active standard Bloom filter, or null if a new row must be added first.
+ */
+ private BloomFilter64 getActiveStandardBF() {
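+        // Once maxItems has been reached the matrix stops growing; further keys are
+        // spread round-robin over the existing filters.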
+ if (reachedMax) {
+ return matrix[curMatrixIndex++ % matrix.length];
+ }
+
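+        // The active filter is full: either request a new row (return null) or, if
+        // adding one would exceed maxItems, freeze the matrix size.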
+ if (currentNbRecord >= items && (matrix.length * items) < maxItems) {
+ return null;
+ } else if (currentNbRecord >= items && (matrix.length * items) >= maxItems) {
+ reachedMax = true;
+ return matrix[0];
+ }
+ return matrix[matrix.length - 1];
+ }
+
+ public BloomFilter64[] matrix() {
+ return matrix;
+ }
+}
diff --git a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
index 30c908c72381..b2f8c9dde230 100644
--- a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
+++ b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
@@ -15,4 +15,5 @@
org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory
org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory
-org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory
\ No newline at end of file
+org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory
+org.apache.paimon.fileindex.dynamicbloomfilter.DynamicBloomFilterFileIndexFactory
\ No newline at end of file
diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndexTest.java
new file mode 100644
index 000000000000..4395f99e5200
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/dynamicbloomfilter/DynamicBloomFilterFileIndexTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.dynamicbloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.DynamicBloomFilter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/** Tests for {@link DynamicBloomFilterFileIndex}. */
+public class DynamicBloomFilterFileIndexTest {
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testAddFindByRandom() {
+ DynamicBloomFilterFileIndex filter =
+ new DynamicBloomFilterFileIndex(
+ DataTypes.BYTES(),
+ new Options(
+                                new HashMap<String, String>() {
+ {
+ put("items", "10000");
+ put("fpp", "0.02");
+ put("max_items", "50000");
+ }
+ }));
+ FileIndexWriter writer = filter.createWriter();
+        List<byte[]> testData = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ testData.add(random());
+ }
+
+ // test empty bytes
+ testData.add(new byte[0]);
+
+ testData.forEach(writer::write);
+
+ byte[] serializedBytes = writer.serializedBytes();
+ FileIndexReader reader =
+ filter.createReader(
+ new ByteArraySeekableStream(serializedBytes), 0, serializedBytes.length);
+
+ for (byte[] bytes : testData) {
+ Assertions.assertThat(reader.visitEqual(null, bytes).remain()).isTrue();
+ }
+
+ int errorCount = 0;
+ int num = 1000000;
+ for (int i = 0; i < num; i++) {
+ byte[] ra = random();
+ if (reader.visitEqual(null, ra).remain()) {
+ errorCount++;
+ }
+ }
+
+        // the observed false positive rate should be less than 0.03
+ Assertions.assertThat((double) errorCount / num).isLessThan(0.03);
+ }
+
+ @Test
+ public void testAddFindByRandomLong() {
+ DynamicBloomFilterFileIndex filter =
+ new DynamicBloomFilterFileIndex(
+ DataTypes.BIGINT(),
+ new Options(
+                                new HashMap<String, String>() {
+ {
+ put("items", "10000");
+ put("fpp", "0.02");
+ put("max_items", "50000");
+ }
+ }));
+ FileIndexWriter writer = filter.createWriter();
+        List<Long> testData = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ testData.add(RANDOM.nextLong());
+ }
+
+ testData.forEach(writer::write);
+
+ byte[] serializedBytes = writer.serializedBytes();
+ FileIndexReader reader =
+ filter.createReader(
+ new ByteArraySeekableStream(serializedBytes), 0, serializedBytes.length);
+
+ for (Long value : testData) {
+ Assertions.assertThat(reader.visitEqual(null, value).remain()).isTrue();
+ }
+
+ int errorCount = 0;
+ int num = 1000000;
+ for (int i = 0; i < num; i++) {
+ Long ra = RANDOM.nextLong();
+ if (reader.visitEqual(null, ra).remain()) {
+ errorCount++;
+ }
+ }
+
+        // the observed false positive rate should be less than 0.03
+ Assertions.assertThat((double) errorCount / num).isLessThan(0.03);
+ }
+
+ @Test
+ public void testCompareFppWithBloomFilter() {
+ // When the amount of data written to the filter exceeds the number of items,
+ // the dynamic bloom filter has a smaller error rate than the bloom filter.
+
+ DynamicBloomFilterFileIndex dynamicFilter =
+ new DynamicBloomFilterFileIndex(
+ DataTypes.BIGINT(),
+ new Options(
+                                new HashMap<String, String>() {
+ {
+ put("items", "10000");
+ put("fpp", "0.02");
+ put("max_items", "50000");
+ }
+ }));
+
+ BloomFilterFileIndex filter =
+ new BloomFilterFileIndex(
+ DataTypes.BIGINT(),
+ new Options(
+                                new HashMap<String, String>() {
+ {
+ put("items", "10000");
+ put("fpp", "0.02");
+ }
+ }));
+
+ FileIndexWriter bloomFilterWriter = filter.createWriter();
+ FileIndexWriter dynamicBloomFilterWriter = dynamicFilter.createWriter();
+
+        List<Long> testData = new ArrayList<>();
+
+ for (int i = 0; i < 20000; i++) {
+ testData.add(RANDOM.nextLong());
+ }
+ testData.forEach(bloomFilterWriter::write);
+ testData.forEach(dynamicBloomFilterWriter::write);
+
+ byte[] bloomFilterSerializedBytes = bloomFilterWriter.serializedBytes();
+ FileIndexReader filterReader =
+ filter.createReader(
+ new ByteArraySeekableStream(bloomFilterSerializedBytes),
+ 0,
+ bloomFilterSerializedBytes.length);
+
+ byte[] dynamicSerializedBytes = dynamicBloomFilterWriter.serializedBytes();
+ FileIndexReader dynamicFilterReader =
+ dynamicFilter.createReader(
+ new ByteArraySeekableStream(dynamicSerializedBytes),
+ 0,
+ dynamicSerializedBytes.length);
+
+ int errorCount = 0;
+ int num = 1000000;
+ for (int i = 0; i < num; i++) {
+ Long ra = RANDOM.nextLong();
+ if (filterReader.visitEqual(null, ra).remain()) {
+ errorCount++;
+ }
+ }
+ double bloomFilterError = (double) errorCount / num;
+
+ errorCount = 0;
+ for (int i = 0; i < num; i++) {
+ Long ra = RANDOM.nextLong();
+ if (dynamicFilterReader.visitEqual(null, ra).remain()) {
+ errorCount++;
+ }
+ }
+ double dynamicBloomFilterError = (double) errorCount / num;
+
+        // the dynamic bloom filter should yield a lower false positive rate than the bloom filter
+ Assertions.assertThat(dynamicBloomFilterError).isLessThan(bloomFilterError);
+ }
+
+ @Test
+ public void testDynamicBloomFilterBoundSize() {
+
+ int maxItems = 1000;
+ int items = 100;
+ int batchSize = 100;
+ int count = 0;
+ int curFilterNum = 0;
+
+ DynamicBloomFilter dynamicBloomFilter = new DynamicBloomFilter(items, 0.001, maxItems);
+ for (int i = 0; i < 15; i++) {
+ for (int j = 0; j < batchSize; j++) {
+ dynamicBloomFilter.addHash(RANDOM.nextLong());
+ count++;
+ }
+            // Once the amount of inserted data exceeds maxItems, the number of bloom
+            // filters no longer grows.
+ if (count > maxItems) {
+ Assertions.assertThat(curFilterNum).isEqualTo(dynamicBloomFilter.matrix().length);
+ } else {
+ Assertions.assertThat(curFilterNum).isLessThan(dynamicBloomFilter.matrix().length);
+ curFilterNum = dynamicBloomFilter.matrix().length;
+ }
+ }
+ }
+
+ private byte[] random() {
+ byte[] b = new byte[Math.abs(RANDOM.nextInt(400) + 1)];
+ RANDOM.nextBytes(b);
+ return b;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 01d4e89af95d..fe84cb8a646b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -29,6 +29,7 @@
import org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory;
import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory;
import org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory;
+import org.apache.paimon.fileindex.dynamicbloomfilter.DynamicBloomFilterFileIndexFactory;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -582,6 +583,154 @@ public void testBloomFilterInDisk() throws Exception {
reader.forEachRemaining(row -> assertThat(row.getString(1).toString()).isEqualTo("b"));
}
+ @Test
+ public void testDynamicBloomFilterInDisk() throws Exception {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("index_column", DataTypes.STRING())
+ .field("index_column2", DataTypes.INT())
+ .field("index_column3", DataTypes.BIGINT())
+ .build();
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ rowType,
+ options -> {
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + "."
+ + CoreOptions.COLUMNS,
+ "index_column, index_column2, index_column3");
+ options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B");
+ });
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+ write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+ write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ commit.commit(0, result);
+ result.clear();
+
+ TableScan.Plan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(rowType)
+ .equal(1, BinaryString.fromString("b")))
+ .plan();
+        List<DataFileMeta> metas =
+ plan.splits().stream()
+ .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+ .collect(Collectors.toList());
+ assertThat(metas.size()).isEqualTo(2);
+
+        RecordReader<InternalRow> reader =
+ table.newRead()
+ .withFilter(
+ new PredicateBuilder(rowType)
+ .equal(1, BinaryString.fromString("b")))
+ .createReader(plan.splits());
+ reader.forEachRemaining(row -> assertThat(row.getString(1).toString()).isEqualTo("b"));
+ }
+
+ @Test
+ public void testDynamicBloomFilterInMemory() throws Exception {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("index_column", DataTypes.STRING())
+ .field("index_column2", DataTypes.INT())
+ .field("index_column3", DataTypes.BIGINT())
+ .build();
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ rowType,
+ options -> {
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + "."
+ + CoreOptions.COLUMNS,
+ "index_column, index_column2, index_column3");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + ".index_column.items",
+ "150");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + ".index_column.max_items",
+ "150");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + ".index_column2.items",
+ "150");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + ".index_column2.max_items",
+ "150");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + ".index_column3.items",
+ "150");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + DynamicBloomFilterFileIndexFactory
+ .DYNAMIC_BLOOM_FILTER
+ + ".index_column3.max_items",
+ "150");
+ options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B");
+ });
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+ write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+ write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ commit.commit(0, result);
+ result.clear();
+
+ TableScan.Plan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(rowType)
+ .equal(1, BinaryString.fromString("b")))
+ .plan();
+        List<DataFileMeta> metas =
+ plan.splits().stream()
+ .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+ .collect(Collectors.toList());
+ assertThat(metas.size()).isEqualTo(1);
+ }
+
@Test
public void testBSIAndBitmapIndexInMemory() throws Exception {
RowType rowType =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index fa635e2ab666..26e5cfad99f5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -963,6 +963,82 @@ public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
}
+ @Test
+ public void testDeletionVectorsWithDynamicBloomFileIndexInMeta() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 1);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
+ conf.set("file-index.dynamic-bloom-filter.columns", "b");
+ conf.set("file-index.dynamic-bloom-filter.b.items", "20");
+ conf.set("file-index.dynamic-bloom-filter.b.max_items", "20");
+ });
+
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 1, 300L));
+ write.write(rowData(1, 2, 400L));
+ write.write(rowData(1, 3, 200L));
+ write.write(rowData(1, 4, 500L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 5, 100L));
+ write.write(rowData(1, 6, 600L));
+ write.write(rowData(1, 7, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+ Predicate predicate = builder.equal(2, 300L);
+
+        List<Split> splits =
+ toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits());
+
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testDeletionVectorsWithDynamicBloomFileIndexInFile() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 1);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
+ conf.set("file-index.dynamic-bloom-filter.columns", "b");
+ });
+
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 1, 300L));
+ write.write(rowData(1, 2, 400L));
+ write.write(rowData(1, 3, 200L));
+ write.write(rowData(1, 4, 500L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 5, 100L));
+ write.write(rowData(1, 6, 600L));
+ write.write(rowData(1, 7, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
+ TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+ assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(
+ Arrays.asList(
+ "1|1|300|binary|varbinary|mapKey:mapVal|multiset",
+ "1|2|400|binary|varbinary|mapKey:mapVal|multiset",
+ "1|3|200|binary|varbinary|mapKey:mapVal|multiset",
+ "1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
+ }
+
@Test
public void testDeletionVectorsWithBitmapFileIndexInFile() throws Exception {
FileStoreTable table =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkFileIndexTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkFileIndexTest.java
new file mode 100644
index 000000000000..88a9b267f22e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkFileIndexTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
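+/** Tests for reading file index (dynamic bloom filter) written by Flink. */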
+public class FlinkFileIndexTest extends CatalogITCaseBase {
+
+ @Test
+ public void testDynamicBloomFileIndex() throws Catalog.TableNotExistException, IOException {
+ sql(
+ "CREATE TABLE T ("
+ + " k INT,"
+ + " v STRING,"
+ + " hh INT,"
+ + " dt STRING"
+ + ") PARTITIONED BY (dt, hh) WITH ("
+ + " 'write-only' = 'true',"
+ + " 'bucket' = '-1',"
+ + " 'file-index.dynamic-bloom-filter.columns'='k,v'"
+ + ")");
+
+ sql(
+ "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 16, '20221208'), (1, '100', 15, '20221209')");
+
+ FileStoreTable table = paimonTable("T");
+        List<ManifestEntry> list = table.store().newScan().plan().files();
+ for (ManifestEntry entry : list) {
+            List<String> extraFiles =
+ entry.file().extraFiles().stream()
+ .filter(s -> s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+
+ Assertions.assertThat(extraFiles.size()).isEqualTo(1);
+
+ String file = extraFiles.get(0);
+
+ Path indexFilePath =
+ table.store()
+ .pathFactory()
+ .createDataFilePathFactory(entry.partition(), entry.bucket())
+ .toPath(file);
+ try (FileIndexFormat.Reader reader =
+ FileIndexFormat.createReader(
+ table.fileIO().newInputStream(indexFilePath), table.rowType())) {
+                Set<FileIndexReader> readerSetK = reader.readColumnIndex("k");
+ Assertions.assertThat(readerSetK.size()).isEqualTo(1);
+
+ Predicate predicateK = new PredicateBuilder(table.rowType()).equal(0, 1);
+ for (FileIndexReader fileIndexReader : readerSetK) {
+ Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isTrue();
+ }
+
+ predicateK = new PredicateBuilder(table.rowType()).equal(0, 4);
+ for (FileIndexReader fileIndexReader : readerSetK) {
+ Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isFalse();
+ }
+
+                Set<FileIndexReader> readerSetV = reader.readColumnIndex("v");
+ Assertions.assertThat(readerSetV.size()).isEqualTo(1);
+
+ Predicate predicateV =
+ new PredicateBuilder(table.rowType())
+ .equal(1, BinaryString.fromString("100"));
+ for (FileIndexReader fileIndexReader : readerSetV) {
+ Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isTrue();
+ }
+
+ predicateV =
+ new PredicateBuilder(table.rowType())
+ .equal(1, BinaryString.fromString("101"));
+ for (FileIndexReader fileIndexReader : readerSetV) {
+ Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isFalse();
+ }
+ }
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 6ca78b088fb7..481ccf9d08ef 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -358,11 +358,21 @@ public void testLimit() {
@Test
public void testFileIndex() {
batchSql(
- "INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')");
+ "INSERT INTO bloom_index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')");
batchSql(
- "INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'd', 'BBB'), (3, 'd', 'BBB')");
+ "INSERT INTO bloom_index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'd', 'BBB'), (3, 'd', 'BBB')");
+ assertThat(
+ batchSql(
+ "SELECT * FROM bloom_index_table WHERE indexc = 'c' and (id = 2 or id = 3)"))
+ .containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3, "c", "BBB"));
- assertThat(batchSql("SELECT * FROM index_table WHERE indexc = 'c' and (id = 2 or id = 3)"))
+ batchSql(
+ "INSERT INTO dynamic_bloom_index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')");
+ batchSql(
+ "INSERT INTO dynamic_bloom_index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'd', 'BBB'), (3, 'd', 'BBB')");
+ assertThat(
+ batchSql(
+ "SELECT * FROM dynamic_bloom_index_table WHERE indexc = 'c' and (id = 2 or id = 3)"))
.containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3, "c", "BBB"));
}
@@ -460,7 +470,8 @@ protected List ddl() {
"CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('bucket' = '-1')",
"CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
"CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP) WITH ('bucket' = '-1')",
- "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc STRING, data STRING) WITH ('bucket' = '-1', 'file-index.bloom-filter.columns'='indexc', 'file-index.bloom-filter.indexc.items' = '500')");
+ "CREATE TABLE IF NOT EXISTS bloom_index_table (id INT, indexc STRING, data STRING) WITH ('bucket' = '-1', 'file-index.bloom-filter.columns'='indexc', 'file-index.bloom-filter.indexc.items' = '500')",
+ "CREATE TABLE IF NOT EXISTS dynamic_bloom_index_table (id INT, indexc STRING, data STRING) WITH ('bucket' = '-1', 'file-index.bloom-filter.columns'='indexc', 'file-index.bloom-filter.indexc.items' = '500','file-index.bloom-filter.indexc.max_items' = '1000')");
}
@Override