diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
new file mode 100644
index 000000000000..c9b827ee72f0
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * File index file format. Put all column and offset in the header.
+ *
+ *
+ * _______________________________________ _____________________
+ * | magic |version|head length |
+ * |-------------------------------------|
+ * | index type |body info size|
+ * |-------------------------------------|
+ * | column name 1 |start pos |length |
+ * |-------------------------------------| HEAD
+ * | column name 2 |start pos |length |
+ * |-------------------------------------|
+ * | column name 3 |start pos |length |
+ * |-------------------------------------|
+ * | ... |
+ * |-------------------------------------|
+ * | ... |
+ * |-------------------------------------|
+ * | redundant length |redundant bytes |
+ * |-------------------------------------| ---------------------
+ * | BODY |
+ * | BODY |
+ * | BODY | BODY
+ * | BODY |
+ * |_____________________________________| _____________________
+ *
+ * magic: 8 bytes long
+ * version: 4 bytes int
+ * head length: 4 bytes int
+ * index type: var bytes utf (length + bytes)
+ * body info size: 4 bytes int (how many column items below)
+ * column name: var bytes utf
+ * start pos: 4 bytes int
+ * length: 4 bytes int
+ * redundant length: 4 bytes int (for compatibility with later versions, in this version, content is zero)
+ * redundant bytes: var bytes (for compatibility with later version, in this version, is empty)
+ * BODY: column bytes + column bytes + column bytes + .......
+ *
+ *
+ */
+public final class FileIndexFormat {
+
+ private static final long MAGIC = 1493475289347502L;
+
+ enum Version {
+ V_1(1);
+
+ private final int version;
+
+ Version(int version) {
+ this.version = version;
+ }
+
+ public int version() {
+ return version;
+ }
+ }
+
+ public static Writer createWriter(OutputStream outputStream) {
+ return new Writer(outputStream);
+ }
+
+ public static Reader createReader(SeekableInputStream inputStream, RowType fileRowType) {
+ return new Reader(inputStream, fileRowType);
+ }
+
+ /** Writer for file index file. */
+ public static class Writer implements Closeable {
+
+ private final DataOutputStream dataOutputStream;
+
+ // for version compatible
+ private static final int REDUNDANT_LENGTH = 0;
+
+ public Writer(OutputStream outputStream) {
+ this.dataOutputStream = new DataOutputStream(outputStream);
+ }
+
+ public void writeColumnIndex(String indexType, Map bytesMap)
+ throws IOException {
+
+ Map> bodyInfo = new HashMap<>();
+
+ // construct body
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
+ for (Map.Entry entry : bytesMap.entrySet()) {
+ int startPosition = baos.size();
+ baos.write(entry.getValue());
+ bodyInfo.put(entry.getKey(), Pair.of(startPosition, baos.size() - startPosition));
+ }
+ byte[] body = baos.toByteArray();
+
+ writeHead(indexType, bodyInfo);
+
+ // writeBody
+ dataOutputStream.write(body);
+ }
+
+ private void writeHead(String indexType, Map> bodyInfo)
+ throws IOException {
+
+ int headLength = calculateHeadLength(indexType, bodyInfo);
+
+ // writeMagic
+ dataOutputStream.writeLong(MAGIC);
+ // writeVersion
+ dataOutputStream.writeInt(Version.V_1.version());
+ // writeHeadLength
+ dataOutputStream.writeInt(headLength);
+ // writeIndexType
+ dataOutputStream.writeUTF(indexType);
+ // writeColumnSize
+ dataOutputStream.writeInt(bodyInfo.size());
+ // writeColumnInfo, offset = headLength
+ for (Map.Entry> entry : bodyInfo.entrySet()) {
+ dataOutputStream.writeUTF(entry.getKey());
+ dataOutputStream.writeInt(entry.getValue().getLeft() + headLength);
+ dataOutputStream.writeInt(entry.getValue().getRight());
+ }
+ // writeRedundantLength
+ dataOutputStream.writeInt(REDUNDANT_LENGTH);
+ }
+
+ private int calculateHeadLength(
+ String indexType, Map> bodyInfo) throws IOException {
+ // magic 8 bytes, version 4 bytes, head length 4 bytes,
+ // column size 4 bytes, body info start&end 8 bytes per
+ // item, redundant length 4 bytes;
+ int baseLength = 8 + 4 + 4 + 4 + bodyInfo.size() * 8 + 4;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(baos);
+ dataOutput.writeUTF(indexType);
+ for (String s : bodyInfo.keySet()) {
+ dataOutput.writeUTF(s);
+ }
+
+ return baseLength + baos.size();
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeQuietly(dataOutputStream);
+ }
+ }
+
+ /** Reader for file index file. */
+ public static class Reader implements Closeable {
+
+ private final SeekableInputStream seekableInputStream;
+ // get header and cache it.
+ private final Map> header = new HashMap<>();
+ private final Map fields = new HashMap<>();
+ private final String type;
+
+ public Reader(SeekableInputStream seekableInputStream, RowType fileRowType) {
+ this.seekableInputStream = seekableInputStream;
+ DataInputStream dataInputStream = new DataInputStream(seekableInputStream);
+ fileRowType.getFields().forEach(field -> this.fields.put(field.name(), field));
+ try {
+ long magic = dataInputStream.readLong();
+ if (magic != MAGIC) {
+ throw new RuntimeException("This file is not file index file.");
+ }
+
+ int version = dataInputStream.readInt();
+ if (version != Version.V_1.version()) {
+ throw new RuntimeException(
+ "This index file is version of "
+ + version
+ + ", not in supported version list ["
+ + Version.V_1.version()
+ + "]");
+ }
+
+ int headLength = dataInputStream.readInt();
+ byte[] head = new byte[headLength - 8 - 4 - 4];
+ dataInputStream.readFully(head);
+
+ try (DataInputStream dataInput =
+ new DataInputStream(new ByteArrayInputStream(head))) {
+ this.type = dataInput.readUTF();
+ int columnSize = dataInput.readInt();
+ for (int i = 0; i < columnSize; i++) {
+ this.header.put(
+ dataInput.readUTF(),
+ Pair.of(dataInput.readInt(), dataInput.readInt()));
+ }
+ }
+
+ } catch (IOException e) {
+ IOUtils.closeQuietly(seekableInputStream);
+ throw new RuntimeException(
+ "Exception happens while construct file index reader.", e);
+ }
+ }
+
+ public FileIndexReader readColumnIndex(String columnName) {
+
+ return readColumnInputStream(columnName)
+ .map(
+ serializedBytes ->
+ FileIndexer.create(type, fields.get(columnName).type())
+ .createReader()
+ .recoverFrom(serializedBytes))
+ .orElse(null);
+ }
+
+ @VisibleForTesting
+ Optional readColumnInputStream(String columnName) {
+ return Optional.ofNullable(header.getOrDefault(columnName, null))
+ .map(
+ startAndLength -> {
+ byte[] b = new byte[startAndLength.getRight()];
+ try {
+ seekableInputStream.seek(startAndLength.getLeft());
+ int n = 0;
+ int len = b.length;
+ // read fully until b is full else throw.
+ while (n < len) {
+ int count = seekableInputStream.read(b, n, len - n);
+ if (count < 0) {
+ throw new EOFException();
+ }
+ n += count;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return b;
+ });
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeQuietly(seekableInputStream);
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
new file mode 100644
index 000000000000..b07c6b8f08fd
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Or;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utils to check secondary index (e.g. bloom filter) predicate. */
+public class FileIndexPredicate implements Closeable {
+
+ private final FileIndexFormat.Reader reader;
+ private final Map fieldPredicates = new HashMap<>();
+
+ public FileIndexPredicate(Path path, FileIO fileIO, RowType fileRowType) throws IOException {
+ this(fileIO.newInputStream(path), fileRowType);
+ }
+
+ public FileIndexPredicate(byte[] serializedBytes, RowType fileRowType) {
+ this(new ByteArraySeekableStream(serializedBytes), fileRowType);
+ }
+
+ public FileIndexPredicate(SeekableInputStream inputStream, RowType fileRowType) {
+ this.reader = FileIndexFormat.createReader(inputStream, fileRowType);
+ }
+
+ public boolean testPredicate(@Nullable Predicate filePredicate) {
+ if (filePredicate == null) {
+ return true;
+ }
+
+ Set requredFieldNames = getRequiredNames(filePredicate);
+
+ List testWorkers =
+ requredFieldNames.stream()
+ .map(
+ cname ->
+ fieldPredicates.computeIfAbsent(
+ cname,
+ k ->
+ new FileIndexFieldPredicate(
+ cname,
+ reader.readColumnIndex(cname))))
+ .collect(Collectors.toList());
+
+ for (FileIndexFieldPredicate testWorker : testWorkers) {
+ if (!testWorker.test(filePredicate)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private Set getRequiredNames(Predicate filePredicate) {
+ return filePredicate.visit(
+ new PredicateVisitor>() {
+ final Set names = new HashSet<>();
+
+ @Override
+ public Set visit(LeafPredicate predicate) {
+ names.add(predicate.fieldName());
+ return names;
+ }
+
+ @Override
+ public Set visit(CompoundPredicate predicate) {
+ for (Predicate child : predicate.children()) {
+ child.visit(this);
+ }
+ return names;
+ }
+ });
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.reader.close();
+ }
+
+ /** Predicate test worker. */
+ private static class FileIndexFieldPredicate implements PredicateVisitor {
+
+ private final String columnName;
+ private final FileIndexReader fileIndexReader;
+
+ public FileIndexFieldPredicate(String columnName, FileIndexReader fileIndexReader) {
+ this.columnName = columnName;
+ this.fileIndexReader = fileIndexReader;
+ }
+
+ public Boolean test(Predicate predicate) {
+ return predicate.visit(this);
+ }
+
+ @Override
+ public Boolean visit(LeafPredicate predicate) {
+ if (columnName.equals(predicate.fieldName())) {
+ return predicate
+ .function()
+ .visit(
+ fileIndexReader,
+ new FieldRef(
+ predicate.index(), predicate.fieldName(), predicate.type()),
+ predicate.literals());
+ }
+ return true;
+ }
+
+ @Override
+ public Boolean visit(CompoundPredicate predicate) {
+
+ if (predicate.function() instanceof Or) {
+ for (Predicate predicate1 : predicate.children()) {
+ if (predicate1.visit(this)) {
+ return true;
+ }
+ }
+ return false;
+
+ } else {
+ for (Predicate predicate1 : predicate.children()) {
+ if (!predicate1.visit(this)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
new file mode 100644
index 000000000000..6d9404564127
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FunctionVisitor;
+
+import java.util.List;
+
+/**
+ * Read file index from serialized bytes. Return true, means we need to search this file, else means
+ * needn't.
+ */
+public interface FileIndexReader extends FunctionVisitor {
+
+ FileIndexReader recoverFrom(byte[] serializedBytes);
+
+ @Override
+ default Boolean visitIsNotNull(FieldRef fieldRef) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitIsNull(FieldRef fieldRef) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitLessThan(FieldRef fieldRef, Object literal) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitEqual(FieldRef fieldRef, Object literal) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
+ return true;
+ }
+
+ @Override
+ default Boolean visitIn(FieldRef fieldRef, List