diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 8d39919d5dc4..4c42bbfbbb58 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -206,6 +206,18 @@
Boolean |
Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys. |
+
+ file-index.in-manifest-threshold |
+ 500 bytes |
+ MemorySize |
The threshold for storing file index bytes in the manifest; an index larger than this is written to a separate index file. |
+
+
+ file-index.read.enabled |
+ true |
+ Boolean |
Whether to enable reading the file index. |
+
file-reader-async-threshold |
10 mb |
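
These two options combine with per-index keys of the form "file-index.&lt;index-type&gt;.columns", which CoreOptions parses below. A minimal configuration sketch in plain Java; the "bloom-filter" type key comes from this change, while the column names are illustrative:

import java.util.HashMap;
import java.util.Map;

public class FileIndexConfigSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        // build a bloom-filter index on these (illustrative) columns for each new data file
        tableOptions.put("file-index.bloom-filter.columns", "order_id,user_id");
        // indexes no larger than this are embedded in the manifest (default: 500 bytes)
        tableOptions.put("file-index.in-manifest-threshold", "500 b");
        // readers consult the index unless this is disabled (default: true)
        tableOptions.put("file-index.read.enabled", "true");
        tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
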
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0d78d2ec85e7..b73133baef2f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,6 +22,7 @@
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
@@ -70,6 +71,10 @@ public class CoreOptions implements Serializable {
public static final String DISTINCT = "distinct";
+ public static final String FILE_INDEX = "file-index";
+
+ public static final String COLUMNS = "columns";
+
public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
@@ -135,6 +140,18 @@ public class CoreOptions implements Serializable {
"Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());
+ public static final ConfigOption<MemorySize> FILE_INDEX_IN_MANIFEST_THRESHOLD =
+ key("file-index.in-manifest-threshold")
+ .memoryType()
+ .defaultValue(MemorySize.parse("500 B"))
+ .withDescription("The threshold to store file index bytes in manifest.");
+
+ public static final ConfigOption<Boolean> FILE_INDEX_READ_ENABLED =
+ key("file-index.read.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether enabled read file index.");
+
public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
key("manifest.format")
.enumType(FileFormatType.class)
@@ -1703,6 +1720,86 @@ public boolean deletionVectorsEnabled() {
return options.get(DELETION_VECTORS_ENABLED);
}
+ public FileIndexOptions indexColumnsOptions() {
+ String fileIndexPrefix = FILE_INDEX + ".";
+ String fileIndexColumnSuffix = "." + COLUMNS;
+
+ FileIndexOptions fileIndexOptions = new FileIndexOptions(fileIndexInManifestThreshold());
+ for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(fileIndexPrefix)) {
+ // start with file-index, decode this option
+ if (key.endsWith(fileIndexColumnSuffix)) {
+ // if it ends with .columns, set up the indexed columns
+ String indexType =
+ key.substring(
+ fileIndexPrefix.length(),
+ key.length() - fileIndexColumnSuffix.length());
+ String[] names = entry.getValue().split(",");
+ for (String name : names) {
+ if (StringUtils.isBlank(name)) {
+ throw new IllegalArgumentException(
+ "Wrong option in " + key + ", should not have empty column");
+ }
+ fileIndexOptions.computeIfAbsent(name.trim(), indexType);
+ }
+ } else {
+ // else, it must be an option
+ String[] kv = key.substring(fileIndexPrefix.length()).split("\\.");
+ if (kv.length != 3) {
+ continue;
+ }
+ String indexType = kv[0];
+ String cname = kv[1];
+ String opkey = kv[2];
+
+ if (fileIndexOptions.get(cname, indexType) == null) {
+ // if the indexes are not set yet, find .columns in options, then set them
+ String columns =
+ options.get(fileIndexPrefix + indexType + fileIndexColumnSuffix);
+ if (columns == null) {
+ continue;
+ }
+ String[] names = columns.split(",");
+ boolean foundTarget = false;
+ for (String name : names) {
+ if (StringUtils.isBlank(name)) {
+ throw new IllegalArgumentException(
+ "Wrong option in "
+ + key
+ + ", should not have empty column");
+ }
+ String tname = name.trim();
+ if (cname.equals(tname)) {
+ foundTarget = true;
+ }
+ fileIndexOptions.computeIfAbsent(name.trim(), indexType);
+ }
+ if (!foundTarget) {
+ throw new IllegalArgumentException(
+ "Wrong option in "
+ + key
+ + ", can't found column "
+ + cname
+ + " in "
+ + columns);
+ }
+ }
+ fileIndexOptions.get(cname, indexType).set(opkey, entry.getValue());
+ }
+ }
+ }
+ return fileIndexOptions;
+ }
+
+ public long fileIndexInManifestThreshold() {
+ return options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD).getBytes();
+ }
+
+ public boolean fileIndexReadEnabled() {
+ return options.get(FILE_INDEX_READ_ENABLED);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", true, true),
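The decoding rule implemented by indexColumnsOptions() above: "file-index.&lt;type&gt;.columns" declares the indexed columns, "file-index.&lt;type&gt;.&lt;column&gt;.&lt;key&gt;" carries a per-column option, and any other key under the prefix is skipped by the arity check. A self-contained sketch of that rule (the "fpp" option key is hypothetical):

public class IndexKeyDecodeSketch {
    public static void main(String[] args) {
        String[] keys = {
            "file-index.bloom-filter.columns",
            "file-index.bloom-filter.order_id.fpp",
            "file-index.read.enabled"
        };
        for (String key : keys) {
            String rest = key.substring("file-index.".length());
            if (rest.endsWith(".columns")) {
                String indexType = rest.substring(0, rest.length() - ".columns".length());
                System.out.println(key + " -> declares columns for index type '" + indexType + "'");
            } else {
                // same arity check as indexColumnsOptions(): type, column, option key
                String[] kv = rest.split("\\.");
                System.out.println(key + (kv.length == 3
                        ? " -> option '" + kv[2] + "' for column '" + kv[1] + "' of index '" + kv[0] + "'"
                        : " -> skipped (not an index option)"));
            }
        }
    }
}
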
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
index ec809dff4084..4f03faf9062f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java
@@ -46,12 +46,12 @@
* File index file format. Put all column and offset in the header.
*
*
- * _______________________________________ _____________________
+ * _____________________________________ _____________________
* | magic |version|head length |
* |-------------------------------------|
- * | column size |
+ * | column number |
* |-------------------------------------|
- * | column 1 | index size |
+ * | column 1 | index number |
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
@@ -59,7 +59,7 @@
* |-------------------------------------|
* | index name 3 |start pos |length |
* |-------------------------------------| HEAD
- * | column 2 | index size |
+ * | column 2 | index number |
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
@@ -82,14 +82,15 @@
* magic: 8 bytes long
* version: 4 bytes int
* head length: 4 bytes int
- * index type: var bytes utf (length + bytes)
- * body info size: 4 bytes int (how many column items below)
- * column name: var bytes utf
+ * column number: 4 bytes int
+ * column x: var bytes utf (length + bytes)
+ * index number: 4 bytes int (how many column items below)
+ * index name x: var bytes utf
* start pos: 4 bytes int
* length: 4 bytes int
* redundant length: 4 bytes int (for compatibility with later versions, in this version, content is zero)
* redundant bytes: var bytes (for compatibility with later version, in this version, is empty)
- * BODY: column bytes + column bytes + column bytes + .......
+ * BODY: column index bytes + column index bytes + column index bytes + .......
*
*
*/
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
new file mode 100644
index 000000000000..e8751245ca1a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Options of file index column. */
+public class FileIndexOptions {
+
+ // if the filter size is greater than fileIndexInManifestThreshold, we put it in a separate file
+ private final long fileIndexInManifestThreshold;
+
+ private final Map<String, Map<String, Options>> indexTypeOptions;
+
+ public FileIndexOptions() {
+ this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes());
+ }
+
+ public FileIndexOptions(long fileIndexInManifestThreshold) {
+ this.indexTypeOptions = new HashMap<>();
+ this.fileIndexInManifestThreshold = fileIndexInManifestThreshold;
+ }
+
+ public void computeIfAbsent(String column, String indexType) {
+ indexTypeOptions
+ .computeIfAbsent(column, c -> new HashMap<>())
+ .computeIfAbsent(indexType, i -> new Options());
+ }
+
+ public Options get(String column, String indexType) {
+ return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null))
+ .map(x -> x.get(indexType))
+ .orElse(null);
+ }
+
+ public boolean isEmpty() {
+ return indexTypeOptions.isEmpty();
+ }
+
+ public long fileIndexInManifestThreshold() {
+ return fileIndexInManifestThreshold;
+ }
+
+ public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
+ return indexTypeOptions.entrySet();
+ }
+}
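A usage sketch of this container, exercising only the methods defined above; the "fpp" option key is hypothetical:

import org.apache.paimon.fileindex.FileIndexOptions;

public class FileIndexOptionsUsageSketch {
    public static void main(String[] args) {
        FileIndexOptions options = new FileIndexOptions();
        options.computeIfAbsent("order_id", "bloom-filter"); // register a column/index pair
        options.get("order_id", "bloom-filter").set("fpp", "0.05"); // "fpp" is a hypothetical key
        System.out.println(options.isEmpty()); // false
    }
}
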
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
index c0486d535895..37ba4d205feb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java
@@ -25,11 +25,10 @@
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
+import org.apache.paimon.utils.BloomFilter64.BitSet;
import org.apache.hadoop.util.bloom.HashFunction;
-import java.util.BitSet;
-
/**
* Bloom filter for file index.
*
@@ -40,7 +39,7 @@
*/
public class BloomFilterFileIndex implements FileIndexer {
- public static final String BLOOM_FILTER = "bloom";
+ public static final String BLOOM_FILTER = "bloom-filter";
private static final int DEFAULT_ITEMS = 1_000_000;
private static final double DEFAULT_FPP = 0.1;
@@ -84,19 +83,21 @@ public Writer(DataType type, int items, double fpp) {
@Override
public void write(Object key) {
- filter.addHash(hashFunction.hash(key));
+ if (key != null) {
+ filter.addHash(hashFunction.hash(key));
+ }
}
@Override
public byte[] serializedBytes() {
int numHashFunctions = filter.getNumHashFunctions();
- byte[] bytes = filter.getBitSet().toByteArray();
- byte[] serialized = new byte[bytes.length + Integer.BYTES];
+ byte[] serialized = new byte[filter.getBitSet().bitSize() / Byte.SIZE + Integer.BYTES];
+ // big endian: most significant byte first
serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
serialized[3] = (byte) (numHashFunctions & 0xFF);
- System.arraycopy(bytes, 0, serialized, 4, bytes.length);
+ filter.getBitSet().toByteArray(serialized, 4, serialized.length - 4);
return serialized;
}
}
@@ -107,21 +108,20 @@ private static class Reader implements FileIndexReader {
private final FastHash hashFunction;
public Reader(DataType type, byte[] serializedBytes) {
+ // big endian: most significant byte first
int numHashFunctions =
((serializedBytes[0] << 24)
+ (serializedBytes[1] << 16)
+ (serializedBytes[2] << 8)
+ serializedBytes[3]);
- byte[] bytes = new byte[serializedBytes.length - Integer.BYTES];
- System.arraycopy(serializedBytes, 4, bytes, 0, bytes.length);
- BitSet bitSet = BitSet.valueOf(bytes);
+ BitSet bitSet = new BitSet(serializedBytes, 4);
this.filter = new BloomFilter64(numHashFunctions, bitSet);
this.hashFunction = FastHash.getHashFunction(type);
}
@Override
public Boolean visitEqual(FieldRef fieldRef, Object key) {
- return filter.testHash(hashFunction.hash(key));
+ return key == null || filter.testHash(hashFunction.hash(key));
}
}
}
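The four header bytes above hold numHashFunctions with the most significant byte first. A plain-Java round-trip sketch; masking each byte on decode guards against sign extension, which the small values written here avoid in practice:

public class BloomHeaderRoundTrip {
    public static void main(String[] args) {
        int numHashFunctions = 7; // always small in this codepath
        byte[] header = new byte[4];
        header[0] = (byte) ((numHashFunctions >>> 24) & 0xFF); // most significant byte first
        header[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
        header[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
        header[3] = (byte) (numHashFunctions & 0xFF);
        int decoded = ((header[0] & 0xFF) << 24)
                + ((header[1] & 0xFF) << 16)
                + ((header[2] & 0xFF) << 8)
                + (header[3] & 0xFF);
        System.out.println(decoded); // 7
    }
}
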
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
index 75b8661a2eba..5a4be5a45f05 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java
@@ -18,8 +18,6 @@
package org.apache.paimon.utils;
-import java.util.BitSet;
-
/** Bloom filter 64 handle 64 bits hash. */
public final class BloomFilter64 {
@@ -29,15 +27,15 @@ public final class BloomFilter64 {
public BloomFilter64(long items, double fpp) {
int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2)));
- this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+ this.numBits = nb + (Byte.SIZE - (nb % Byte.SIZE));
this.numHashFunctions =
Math.max(1, (int) Math.round((double) numBits / items * Math.log(2)));
- this.bitSet = new BitSet(numBits);
+ this.bitSet = new BitSet(new byte[numBits / Byte.SIZE], 0);
}
public BloomFilter64(int numHashFunctions, BitSet bitSet) {
this.numHashFunctions = numHashFunctions;
- this.numBits = bitSet.size();
+ this.numBits = bitSet.bitSize();
this.bitSet = bitSet;
}
@@ -81,4 +79,38 @@ public int getNumHashFunctions() {
public BitSet getBitSet() {
return bitSet;
}
+
+ /** Bit set used for bloom filter 64. */
+ public static class BitSet {
+
+ private static final byte MASK = 0x07;
+
+ private final byte[] data;
+ private final int offset;
+
+ public BitSet(byte[] data, int offset) {
+ assert data.length > 0 : "data length is zero!";
+ assert offset >= 0 : "offset is negative!";
+ this.data = data;
+ this.offset = offset;
+ }
+
+ public void set(int index) {
+ data[(index >>> 3) + offset] |= (byte) ((byte) 1 << (index & MASK));
+ }
+
+ public boolean get(int index) {
+ return (data[(index >>> 3) + offset] & ((byte) 1 << (index & MASK))) != 0;
+ }
+
+ public int bitSize() {
+ return (data.length - offset) * Byte.SIZE;
+ }
+
+ public void toByteArray(byte[] bytes, int offset, int length) {
+ if (length >= 0) {
+ System.arraycopy(data, this.offset, bytes, offset, length);
+ }
+ }
+ }
}
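The addressing rule: index >>> 3 selects the byte, index & 0x07 selects the bit within it. A quick check against the BitSet defined above:

import org.apache.paimon.utils.BloomFilter64.BitSet;

public class BitSetSketch {
    public static void main(String[] args) {
        BitSet bits = new BitSet(new byte[2], 0); // 16 bits of capacity
        bits.set(11);                             // byte 1, bit 3
        System.out.println(bits.get(11));         // true
        System.out.println(bits.get(12));         // false
        System.out.println(bits.bitSize());       // 16
    }
}
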
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 186fd479922f..99efac540e80 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -86,7 +86,8 @@ public RawFileSplitRead newRead() {
schema,
rowType,
FileFormatDiscover.of(options),
- pathFactory());
+ pathFactory(),
+ options.fileIndexReadEnabled());
}
@Override
@@ -145,7 +146,8 @@ public void pushdown(Predicate predicate) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
- branchName);
+ branchName,
+ options.fileIndexReadEnabled());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 354cc6dda195..aeb30731dfcc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -138,7 +138,8 @@ public RawFileSplitRead newBatchRawFileRead() {
schema,
valueType,
FileFormatDiscover.of(options),
- pathFactory());
+ pathFactory(),
+ options.fileIndexReadEnabled());
}
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9d9f418979a1..0ae21e6b28ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
@@ -78,6 +79,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
+ private final FileIndexOptions fileIndexOptions;
private MemorySegmentPool memorySegmentPool;
private MemorySize maxDiskSize;
@@ -100,7 +102,8 @@ public AppendOnlyWriter(
String fileCompression,
String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
- MemorySize maxDiskSize) {
+ MemorySize maxDiskSize,
+ FileIndexOptions fileIndexOptions) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -120,6 +123,7 @@ public AppendOnlyWriter(
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;
+ this.fileIndexOptions = fileIndexOptions;
this.sinkWriter =
useWriteBuffer
@@ -246,7 +250,8 @@ private RowDataRollingFileWriter createRollingRowWriter() {
pathFactory,
seqNumCounter,
fileCompression,
- statsCollectors);
+ statsCollectors,
+ fileIndexOptions);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 30dacd130155..fdf7266bd51e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -83,6 +83,9 @@ public class DataFileMeta {
// We have to keep the compatibility.
private final @Nullable Long deleteRowCount;
+ // file index filter bytes, if it is small, store in data file meta
+ private final @Nullable byte[] embeddedIndex;
+
public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -91,6 +94,28 @@ public static DataFileMeta forAppend(
long minSequenceNumber,
long maxSequenceNumber,
long schemaId) {
+ return forAppend(
+ fileName,
+ fileSize,
+ rowCount,
+ rowStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ Collections.emptyList(),
+ null);
+ }
+
+ public static DataFileMeta forAppend(
+ String fileName,
+ long fileSize,
+ long rowCount,
+ BinaryTableStats rowStats,
+ long minSequenceNumber,
+ long maxSequenceNumber,
+ long schemaId,
+ List<String> extraFiles,
+ @Nullable byte[] embeddedIndex) {
return new DataFileMeta(
fileName,
fileSize,
@@ -103,7 +128,10 @@ public static DataFileMeta forAppend(
maxSequenceNumber,
schemaId,
DUMMY_LEVEL,
- 0L);
+ extraFiles,
+ Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+ 0L,
+ embeddedIndex);
}
public DataFileMeta(
@@ -118,7 +146,8 @@ public DataFileMeta(
long maxSequenceNumber,
long schemaId,
int level,
- @Nullable Long deleteRowCount) {
+ @Nullable Long deleteRowCount,
+ @Nullable byte[] embeddedIndex) {
this(
fileName,
fileSize,
@@ -133,7 +162,8 @@ public DataFileMeta(
level,
Collections.emptyList(),
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
- deleteRowCount);
+ deleteRowCount,
+ embeddedIndex);
}
public DataFileMeta(
@@ -150,12 +180,14 @@ public DataFileMeta(
int level,
List<String> extraFiles,
Timestamp creationTime,
- @Nullable Long deleteRowCount) {
+ @Nullable Long deleteRowCount,
+ @Nullable byte[] embeddedIndex) {
this.fileName = fileName;
this.fileSize = fileSize;
this.rowCount = rowCount;
+ this.embeddedIndex = embeddedIndex;
this.minKey = minKey;
this.maxKey = maxKey;
this.keyStats = keyStats;
@@ -191,6 +223,10 @@ public Optional<Long> deleteRowCount() {
return Optional.ofNullable(deleteRowCount);
}
+ public byte[] embeddedIndex() {
+ return embeddedIndex;
+ }
+
public BinaryRow minKey() {
return minKey;
}
@@ -276,7 +312,8 @@ public DataFileMeta upgrade(int newLevel) {
newLevel,
extraFiles,
creationTime,
- deleteRowCount);
+ deleteRowCount,
+ embeddedIndex);
}
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -301,7 +338,8 @@ public DataFileMeta copy(List<String> newExtraFiles) {
level,
newExtraFiles,
creationTime,
- deleteRowCount);
+ deleteRowCount,
+ embeddedIndex);
}
@Override
@@ -316,6 +354,7 @@ public boolean equals(Object o) {
return Objects.equals(fileName, that.fileName)
&& fileSize == that.fileSize
&& rowCount == that.rowCount
&& java.util.Arrays.equals(embeddedIndex, that.embeddedIndex)
&& Objects.equals(minKey, that.minKey)
&& Objects.equals(maxKey, that.maxKey)
&& Objects.equals(keyStats, that.keyStats)
@@ -335,6 +374,7 @@ public int hashCode() {
fileName,
fileSize,
rowCount,
java.util.Arrays.hashCode(embeddedIndex),
minKey,
maxKey,
keyStats,
@@ -358,6 +398,7 @@ public String toString() {
fileName,
fileSize,
rowCount,
+ embeddedIndex,
minKey,
maxKey,
keyStats,
@@ -387,6 +428,7 @@ public static RowType schema() {
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
+ fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)));
return new RowType(fields);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index f91e3d293361..3de1fcac9f4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -54,7 +54,8 @@ public InternalRow toRow(DataFileMeta meta) {
meta.level(),
toStringArrayData(meta.extraFiles()),
meta.creationTime(),
- meta.deleteRowCount().orElse(null));
+ meta.deleteRowCount().orElse(null),
+ meta.embeddedIndex());
}
@Override
@@ -73,6 +74,7 @@ public DataFileMeta fromRow(InternalRow row) {
row.getInt(10),
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3),
- row.isNullAt(13) ? null : row.getLong(13));
+ row.isNullAt(13) ? null : row.getLong(13),
+ row.isNullAt(14) ? null : row.getBinary(14));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 385609de3f84..ef83e8598790 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -34,6 +34,8 @@ public class DataFilePathFactory {
public static final String CHANGELOG_FILE_PREFIX = "changelog-";
+ public static final String INDEX_PATH_SUFFIX = ".index";
+
private final Path parent;
private final String uuid;
@@ -70,6 +72,10 @@ public String uuid() {
return uuid;
}
+ public static Path toFileIndexPath(Path filePath) {
+ return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX);
+ }
+
public static String formatIdentifier(String fileName) {
int index = fileName.lastIndexOf('.');
if (index == -1) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
new file mode 100644
index 000000000000..0a91d4f7f530
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexPredicate;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** File index record reader that evaluates the file index filter in its constructor. */
+public class FileIndexRecordReader implements RecordReader<InternalRow> {
+
+ private final RecordReader<InternalRow> reader;
+
+ public FileIndexRecordReader(
+ FileIO fileIO,
+ TableSchema dataSchema,
+ List<Predicate> dataFilter,
+ DataFilePathFactory dataFilePathFactory,
+ DataFileMeta file,
+ ConcatRecordReader.ReaderSupplier<InternalRow> readerSupplier)
+ throws IOException {
+ boolean filterThisFile = false;
+ if (dataFilter != null && !dataFilter.isEmpty()) {
+ List<String> indexFiles =
+ file.extraFiles().stream()
+ .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+ if (!indexFiles.isEmpty()) {
+ if (indexFiles.size() > 1) {
+ throw new RuntimeException(
+ "Found more than one index file for one data file: "
+ + String.join(" and ", indexFiles));
+ }
+ // go to file index check
+ try (FileIndexPredicate predicate =
+ new FileIndexPredicate(
+ dataFilePathFactory.toPath(indexFiles.get(0)),
+ fileIO,
+ dataSchema.logicalRowType())) {
+ if (!predicate.testPredicate(
+ PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) {
+ filterThisFile = true;
+ }
+ }
+ }
+ }
+
+ this.reader = filterThisFile ? null : readerSupplier.get();
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<InternalRow> readBatch() throws IOException {
+ return reader == null ? null : reader.readBatch();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+}
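When the index proves no row can match, the wrapped reader stays null and readBatch() returns null on the first call, so consumers drain the reader as usual. A sketch of that loop, assuming the usual RecordReader/RecordIterator contract (next() until null, then releaseBatch()):

import java.io.IOException;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;

public class DrainSketch {
    // A reader whose file index filtered everything out yields no batches at
    // all, so this standard consuming loop needs no special casing.
    static long drain(RecordReader<InternalRow> reader) throws IOException {
        long rows = 0;
        try {
            RecordReader.RecordIterator<InternalRow> batch;
            while ((batch = reader.readBatch()) != null) {
                while (batch.next() != null) {
                    rows++;
                }
                batch.releaseBatch();
            }
        } finally {
            reader.close();
        }
        return rows;
    }
}
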
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
new file mode 100644
index 000000000000..6ba635c5ed39
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Index file writer. */
+public final class FileIndexWriter implements Closeable {
+
+ public static final FileIndexResult EMPTY_RESULT = FileIndexResult.of(null, null);
+
+ private final FileIO fileIO;
+
+ private final Path path;
+
+ // if the filter size is greater than fileIndexInManifestThreshold, we put it in a separate file
+ private final long inManifestThreshold;
+
+ private final List<IndexMaintainer> indexMaintainers = new ArrayList<>();
+
+ private String resultFileName;
+
+ private byte[] embeddedIndexBytes;
+
+ public FileIndexWriter(
+ FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
+ this.fileIO = fileIO;
+ this.path = path;
+ List<DataField> fields = rowType.getFields();
+ Map<String, DataField> map = new HashMap<>();
+ Map<String, Integer> index = new HashMap<>();
+ fields.forEach(
+ dataField -> {
+ map.put(dataField.name(), dataField);
+ index.put(dataField.name(), rowType.getFieldIndex(dataField.name()));
+ });
+ for (Map.Entry<String, Map<String, Options>> entry : fileIndexOptions.entrySet()) {
+ String columnName = entry.getKey();
+ DataField field = map.get(columnName);
+ if (field == null) {
+ throw new IllegalArgumentException(columnName + " does not exist in column fields");
+ }
+ for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
+ String indexType = typeEntry.getKey();
+ indexMaintainers.add(
+ new IndexMaintainer(
+ columnName,
+ indexType,
+ FileIndexer.create(indexType, field.type(), typeEntry.getValue())
+ .createWriter(),
+ InternalRow.createFieldGetter(
+ field.type(), index.get(columnName))));
+ }
+ }
+ this.inManifestThreshold = fileIndexOptions.fileIndexInManifestThreshold();
+ }
+
+ public void write(InternalRow row) {
+ indexMaintainers.forEach(indexMaintainer -> indexMaintainer.write(row));
+ }
+
+ @Override
+ public void close() throws IOException {
+ Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
+
+ for (IndexMaintainer indexMaintainer : indexMaintainers) {
+ indexMaps
+ .computeIfAbsent(indexMaintainer.getColumnName(), k -> new HashMap<>())
+ .put(indexMaintainer.getIndexType(), indexMaintainer.serializedBytes());
+ }
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) {
+ writer.writeColumnIndexes(indexMaps);
+ }
+
+ if (baos.size() > inManifestThreshold) {
+ try (OutputStream outputStream = fileIO.newOutputStream(path, false)) {
+ outputStream.write(baos.toByteArray());
+ }
+ resultFileName = path.getName();
+ } else {
+ embeddedIndexBytes = baos.toByteArray();
+ }
+ }
+
+ public FileIndexResult result() {
+ return FileIndexResult.of(embeddedIndexBytes, resultFileName);
+ }
+
+ @Nullable
+ public static FileIndexWriter create(
+ FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) {
+ return fileIndexOptions.isEmpty()
+ ? null
+ : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions);
+ }
+
+ /** File index result. */
+ public interface FileIndexResult {
+
+ @Nullable
+ byte[] embeddedIndexBytes();
+
+ @Nullable
+ String independentIndexFile();
+
+ static FileIndexResult of(byte[] embeddedIndexBytes, String resultFileName) {
+ return new FileIndexResult() {
+
+ @Override
+ public byte[] embeddedIndexBytes() {
+ return embeddedIndexBytes;
+ }
+
+ @Override
+ public String independentIndexFile() {
+ return resultFileName;
+ }
+ };
+ }
+ }
+
+ /** One index maintainer for one column. */
+ private static class IndexMaintainer {
+
+ private final String columnName;
+ private final String indexType;
+ private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter;
+ private final InternalRow.FieldGetter getter;
+
+ public IndexMaintainer(
+ String columnName,
+ String indexType,
+ org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter,
+ InternalRow.FieldGetter getter) {
+ this.columnName = columnName;
+ this.indexType = indexType;
+ this.fileIndexWriter = fileIndexWriter;
+ this.getter = getter;
+ }
+
+ public void write(InternalRow row) {
+ fileIndexWriter.write(getter.getFieldOrNull(row));
+ }
+
+ public String getIndexType() {
+ return indexType;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public byte[] serializedBytes() {
+ return fileIndexWriter.serializedBytes();
+ }
+ }
+}
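A sketch of the writer life-cycle as a caller such as the data-file writer would drive it, using only the API defined above: one write() per row, close() to serialize and apply the threshold, then result() to learn where the index bytes landed.

import java.io.IOException;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.FileIndexWriter;
import org.apache.paimon.types.RowType;

public class IndexWriteSketch {
    static void writeIndex(FileIO fileIO, Path indexPath, RowType rowType,
            FileIndexOptions options, Iterable<InternalRow> rows) throws IOException {
        FileIndexWriter writer = FileIndexWriter.create(fileIO, indexPath, rowType, options);
        if (writer == null) {
            return; // no index columns configured for this table
        }
        for (InternalRow row : rows) {
            writer.write(row); // one call per row written to the data file
        }
        writer.close(); // serializes, then embeds or spills based on the threshold
        FileIndexWriter.FileIndexResult result = writer.result();
        System.out.println("embedded: " + (result.embeddedIndexBytes() != null)
                + ", index file: " + result.independentIndexFile());
    }
}
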
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
index 0eac2961a912..1e12025ba533 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
@@ -41,24 +41,6 @@ public class FileRecordReader implements RecordReader<InternalRow> {
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
- public FileRecordReader(
- FormatReaderFactory readerFactory,
- FormatReaderFactory.Context context,
- @Nullable int[] indexMapping,
- @Nullable CastFieldGetter[] castMapping,
- @Nullable PartitionInfo partitionInfo)
- throws IOException {
- try {
- this.reader = readerFactory.createReader(context);
- } catch (Exception e) {
- FileUtils.checkExists(context.fileIO(), context.filePath());
- throw e;
- }
- this.indexMapping = indexMapping;
- this.partitionInfo = partitionInfo;
- this.castMapping = castMapping;
- }
-
@Nullable
@Override
public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
@@ -89,6 +71,24 @@ public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
return iterator;
}
+ public FileRecordReader(
+ FormatReaderFactory readerFactory,
+ FormatReaderFactory.Context context,
+ @Nullable int[] indexMapping,
+ @Nullable CastFieldGetter[] castMapping,
+ @Nullable PartitionInfo partitionInfo)
+ throws IOException {
+ try {
+ this.reader = readerFactory.createReader(context);
+ } catch (Exception e) {
+ FileUtils.checkExists(context.fileIO(), context.filePath());
+ throw e;
+ }
+ this.indexMapping = indexMapping;
+ this.partitionInfo = partitionInfo;
+ this.castMapping = castMapping;
+ }
+
@Override
public void close() throws IOException {
reader.close();
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index e2e5441f680e..0f1223ccac45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -168,6 +168,8 @@ public DataFileMeta result() throws IOException {
maxSeqNumber,
schemaId,
level,
- deleteRecordCount);
+ deleteRecordCount,
+ // TODO: enable file filter for primary key table (e.g. deletion table).
+ null);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 3b085c645091..ce441b279cc7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
@@ -32,8 +33,11 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Collections;
import java.util.function.Function;
+import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath;
+
/**
* A {@link StatsCollectingSingleFileWriter} to write data files containing {@link InternalRow}.
* Also produces {@link DataFileMeta} after writing a file.
@@ -43,6 +47,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalRow, DataFileMeta>
new RowDataFileWriter(
@@ -54,7 +56,8 @@ public RowDataRollingFileWriter(
schemaId,
seqNumCounter,
fileCompression,
- statsCollectors),
+ statsCollectors,
+ fileIndexOptions),
targetFileSize);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 866c87d75f66..baa2e9f4ab4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -19,6 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.AppendOnlyFileStore;
+import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
@@ -31,15 +32,25 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
private final FieldStatsConverters fieldStatsConverters;
+ private final boolean fileIndexReadEnabled;
+
private Predicate filter;
+ // cache of data filters converted per data schema id.
+ private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();
+
public AppendOnlyFileStoreScan(
RowType partitionType,
ScanBucketFilter bucketFilter,
@@ -51,7 +62,8 @@ public AppendOnlyFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
- String branchName) {
+ String branchName,
+ boolean fileIndexReadEnabled) {
super(
partitionType,
bucketFilter,
@@ -66,6 +78,7 @@ public AppendOnlyFileStoreScan(
branchName);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
+ this.fileIndexReadEnabled = fileIndexReadEnabled;
}
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
@@ -84,11 +97,13 @@ protected boolean filterByStats(ManifestEntry entry) {
FieldStatsArraySerializer serializer =
fieldStatsConverters.getOrCreate(entry.file().schemaId());
BinaryTableStats stats = entry.file().valueStats();
+
return filter.test(
- entry.file().rowCount(),
- serializer.evolution(stats.minValues()),
- serializer.evolution(stats.maxValues()),
- serializer.evolution(stats.nullCounts(), entry.file().rowCount()));
+ entry.file().rowCount(),
+ serializer.evolution(stats.minValues()),
+ serializer.evolution(stats.maxValues()),
+ serializer.evolution(stats.nullCounts(), entry.file().rowCount()))
+ && (!fileIndexReadEnabled || testFileIndex(entry.file().embeddedIndex(), entry));
}
@Override
@@ -96,4 +111,24 @@ protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> entries) {
// We don't need to filter per-bucket entries here
return entries;
}
+
+ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
+ if (embeddedIndexBytes == null) {
+ return true;
+ }
+
+ RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType();
+
+ Predicate dataPredicate =
+ dataFilterMapping.computeIfAbsent(
+ entry.file().schemaId(),
+ id -> fieldStatsConverters.convertFilter(entry.file().schemaId(), filter));
+
+ try (FileIndexPredicate predicate =
+ new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
+ return predicate.testPredicate(dataPredicate);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception happens while checking predicate.", e);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index b3361b0df735..fc3f2a3d6d20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -27,6 +27,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
@@ -74,6 +75,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
private final boolean spillable;
private final MemorySize maxDiskSize;
private final FieldStatsCollector.Factory[] statsCollectors;
+ private final FileIndexOptions fileIndexOptions;
private boolean forceBufferSpill = false;
private boolean skipCompaction;
@@ -109,6 +111,7 @@ public AppendOnlyFileStoreWrite(
this.maxDiskSize = options.writeBufferSpillDiskSize();
this.statsCollectors =
StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
+ this.fileIndexOptions = options.indexColumnsOptions();
}
@Override
@@ -155,7 +158,8 @@ protected RecordWriter createWriter(
fileCompression,
spillCompression,
statsCollectors,
- maxDiskSize);
+ maxDiskSize,
+ fileIndexOptions);
}
public AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -174,7 +178,8 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter(
pathFactory.createDataFilePathFactory(partition, bucket),
new LongCounter(toCompact.get(0).minSequenceNumber()),
fileCompression,
- statsCollectors);
+ statsCollectors,
+ fileIndexOptions);
try {
rewriter.write(bucketReader(partition, bucket).read(toCompact));
} finally {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index c801dcaa87d8..cf3b76e6736a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.casting.CastFieldGetter;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -25,9 +26,11 @@
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileIndexRecordReader;
import org.apache.paimon.io.FileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
@@ -68,7 +71,8 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
- private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
+ private final Map<FormatKey, RawFileBulkFormatMapping> bulkFormatMappings;
+ private final boolean fileIndexReadEnabled;
private int[][] projection;
@@ -80,13 +84,15 @@ public RawFileSplitRead(
TableSchema schema,
RowType rowType,
FileFormatDiscover formatDiscover,
- FileStorePathFactory pathFactory) {
+ FileStorePathFactory pathFactory,
+ boolean fileIndexReadEnabled) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.bulkFormatMappings = new HashMap<>();
+ this.fileIndexReadEnabled = fileIndexReadEnabled;
this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes();
}
@@ -121,7 +127,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
for (DataFileMeta file : split.dataFiles()) {
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
- BulkFormatMapping bulkFormatMapping =
+ RawFileBulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
new FormatKey(file.schemaId(), formatIdentifier),
this::createBulkFormatMapping);
@@ -140,7 +146,7 @@ public RecordReader createReader(DataSplit split) throws IOExceptio
return ConcatRecordReader.create(suppliers);
}
- private BulkFormatMapping createBulkFormatMapping(FormatKey key) {
+ private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) {
TableSchema tableSchema = schema;
TableSchema dataSchema =
key.schemaId == schema.id() ? schema : schemaManager.schema(key.schemaId);
@@ -180,37 +186,81 @@ private BulkFormatMapping createBulkFormatMapping(FormatKey key) {
RowType projectedRowType =
Projection.of(dataProjection).project(dataSchema.logicalRowType());
- return new BulkFormatMapping(
+ return new RawFileBulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionPair,
formatDiscover
.discover(key.format)
- .createReaderFactory(projectedRowType, dataFilters));
+ .createReaderFactory(projectedRowType, dataFilters),
+ dataSchema,
+ dataFilters);
}
private RecordReader<InternalRow> createFileReader(
BinaryRow partition,
DataFileMeta file,
DataFilePathFactory dataFilePathFactory,
- BulkFormatMapping bulkFormatMapping,
+ RawFileBulkFormatMapping bulkFormatMapping,
DeletionVector.Factory dvFactory)
throws IOException {
- FileRecordReader fileRecordReader =
- new FileRecordReader(
- bulkFormatMapping.getReaderFactory(),
- new FormatReaderContext(
- fileIO,
- dataFilePathFactory.toPath(file.fileName()),
- file.fileSize()),
- bulkFormatMapping.getIndexMapping(),
- bulkFormatMapping.getCastMapping(),
- PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
-
- Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
- if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
- return new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
+ ConcatRecordReader.ReaderSupplier<InternalRow> supplier =
+ () -> {
+ FileRecordReader fileRecordReader =
+ new FileRecordReader(
+ bulkFormatMapping.getReaderFactory(),
+ new FormatReaderContext(
+ fileIO,
+ dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize()),
+ bulkFormatMapping.getIndexMapping(),
+ bulkFormatMapping.getCastMapping(),
+ PartitionUtils.create(
+ bulkFormatMapping.getPartitionPair(), partition));
+
+ Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
+ if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
+ return new ApplyDeletionVectorReader<>(
+ fileRecordReader, deletionVector.get());
+ }
+ return fileRecordReader;
+ };
+
+ return fileIndexReadEnabled
+ ? new FileIndexRecordReader(
+ fileIO,
+ bulkFormatMapping.getDataSchema(),
+ bulkFormatMapping.getDataFilters(),
+ dataFilePathFactory,
+ file,
+ supplier)
+ : supplier.get();
+ }
+
+ /** Bulk format mapping with data schema and data filters. */
+ private static class RawFileBulkFormatMapping extends BulkFormatMapping {
+
+ private final TableSchema dataSchema;
+ private final List<Predicate> dataFilters;
+
+ public RawFileBulkFormatMapping(
+ int[] indexMapping,
+ CastFieldGetter[] castMapping,
+ Pair<int[], RowType> partitionPair,
+ FormatReaderFactory bulkFormat,
+ TableSchema dataSchema,
+ List dataFilters) {
+ super(indexMapping, castMapping, partitionPair, bulkFormat);
+ this.dataSchema = dataSchema;
+ this.dataFilters = dataFilters;
+ }
+
+ public TableSchema getDataSchema() {
+ return dataSchema;
+ }
+
+ public List<Predicate> getDataFilters() {
+ return dataFilters;
}
- return fileRecordReader;
}
}
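createFileReader() now hides the format reader behind a ReaderSupplier, so a file pruned by its index costs one index read and zero data-file IO. The same pattern in isolation (assuming ReaderSupplier.get() may throw IOException, as its use above suggests):

import java.io.IOException;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.reader.RecordReader;

public class LazyOpenSketch {
    // The data-file reader is only constructed when the index check passes.
    static RecordReader<InternalRow> open(
            boolean prunedByIndex,
            ConcatRecordReader.ReaderSupplier<InternalRow> supplier) throws IOException {
        return prunedByIndex ? null : supplier.get();
    }
}
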
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
index 6733e1fea171..d3eb0e41697a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java
@@ -18,13 +18,17 @@
package org.apache.paimon.stats;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.IndexCastMapping;
+import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +75,17 @@ public FieldStatsArraySerializer getOrCreate(long dataSchemaId) {
});
}
+ public Predicate convertFilter(long dataSchemaId, Predicate filter) {
+ return tableSchemaId == dataSchemaId
+ ? filter
+ : Objects.requireNonNull(
+ SchemaEvolutionUtil.createDataFilters(
+ schemaFields.apply(tableSchemaId),
+ schemaFields.apply(dataSchemaId),
+ Collections.singletonList(filter)))
+ .get(0);
+ }
+
public List<DataField> tableDataFields() {
return tableDataFields;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index e6209e2234f1..21b8825f7a86 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -186,6 +186,7 @@ private DataFileMeta newFile(long fileSize) {
0,
0,
0,
- 0L);
+ 0L,
+ null);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 565be9f90f2e..9211cfe60591 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -27,6 +27,7 @@
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
@@ -604,7 +605,8 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, AppendOnlyWriterTest.SCHEMA.getFieldNames()),
- MemorySize.MAX_VALUE);
+ MemorySize.MAX_VALUE,
+ new FileIndexOptions());
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index a96ee1429740..3eff9b7cdf33 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -138,7 +138,8 @@ private static DataFileMeta newFile(long timeMillis) {
Instant.ofEpochMilli(timeMillis)
.atZone(ZoneId.systemDefault())
.toLocalDateTime()),
- 0L);
+ 0L,
+ null);
}
private Pair row(int pt, int col, int pk, int bucket) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index ffa290e0a53b..1a3503f7a789 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -24,6 +24,7 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
@@ -89,7 +90,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, SCHEMA.getFieldNames()),
- MemorySize.MAX_VALUE);
+ MemorySize.MAX_VALUE,
+ new FileIndexOptions());
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
appendOnlyWriter.write(
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index aed01ca21857..3b4921c88026 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -161,7 +161,8 @@ private Data createDataFile(List<KeyValue> kvs, int level, BinaryRow partition,
maxSequenceNumber,
0,
level,
- kvs.stream().filter(kv -> kv.valueKind().isRetract()).count()),
+ kvs.stream().filter(kv -> kv.valueKind().isRetract()).count(),
+ null),
kvs);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index b902ae967773..d5f6ceda32a9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -51,7 +51,8 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) {
DataFileMeta.DUMMY_LEVEL,
Collections.emptyList(),
Timestamp.fromEpochMillis(100),
- maxSeq - minSeq + 1);
+ maxSeq - minSeq + 1,
+ null);
}
public static DataFileMeta newFile() {
@@ -67,7 +68,8 @@ public static DataFileMeta newFile() {
0,
0,
0,
- 0L);
+ 0L,
+ null);
}
public static DataFileMeta newFile(
@@ -89,7 +91,8 @@ public static DataFileMeta newFile(
maxSequence,
0,
level,
- deleteRowCount);
+ deleteRowCount,
+ null);
}
public static BinaryRow row(int i) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index d91b8a9a0b94..1801c2dd0125 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -87,7 +88,8 @@ public void initialize(String identifier) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
new CoreOptions(new HashMap<>()),
- SCHEMA.getFieldNames())),
+ SCHEMA.getFieldNames()),
+ new FileIndexOptions()),
TARGET_FILE_SIZE);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index ad5fe2f800fd..473b333a6740 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -115,6 +115,7 @@ public static DataFileMeta newFile(int name, int level) {
1,
0,
level,
- 0L);
+ 0L,
+ null);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e066eeaf9ace..9120e7b6c900 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -82,7 +82,8 @@ protected ManifestEntry makeEntry(boolean isAdd, String fileName, Integer partit
0, // not used
Collections.emptyList(),
Timestamp.fromEpochMillis(200000),
- 0L // not used
+ 0L, // not used
+ null // not used
));
}
@@ -243,6 +244,7 @@ public static ManifestEntry makeEntry(
0, // not used
0, // not used
0, // not used
- 0L));
+ 0L,
+ null));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index c424b6094277..4763585b00a4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -69,6 +69,18 @@ public void testLevel0WithSameSequenceNumbers() {
public static DataFileMeta newFile(int level) {
return new DataFileMeta(
- UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level, 0L);
+ UUID.randomUUID().toString(),
+ 0,
+ 1,
+ row(0),
+ row(0),
+ null,
+ null,
+ 0,
+ 1,
+ 0,
+ level,
+ 0L,
+ null);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index c4edd76e264e..89007a33a61c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -181,7 +181,8 @@ private DataFileMeta makeInterval(int left, int right) {
0,
Collections.emptyList(),
Timestamp.fromEpochMillis(100000),
- 0L);
+ 0L,
+ null);
}
private List