getTimeStampMapper(int precision) {
+ return o -> {
+ if (o == null) {
+ return null;
+ } else if (precision <= 3) {
+ return ((Timestamp) o).getMillisecond();
+ } else {
+ return ((Timestamp) o).toMicros();
+ }
+ };
+ }
+ });
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexFactory.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexFactory.java
new file mode 100644
index 000000000000..aa7a92b78525
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bsi;
+
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fileindex.FileIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+
+/** Factory to create {@link BitSliceIndexBitmapFileIndex}. */
+public class BitSliceIndexBitmapFileIndexFactory implements FileIndexerFactory {
+
+ public static final String BSI_INDEX = "bsi";
+
+ @Override
+ public String identifier() {
+ return BSI_INDEX;
+ }
+
+ @Override
+ public FileIndexer create(DataType dataType, Options options) {
+ return new BitSliceIndexBitmapFileIndex(dataType, options);
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmap.java b/paimon-common/src/main/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmap.java
new file mode 100644
index 000000000000..662d791d1232
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmap.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/* This file is based on source code from the RoaringBitmap Project (http://roaringbitmap.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A bit slice index compressed bitmap. */
+public class BitSliceIndexRoaringBitmap {
+
+ public static final byte VERSION_1 = 1;
+
+ public static final BitSliceIndexRoaringBitmap EMPTY =
+ new BitSliceIndexRoaringBitmap(0, new RoaringBitmap32(), new RoaringBitmap32[] {});
+
+ private final long min;
+ private final RoaringBitmap32 ebm;
+ private final RoaringBitmap32[] slices;
+
+ private BitSliceIndexRoaringBitmap(long min, RoaringBitmap32 ebm, RoaringBitmap32[] slices) {
+ this.min = min;
+ this.ebm = ebm;
+ this.slices = slices;
+ }
+
+ public RoaringBitmap32 eq(long predicate) {
+ return oNeilCompare(Operation.EQ, predicate - min, null);
+ }
+
+ public RoaringBitmap32 lt(long predicate) {
+ return oNeilCompare(Operation.LT, predicate - min, null);
+ }
+
+ public RoaringBitmap32 lte(long predicate) {
+ return oNeilCompare(Operation.LTE, predicate - min, null);
+ }
+
+ public RoaringBitmap32 gt(long predicate) {
+ return oNeilCompare(Operation.GT, predicate - min, null);
+ }
+
+ public RoaringBitmap32 gte(long predicate) {
+ return oNeilCompare(Operation.GTE, predicate - min, null);
+ }
+
+ public RoaringBitmap32 isNotNull() {
+ return ebm.clone();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BitSliceIndexRoaringBitmap that = (BitSliceIndexRoaringBitmap) o;
+ return min == that.min
+ && Objects.equals(ebm, that.ebm)
+ && Arrays.equals(slices, that.slices);
+ }
+
+ /**
+ * O'Neil bit-sliced index compare algorithm.
+ *
+ * See Improved query performance with
+ * variant indexes
+ *
+ * @param operation compare operation
+ * @param predicate the value we found filter
+ * @param foundSet rid set we want compare, using RoaringBitmap to express
+ * @return rid set we found in this bsi with giving conditions, using RoaringBitmap to express
+ */
+ private RoaringBitmap32 oNeilCompare(
+ Operation operation, long predicate, RoaringBitmap32 foundSet) {
+ RoaringBitmap32 fixedFoundSet = foundSet == null ? ebm : foundSet;
+ RoaringBitmap32 gt = new RoaringBitmap32();
+ RoaringBitmap32 lt = new RoaringBitmap32();
+ RoaringBitmap32 eq = ebm;
+
+ for (int i = slices.length - 1; i >= 0; i--) {
+ long bit = (predicate >> i) & 1;
+ if (bit == 1) {
+ lt = RoaringBitmap32.or(lt, RoaringBitmap32.andNot(eq, slices[i]));
+ eq = RoaringBitmap32.and(eq, slices[i]);
+ } else {
+ gt = RoaringBitmap32.or(gt, RoaringBitmap32.and(eq, slices[i]));
+ eq = RoaringBitmap32.andNot(eq, slices[i]);
+ }
+ }
+
+ eq = RoaringBitmap32.and(fixedFoundSet, eq);
+ switch (operation) {
+ case EQ:
+ return eq;
+ case NEQ:
+ return RoaringBitmap32.andNot(fixedFoundSet, eq);
+ case GT:
+ return RoaringBitmap32.and(gt, fixedFoundSet);
+ case LT:
+ return RoaringBitmap32.and(lt, fixedFoundSet);
+ case LTE:
+ return RoaringBitmap32.and(RoaringBitmap32.or(lt, eq), fixedFoundSet);
+ case GTE:
+ return RoaringBitmap32.and(RoaringBitmap32.or(gt, eq), fixedFoundSet);
+ default:
+ throw new IllegalArgumentException("not support operation: " + operation);
+ }
+ }
+
+ /** Specifies O'Neil compare algorithm operation. */
+ private enum Operation {
+ EQ,
+ NEQ,
+ LTE,
+ LT,
+ GTE,
+ GT
+ }
+
+ public static BitSliceIndexRoaringBitmap map(DataInput in) throws IOException {
+ int version = in.readByte();
+ if (version > VERSION_1) {
+ throw new RuntimeException(
+ String.format(
+ "deserialize bsi index fail, " + "your plugin version is lower than %d",
+ version));
+ }
+
+ // deserialize min
+ long min = in.readLong();
+
+ // deserialize ebm
+ RoaringBitmap32 ebm = new RoaringBitmap32();
+ ebm.deserialize(in);
+
+ // deserialize slices
+ RoaringBitmap32[] slices = new RoaringBitmap32[in.readInt()];
+ for (int i = 0; i < slices.length; i++) {
+ RoaringBitmap32 rb = new RoaringBitmap32();
+ rb.deserialize(in);
+ slices[i] = rb;
+ }
+
+ return new BitSliceIndexRoaringBitmap(min, ebm, slices);
+ }
+
+ /** A Builder for {@link BitSliceIndexRoaringBitmap}. */
+ public static class Appender {
+ private final long min;
+ private final long max;
+ private final RoaringBitmap32 ebm;
+ private final RoaringBitmap32[] slices;
+
+ public Appender(long min, long max) {
+ if (min < 0) {
+ throw new IllegalArgumentException("values should be non-negative");
+ }
+ if (min > max) {
+ throw new IllegalArgumentException("min should be less than max");
+ }
+
+ this.min = min;
+ this.max = max;
+ this.ebm = new RoaringBitmap32();
+ this.slices = new RoaringBitmap32[64 - Long.numberOfLeadingZeros(max - min)];
+ for (int i = 0; i < slices.length; i++) {
+ slices[i] = new RoaringBitmap32();
+ }
+ }
+
+ public void append(int rid, long value) {
+ if (value > max) {
+ throw new IllegalArgumentException(String.format("value %s is too large", value));
+ }
+
+ if (ebm.contains(rid)) {
+ throw new IllegalArgumentException(String.format("rid=%s is already exists", rid));
+ }
+
+ // reduce the number of slices
+ value = value - min;
+
+ // only bit=1 need to set
+ while (value != 0) {
+ slices[Long.numberOfTrailingZeros(value)].add(rid);
+ value &= (value - 1);
+ }
+ ebm.add(rid);
+ }
+
+ public boolean isNotEmpty() {
+ return !ebm.isEmpty();
+ }
+
+ public void serialize(DataOutput out) throws IOException {
+ out.writeByte(VERSION_1);
+ out.writeLong(min);
+ ebm.serialize(out);
+ out.writeInt(slices.length);
+ for (RoaringBitmap32 slice : slices) {
+ slice.serialize(out);
+ }
+ }
+
+ public BitSliceIndexRoaringBitmap build() throws IOException {
+ return new BitSliceIndexRoaringBitmap(min, ebm, slices);
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 7b86bae8a280..f9232dc829f5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -68,6 +68,10 @@ public long getCardinality() {
return roaringBitmap.getLongCardinality();
}
+ public RoaringBitmap32 clone() {
+ return new RoaringBitmap32(roaringBitmap.clone());
+ }
+
public void serialize(DataOutput out) throws IOException {
roaringBitmap.runOptimize();
roaringBitmap.serialize(out);
@@ -149,4 +153,8 @@ public RoaringBitmap next() {
}
}));
}
+
+ public static RoaringBitmap32 andNot(final RoaringBitmap32 x1, final RoaringBitmap32 x2) {
+ return new RoaringBitmap32(RoaringBitmap.andNot(x1.roaringBitmap, x2.roaringBitmap));
+ }
}
diff --git a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
index 8a899eb232b8..30c908c72381 100644
--- a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
+++ b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.fileindex.FileIndexerFactory
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory
-org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory
\ No newline at end of file
+org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory
+org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory
\ No newline at end of file
diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexTest.java
new file mode 100644
index 000000000000..594ac40b9e1d
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndexTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bsi;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResultLazy;
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** test for {@link BitSliceIndexBitmapFileIndex}. */
+public class BitSliceIndexBitmapFileIndexTest {
+
+ @Test
+ public void testBitSliceIndexMix() {
+ IntType intType = new IntType();
+ FieldRef fieldRef = new FieldRef(0, "", intType);
+ BitSliceIndexBitmapFileIndex bsiFileIndex = new BitSliceIndexBitmapFileIndex(intType, null);
+ FileIndexWriter writer = bsiFileIndex.createWriter();
+
+ Object[] arr = {1, 2, null, -2, -2, -1, null, 2, 0, 5, null};
+
+ for (Object o : arr) {
+ writer.write(o);
+ }
+ byte[] bytes = writer.serializedBytes();
+ ByteArraySeekableStream stream = new ByteArraySeekableStream(bytes);
+ FileIndexReader reader = bsiFileIndex.createReader(stream, 0, bytes.length);
+
+ // test eq
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, 2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(3, 4));
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, 100)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+
+ // test neq
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, 2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 4, 5, 8, 9));
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 5, 7, 8, 9));
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, 100)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 7, 8, 9));
+
+ // test in
+ assertThat(
+ ((BitmapIndexResultLazy)
+ reader.visitIn(fieldRef, Arrays.asList(-1, 1, 2, 3)))
+ .get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 5, 7));
+
+ // test not in
+ assertThat(
+ ((BitmapIndexResultLazy)
+ reader.visitNotIn(fieldRef, Arrays.asList(-1, 1, 2, 3)))
+ .get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 8, 9));
+
+ // test null
+ assertThat(((BitmapIndexResultLazy) reader.visitIsNull(fieldRef)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(2, 6, 10));
+
+ // test is not null
+ assertThat(((BitmapIndexResultLazy) reader.visitIsNotNull(fieldRef)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 7, 8, 9));
+
+ // test lt
+ assertThat(((BitmapIndexResultLazy) reader.visitLessThan(fieldRef, 2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 4, 5, 8));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessOrEqual(fieldRef, 2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 7, 8));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessThan(fieldRef, -1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(3, 4));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessOrEqual(fieldRef, -1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5));
+
+ // test gt
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterThan(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 5, 7, 8, 9));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterOrEqual(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 7, 8, 9));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterThan(fieldRef, 2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(9));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterOrEqual(fieldRef, 2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 7, 9));
+ }
+
+ @Test
+ public void testBitSliceIndexPositiveOnly() {
+ IntType intType = new IntType();
+ FieldRef fieldRef = new FieldRef(0, "", intType);
+ BitSliceIndexBitmapFileIndex bsiFileIndex = new BitSliceIndexBitmapFileIndex(intType, null);
+ FileIndexWriter writer = bsiFileIndex.createWriter();
+
+ Object[] arr = {0, 1, null, 3, 4, 5, 6, 0, null};
+
+ for (Object o : arr) {
+ writer.write(o);
+ }
+ byte[] bytes = writer.serializedBytes();
+ ByteArraySeekableStream stream = new ByteArraySeekableStream(bytes);
+ FileIndexReader reader = bsiFileIndex.createReader(stream, 0, bytes.length);
+
+ // test eq
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, 0)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1));
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, -1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+
+ // test neq
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, 2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 6, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 6, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, 3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 4, 5, 6, 7));
+
+ // test in
+ assertThat(
+ ((BitmapIndexResultLazy)
+ reader.visitIn(fieldRef, Arrays.asList(-1, 1, 2, 3)))
+ .get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 3));
+
+ // test not in
+ assertThat(
+ ((BitmapIndexResultLazy)
+ reader.visitNotIn(fieldRef, Arrays.asList(-1, 1, 2, 3)))
+ .get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 4, 5, 6, 7));
+
+ // test null
+ assertThat(((BitmapIndexResultLazy) reader.visitIsNull(fieldRef)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(2, 8));
+
+ // test is not null
+ assertThat(((BitmapIndexResultLazy) reader.visitIsNotNull(fieldRef)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 6, 7));
+
+ // test lt
+ assertThat(((BitmapIndexResultLazy) reader.visitLessThan(fieldRef, 3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessOrEqual(fieldRef, 3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessThan(fieldRef, -1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+ assertThat(((BitmapIndexResultLazy) reader.visitLessOrEqual(fieldRef, -1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+
+ // test gt
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterThan(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 6, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterOrEqual(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 3, 4, 5, 6, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterThan(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5, 6));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterOrEqual(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 3, 4, 5, 6));
+ }
+
+ @Test
+ public void testBitSliceIndexNegativeOnly() {
+ IntType intType = new IntType();
+ FieldRef fieldRef = new FieldRef(0, "", intType);
+ BitSliceIndexBitmapFileIndex bsiFileIndex = new BitSliceIndexBitmapFileIndex(intType, null);
+ FileIndexWriter writer = bsiFileIndex.createWriter();
+
+ Object[] arr = {null, -1, null, -3, -4, -5, -6, -1, null};
+
+ for (Object o : arr) {
+ writer.write(o);
+ }
+ byte[] bytes = writer.serializedBytes();
+ ByteArraySeekableStream stream = new ByteArraySeekableStream(bytes);
+ FileIndexReader reader = bsiFileIndex.createReader(stream, 0, bytes.length);
+
+ // test eq
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+ assertThat(((BitmapIndexResultLazy) reader.visitEqual(fieldRef, -1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 7));
+
+ // test neq
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, -2)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 3, 4, 5, 6, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitNotEqual(fieldRef, -3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 4, 5, 6, 7));
+
+ // test in
+ assertThat(
+ ((BitmapIndexResultLazy)
+ reader.visitIn(fieldRef, Arrays.asList(-1, -4, -2, 3)))
+ .get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 4, 7));
+
+ // test not in
+ assertThat(
+ ((BitmapIndexResultLazy)
+ reader.visitNotIn(fieldRef, Arrays.asList(-1, -4, -2, 3)))
+ .get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(3, 5, 6));
+
+ // test null
+ assertThat(((BitmapIndexResultLazy) reader.visitIsNull(fieldRef)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(0, 2, 8));
+
+ // test is not null
+ assertThat(((BitmapIndexResultLazy) reader.visitIsNotNull(fieldRef)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 3, 4, 5, 6, 7));
+
+ // test lt
+ assertThat(((BitmapIndexResultLazy) reader.visitLessThan(fieldRef, -3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(4, 5, 6));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessOrEqual(fieldRef, -3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5, 6));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessThan(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 3, 4, 5, 6, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitLessOrEqual(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 3, 4, 5, 6, 7));
+
+ // test gt
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterThan(fieldRef, -3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterOrEqual(fieldRef, -3)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf(1, 3, 7));
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterThan(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+ assertThat(((BitmapIndexResultLazy) reader.visitGreaterOrEqual(fieldRef, 1)).get())
+ .isEqualTo(RoaringBitmap32.bitmapOf());
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmapTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmapTest.java
new file mode 100644
index 000000000000..8c4a27d4351e
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/BitSliceIndexRoaringBitmapTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BitSliceIndexRoaringBitmap}. */
+public class BitSliceIndexRoaringBitmapTest {
+
+ private long base;
+ private BitSliceIndexRoaringBitmap bsi;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ this.base = System.currentTimeMillis();
+ BitSliceIndexRoaringBitmap.Appender appender =
+ new BitSliceIndexRoaringBitmap.Appender(base, toPredicate(100));
+ IntStream.range(0, 31).forEach(x -> appender.append(x, toPredicate(x)));
+ IntStream.range(51, 100).forEach(x -> appender.append(x, toPredicate(x)));
+ appender.append(100, toPredicate(30));
+ this.bsi = appender.build();
+ }
+
+ @Test
+ public void testSerde() throws IOException {
+ BitSliceIndexRoaringBitmap.Appender appender =
+ new BitSliceIndexRoaringBitmap.Appender(0, toPredicate(100));
+ IntStream.range(0, 31).forEach(x -> appender.append(x, toPredicate(x)));
+ IntStream.range(51, 100).forEach(x -> appender.append(x, toPredicate(x)));
+ appender.append(100, toPredicate(30));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ appender.serialize(new DataOutputStream(out));
+
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ assertThat(BitSliceIndexRoaringBitmap.map(new DataInputStream(in)))
+ .isEqualTo(appender.build());
+ }
+
+ @Test
+ public void testEQ() {
+ assertThat(bsi.eq(toPredicate(1))).isEqualTo(RoaringBitmap32.bitmapOf(1));
+ assertThat(bsi.eq(toPredicate(32))).isEqualTo(RoaringBitmap32.bitmapOf());
+ assertThat(bsi.eq(toPredicate(30))).isEqualTo(RoaringBitmap32.bitmapOf(30, 100));
+ }
+
+ @Test
+ public void testLT() {
+ assertThat(bsi.lt(toPredicate(30)))
+ .isEqualTo(RoaringBitmap32.bitmapOf(IntStream.range(0, 30).toArray()));
+ assertThat(bsi.lt(toPredicate(45)))
+ .isEqualTo(
+ RoaringBitmap32.bitmapOf(
+ IntStream.concat(IntStream.range(0, 31), IntStream.range(100, 101))
+ .toArray()));
+ }
+
+ @Test
+ public void testLTE() {
+ RoaringBitmap32 expected =
+ RoaringBitmap32.bitmapOf(
+ IntStream.concat(IntStream.range(0, 31), IntStream.range(100, 101))
+ .toArray());
+ assertThat(bsi.lte(toPredicate(30))).isEqualTo(expected);
+ assertThat(bsi.lte(toPredicate(45))).isEqualTo(expected);
+ }
+
+ @Test
+ public void testGT() {
+ RoaringBitmap32 expected = RoaringBitmap32.bitmapOf(IntStream.range(51, 100).toArray());
+ assertThat(bsi.gt(toPredicate(30))).isEqualTo(expected);
+ assertThat(bsi.gt(toPredicate(45))).isEqualTo(expected);
+ }
+
+ @Test
+ public void testGTE() {
+ assertThat(bsi.gte(toPredicate(30)))
+ .isEqualTo(
+ RoaringBitmap32.bitmapOf(
+ IntStream.concat(IntStream.range(30, 31), IntStream.range(51, 101))
+ .toArray()));
+ assertThat(bsi.gte(toPredicate(45)))
+ .isEqualTo(RoaringBitmap32.bitmapOf(IntStream.range(51, 100).toArray()));
+ }
+
+ @Test
+ public void testIsNotNull() {
+ assertThat(bsi.isNotNull())
+ .isEqualTo(
+ RoaringBitmap32.bitmapOf(
+ IntStream.concat(IntStream.range(0, 31), IntStream.range(51, 101))
+ .toArray()));
+ }
+
+ private long toPredicate(long predicate) {
+ return base + predicate;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index e6503cc401b3..81dd30262058 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -26,7 +26,9 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory;
import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory;
+import org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -559,6 +561,136 @@ public void testBloomFilterInDisk() throws Exception {
reader.forEachRemaining(row -> assertThat(row.getString(1).toString()).isEqualTo("b"));
}
+ @Test
+ public void testBSIAndBitmapIndexInMemory() throws Exception {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("event", DataTypes.STRING())
+ .field("price", DataTypes.BIGINT())
+ .build();
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ rowType,
+ options -> {
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + BitmapFileIndexFactory.BITMAP_INDEX
+ + "."
+ + CoreOptions.COLUMNS,
+ "event");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+ + "."
+ + CoreOptions.COLUMNS,
+ "price");
+ options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "1 MB");
+ });
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ List result = new ArrayList<>();
+ write.write(GenericRow.of(1, BinaryString.fromString("A"), 4L));
+ write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L));
+ write.write(GenericRow.of(1, BinaryString.fromString("B"), 3L));
+ write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(GenericRow.of(1, BinaryString.fromString("C"), 4L));
+ result.addAll(write.prepareCommit(true, 0));
+ commit.commit(0, result);
+ result.clear();
+
+ // test bitmap index and bsi index
+ Predicate predicate =
+ PredicateBuilder.and(
+ new PredicateBuilder(rowType).equal(1, BinaryString.fromString("C")),
+ new PredicateBuilder(rowType).greaterThan(2, 3L));
+ TableScan.Plan plan = table.newScan().withFilter(predicate).plan();
+ List metas =
+ plan.splits().stream()
+ .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+ .collect(Collectors.toList());
+ assertThat(metas.size()).isEqualTo(1);
+
+ RecordReader reader =
+ table.newRead().withFilter(predicate).createReader(plan.splits());
+ reader.forEachRemaining(
+ row -> {
+ assertThat(row.getString(1).toString()).isEqualTo("C");
+ assertThat(row.getLong(2)).isEqualTo(4L);
+ });
+ }
+
+ @Test
+ public void testBSIAndBitmapIndexInDisk() throws Exception {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("event", DataTypes.STRING())
+ .field("price", DataTypes.BIGINT())
+ .build();
+ // in unaware-bucket mode, we split files into splits all the time
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ rowType,
+ options -> {
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + BitmapFileIndexFactory.BITMAP_INDEX
+ + "."
+ + CoreOptions.COLUMNS,
+ "event");
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+ + "."
+ + CoreOptions.COLUMNS,
+ "price");
+ options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "1 B");
+ });
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ List result = new ArrayList<>();
+ write.write(GenericRow.of(1, BinaryString.fromString("A"), 4L));
+ write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L));
+ write.write(GenericRow.of(1, BinaryString.fromString("B"), 3L));
+ write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(GenericRow.of(1, BinaryString.fromString("C"), 4L));
+ result.addAll(write.prepareCommit(true, 0));
+ commit.commit(0, result);
+ result.clear();
+
+ // test bitmap index and bsi index
+ Predicate predicate =
+ PredicateBuilder.and(
+ new PredicateBuilder(rowType).equal(1, BinaryString.fromString("C")),
+ new PredicateBuilder(rowType).greaterThan(2, 3L));
+ TableScan.Plan plan = table.newScan().withFilter(predicate).plan();
+ List metas =
+ plan.splits().stream()
+ .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+ .collect(Collectors.toList());
+ assertThat(metas.size()).isEqualTo(2);
+
+ RecordReader reader =
+ table.newRead().withFilter(predicate).createReader(plan.splits());
+ reader.forEachRemaining(
+ row -> {
+ assertThat(row.getString(1).toString()).isEqualTo("C");
+ assertThat(row.getLong(2)).isEqualTo(4L);
+ });
+ }
+
@Test
public void testWithShardAppendTable() throws Exception {
FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1));
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 2251619c4d60..806f1f62864d 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -119,6 +119,32 @@ public void testReadWriteTableWithBitmapIndex() throws Catalog.TableNotExistExce
});
}
+ @Test
+ public void testReadWriteTableWithBitSliceIndex() throws Catalog.TableNotExistException {
+
+ spark.sql(
+ "CREATE TABLE T(a int) TBLPROPERTIES ("
+ + "'file-index.bsi.columns'='a',"
+ + "'file-index.in-manifest-threshold'='1B');");
+ spark.sql("INSERT INTO T VALUES (0),(1),(2),(3),(4),(5);");
+
+ // check query result
+ List rows = spark.sql("SELECT a FROM T where a>=3;").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[3], [4], [5]]");
+
+ // check index reader
+ foreachIndexReader(
+ fileIndexReader -> {
+ FileIndexResult fileIndexResult =
+ fileIndexReader.visitGreaterOrEqual(
+ new FieldRef(0, "", new IntType()), 3);
+ assertThat(fileIndexResult).isInstanceOf(BitmapIndexResultLazy.class);
+ RoaringBitmap32 roaringBitmap32 =
+ ((BitmapIndexResultLazy) fileIndexResult).get();
+ assertThat(roaringBitmap32).isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5));
+ });
+ }
+
protected void foreachIndexReader(Consumer consumer)
throws Catalog.TableNotExistException {
Path tableRoot = fileSystemCatalog.getTableLocation(Identifier.create("db", "T"));
@@ -128,7 +154,7 @@ protected void foreachIndexReader(Consumer consumer)
tableRoot,
RowType.of(),
new CoreOptions(new Options()).partitionDefaultName(),
- CoreOptions.FILE_FORMAT.defaultValue().toString(),
+ CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
@@ -154,12 +180,11 @@ protected void foreachIndexReader(Consumer consumer)
.collect(Collectors.toList());
// assert index file exist and only one index file
assert indexFiles.size() == 1;
- try {
- FileIndexFormat.Reader reader =
- FileIndexFormat.createReader(
- fileIO.newInputStream(
- dataFilePathFactory.toPath(indexFiles.get(0))),
- tableSchema.logicalRowType());
+ try (FileIndexFormat.Reader reader =
+ FileIndexFormat.createReader(
+ fileIO.newInputStream(
+ dataFilePathFactory.toPath(indexFiles.get(0))),
+ tableSchema.logicalRowType())) {
Optional fileIndexReader =
reader.readColumnIndex("a").stream().findFirst();
// assert index reader exist