From b3eeea91e8e1a862b375732333768f823c3fb770 Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Mon, 15 Apr 2024 18:54:33 +0800 Subject: [PATCH] [core] Introduce file index read/write framework. (#3177) --- .../generated/core_configuration.html | 12 ++ .../java/org/apache/paimon/CoreOptions.java | 97 +++++++++ .../paimon/fileindex/FileIndexFormat.java | 17 +- .../paimon/fileindex/FileIndexOptions.java | 69 +++++++ .../bloomfilter/BloomFilterFileIndex.java | 22 +- .../apache/paimon/utils/BloomFilter64.java | 42 +++- .../apache/paimon/AppendOnlyFileStore.java | 6 +- .../org/apache/paimon/KeyValueFileStore.java | 3 +- .../paimon/append/AppendOnlyWriter.java | 9 +- .../org/apache/paimon/io/DataFileMeta.java | 54 ++++- .../paimon/io/DataFileMetaSerializer.java | 6 +- .../apache/paimon/io/DataFilePathFactory.java | 6 + .../paimon/io/FileIndexRecordReader.java | 90 ++++++++ .../org/apache/paimon/io/FileIndexWriter.java | 194 ++++++++++++++++++ .../apache/paimon/io/FileRecordReader.java | 36 ++-- .../paimon/io/KeyValueDataFileWriter.java | 4 +- .../apache/paimon/io/RowDataFileWriter.java | 31 ++- .../paimon/io/RowDataRollingFileWriter.java | 7 +- .../operation/AppendOnlyFileStoreScan.java | 45 +++- .../operation/AppendOnlyFileStoreWrite.java | 9 +- .../paimon/operation/RawFileSplitRead.java | 94 +++++++-- .../paimon/stats/FieldStatsConverters.java | 15 ++ ...endOnlyTableCompactionCoordinatorTest.java | 3 +- .../paimon/append/AppendOnlyWriterTest.java | 4 +- .../crosspartition/IndexBootstrapTest.java | 3 +- .../paimon/format/FileFormatSuffixTest.java | 4 +- .../paimon/io/DataFileTestDataGenerator.java | 3 +- .../apache/paimon/io/DataFileTestUtils.java | 9 +- .../paimon/io/RollingFileWriterTest.java | 4 +- .../ManifestCommittableSerializerTest.java | 3 +- .../manifest/ManifestFileMetaTestBase.java | 6 +- .../apache/paimon/mergetree/LevelsTest.java | 14 +- .../compact/IntervalPartitionTest.java | 3 +- .../compact/UniversalCompactionTest.java | 2 +- .../paimon/operation/ExpireSnapshotsTest.java | 3 +- .../table/AppendOnlyFileStoreTableTest.java | 149 ++++++++++++++ .../table/source/SplitGeneratorTest.java | 3 +- .../UnawareBucketAppendOnlyTableITCase.java | 12 +- .../CompactionTaskSimpleSerializerTest.java | 3 +- .../FileStoreSourceSplitGeneratorTest.java | 3 +- .../FileStoreSourceSplitSerializerTest.java | 3 +- .../source/TestChangelogDataReadWrite.java | 3 +- 42 files changed, 994 insertions(+), 111 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 8d39919d5dc4..4c42bbfbbb58 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -206,6 +206,18 @@ Boolean Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys. + +
file-index.in-manifest-threshold
+ 500 bytes + MemorySize + The threshold below which file index bytes are stored directly in the manifest; larger indexes are written to a separate index file. + +
file-index.read.enabled
+ true + Boolean + Whether to read the file index. +
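The two rows above cover storage and read behavior; the indexes themselves are declared through `file-index.<type>.columns` and tuned through `file-index.<type>.<column>.<option>` keys, which `CoreOptions#indexColumnsOptions` parses later in this patch. A minimal sketch of a table configuration under those key patterns, assuming a hypothetical `items` tuning option (the patch shows the bloom filter's defaults but not its option names):

```java
import java.util.HashMap;
import java.util.Map;

public class FileIndexOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        // "bloom-filter" is the type name registered by BloomFilterFileIndex.
        tableOptions.put("file-index.bloom-filter.columns", "order_id,user_id");
        // Per-column tuning follows file-index.<type>.<column>.<option>;
        // "items" is an assumed option name, for illustration only.
        tableOptions.put("file-index.bloom-filter.order_id.items", "100000");
        // Index bytes at or below this threshold stay embedded in the manifest.
        tableOptions.put("file-index.in-manifest-threshold", "500 B");
        // Allow reads to consult the index and skip non-matching files.
        tableOptions.put("file-index.read.enabled", "true");
        tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```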
file-reader-async-threshold
10 mb diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 0d78d2ec85e7..b73133baef2f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation; import org.apache.paimon.annotation.Documentation.Immutable; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; import org.apache.paimon.lookup.LookupStrategy; @@ -70,6 +71,10 @@ public class CoreOptions implements Serializable { public static final String DISTINCT = "distinct"; + public static final String FILE_INDEX = "file-index"; + + public static final String COLUMNS = "columns"; + public static final ConfigOption BUCKET = key("bucket") .intType() @@ -135,6 +140,18 @@ public class CoreOptions implements Serializable { "Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by " + FILE_COMPRESSION_PER_LEVEL.key()); + public static final ConfigOption FILE_INDEX_IN_MANIFEST_THRESHOLD = + key("file-index.in-manifest-threshold") + .memoryType() + .defaultValue(MemorySize.parse("500 B")) + .withDescription("The threshold to store file index bytes in manifest."); + + public static final ConfigOption FILE_INDEX_READ_ENABLED = + key("file-index.read.enabled") + .booleanType() + .defaultValue(true) + .withDescription("Whether enabled read file index."); + public static final ConfigOption MANIFEST_FORMAT = key("manifest.format") .enumType(FileFormatType.class) @@ -1703,6 +1720,86 @@ public boolean deletionVectorsEnabled() { return options.get(DELETION_VECTORS_ENABLED); } + public FileIndexOptions indexColumnsOptions() { + String fileIndexPrefix = FILE_INDEX + "."; + String fileIndexColumnSuffix = "." 
+ COLUMNS; + + FileIndexOptions fileIndexOptions = new FileIndexOptions(fileIndexInManifestThreshold()); + for (Map.Entry entry : options.toMap().entrySet()) { + String key = entry.getKey(); + if (key.startsWith(fileIndexPrefix)) { + // start with file-index, decode this option + if (key.endsWith(fileIndexColumnSuffix)) { + // if end with .column, set up indexes + String indexType = + key.substring( + fileIndexPrefix.length(), + key.length() - fileIndexColumnSuffix.length()); + String[] names = entry.getValue().split(","); + for (String name : names) { + if (StringUtils.isBlank(name)) { + throw new IllegalArgumentException( + "Wrong option in " + key + ", should not have empty column"); + } + fileIndexOptions.computeIfAbsent(name.trim(), indexType); + } + } else { + // else, it must be an option + String[] kv = key.substring(fileIndexPrefix.length()).split("\\."); + if (kv.length != 3) { + continue; + } + String indexType = kv[0]; + String cname = kv[1]; + String opkey = kv[2]; + + if (fileIndexOptions.get(cname, indexType) == null) { + // if indexes have not set, find .column in options, then set them + String columns = + options.get(fileIndexPrefix + indexType + fileIndexColumnSuffix); + if (columns == null) { + continue; + } + String[] names = columns.split(","); + boolean foundTarget = false; + for (String name : names) { + if (StringUtils.isBlank(name)) { + throw new IllegalArgumentException( + "Wrong option in " + + key + + ", should not have empty column"); + } + String tname = name.trim(); + if (cname.equals(tname)) { + foundTarget = true; + } + fileIndexOptions.computeIfAbsent(name.trim(), indexType); + } + if (!foundTarget) { + throw new IllegalArgumentException( + "Wrong option in " + + key + + ", can't found column " + + cname + + " in " + + columns); + } + } + fileIndexOptions.get(cname, indexType).set(opkey, entry.getValue()); + } + } + } + return fileIndexOptions; + } + + public long fileIndexInManifestThreshold() { + return options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD).getBytes(); + } + + public boolean fileIndexReadEnabled() { + return options.get(FILE_INDEX_READ_ENABLED); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", true, true), diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java index ec809dff4084..4f03faf9062f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFormat.java @@ -46,12 +46,12 @@ * File index file format. Put all column and offset in the header. * *
- * _______________________________________    _____________________
+ *   _____________________________________    _____________________
  * |     magic    |version|head length |
  * |-------------------------------------|
- * |              column size            |
+ * |            column number            |
  * |-------------------------------------|
- * |   column 1        |   index size   |
+ * |   column 1        | index number   |
  * |-------------------------------------|
  * |  index name 1 |start pos |length  |
  * |-------------------------------------|
@@ -59,7 +59,7 @@
  * |-------------------------------------|
  * |  index name 3 |start pos |length  |
  * |-------------------------------------|            HEAD
- * |   column 2        |   index size   |
+ * |   column 2        | index number   |
  * |-------------------------------------|
  * |  index name 1 |start pos |length  |
  * |-------------------------------------|
@@ -82,14 +82,15 @@
  * magic:                            8 bytes long
  * version:                          4 bytes int
  * head length:                      4 bytes int
- * index type:                       var bytes utf (length + bytes)
- * body info size:                   4 bytes int (how many column items below)
- * column name:                      var bytes utf
+ * column number:                    4 bytes int
+ * column x:                         var bytes utf (length + bytes)
+ * index number:                    4 bytes int (how many index items for this column)
+ * index name x:                     var bytes utf
  * start pos:                        4 bytes int
  * length:                           4 bytes int
  * redundant length:                 4 bytes int (for compatibility with later versions, in this version, content is zero)
  * redundant bytes:                  var bytes (for compatibility with later version, in this version, is empty)
- * BODY:                             column bytes + column bytes + column bytes + .......
+ * BODY:                             column index bytes + column index bytes + column index bytes + .......
  *
  * 
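Read together, the fields above fully determine the HEAD section. The sketch below decodes it in that order; it is not `FileIndexFormat`'s actual reader (those internals are not part of this hunk), and the use of `DataInputStream` plus the placement of the redundant block after the per-column entries are assumptions drawn from the listing:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Sketch: decode the HEAD layout described above (assumed reader, not the real one). */
class FileIndexHeadSketch {

    /** Returns column -> (index name -> [start pos, length]) as laid out in the HEAD. */
    static Map<String, Map<String, int[]>> readHead(byte[] indexFileBytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(indexFileBytes));
        in.readLong(); // magic: 8 bytes long
        in.readInt(); // version: 4 bytes int
        in.readInt(); // head length: 4 bytes int
        int columnNumber = in.readInt(); // how many columns carry indexes
        Map<String, Map<String, int[]>> head = new HashMap<>();
        for (int i = 0; i < columnNumber; i++) {
            String column = in.readUTF(); // column x: var bytes utf (length + bytes)
            int indexNumber = in.readInt(); // how many index items for this column
            Map<String, int[]> indexes = new HashMap<>();
            for (int j = 0; j < indexNumber; j++) {
                String indexName = in.readUTF(); // index name x
                int startPos = in.readInt(); // start pos of the index bytes in BODY
                int length = in.readInt(); // length of the index bytes
                indexes.put(indexName, new int[] {startPos, length});
            }
            head.put(column, indexes);
        }
        int redundantLength = in.readInt(); // zero in this version
        in.skipBytes(redundantLength); // redundant bytes, empty in this version
        return head;
    }
}
```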
*/ diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java new file mode 100644 index 000000000000..e8751245ca1a --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.options.Options; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** Options of file index column. */ +public class FileIndexOptions { + + // if the filter size greater than fileIndexInManifestThreshold, we put it in file + private final long fileIndexInManifestThreshold; + + private final Map> indexTypeOptions; + + public FileIndexOptions() { + this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes()); + } + + public FileIndexOptions(long fileIndexInManifestThreshold) { + this.indexTypeOptions = new HashMap<>(); + this.fileIndexInManifestThreshold = fileIndexInManifestThreshold; + } + + public void computeIfAbsent(String column, String indexType) { + indexTypeOptions + .computeIfAbsent(column, c -> new HashMap<>()) + .computeIfAbsent(indexType, i -> new Options()); + } + + public Options get(String column, String indexType) { + return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null)) + .map(x -> x.get(indexType)) + .orElse(null); + } + + public boolean isEmpty() { + return indexTypeOptions.isEmpty(); + } + + public long fileIndexInManifestThreshold() { + return fileIndexInManifestThreshold; + } + + public Set>> entrySet() { + return indexTypeOptions.entrySet(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java index c0486d535895..37ba4d205feb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java @@ -25,11 +25,10 @@ import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.types.DataType; import org.apache.paimon.utils.BloomFilter64; +import org.apache.paimon.utils.BloomFilter64.BitSet; import org.apache.hadoop.util.bloom.HashFunction; -import java.util.BitSet; - /** * Bloom filter for file index. 
* @@ -40,7 +39,7 @@ */ public class BloomFilterFileIndex implements FileIndexer { - public static final String BLOOM_FILTER = "bloom"; + public static final String BLOOM_FILTER = "bloom-filter"; private static final int DEFAULT_ITEMS = 1_000_000; private static final double DEFAULT_FPP = 0.1; @@ -84,19 +83,21 @@ public Writer(DataType type, int items, double fpp) { @Override public void write(Object key) { - filter.addHash(hashFunction.hash(key)); + if (key != null) { + filter.addHash(hashFunction.hash(key)); + } } @Override public byte[] serializedBytes() { int numHashFunctions = filter.getNumHashFunctions(); - byte[] bytes = filter.getBitSet().toByteArray(); - byte[] serialized = new byte[bytes.length + Integer.BYTES]; + byte[] serialized = new byte[filter.getBitSet().bitSize() / Byte.SIZE + Integer.BYTES]; + // little endian serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF); serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF); serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF); serialized[3] = (byte) (numHashFunctions & 0xFF); - System.arraycopy(bytes, 0, serialized, 4, bytes.length); + filter.getBitSet().toByteArray(serialized, 4, serialized.length - 4); return serialized; } } @@ -107,21 +108,20 @@ private static class Reader implements FileIndexReader { private final FastHash hashFunction; public Reader(DataType type, byte[] serializedBytes) { + // little endian int numHashFunctions = ((serializedBytes[0] << 24) + (serializedBytes[1] << 16) + (serializedBytes[2] << 8) + serializedBytes[3]); - byte[] bytes = new byte[serializedBytes.length - Integer.BYTES]; - System.arraycopy(serializedBytes, 4, bytes, 0, bytes.length); - BitSet bitSet = BitSet.valueOf(bytes); + BitSet bitSet = new BitSet(serializedBytes, 4); this.filter = new BloomFilter64(numHashFunctions, bitSet); this.hashFunction = FastHash.getHashFunction(type); } @Override public Boolean visitEqual(FieldRef fieldRef, Object key) { - return filter.testHash(hashFunction.hash(key)); + return key == null || filter.testHash(hashFunction.hash(key)); } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java index 75b8661a2eba..5a4be5a45f05 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BloomFilter64.java @@ -18,8 +18,6 @@ package org.apache.paimon.utils; -import java.util.BitSet; - /** Bloom filter 64 handle 64 bits hash. */ public final class BloomFilter64 { @@ -29,15 +27,15 @@ public final class BloomFilter64 { public BloomFilter64(long items, double fpp) { int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2))); - this.numBits = nb + (Long.SIZE - (nb % Long.SIZE)); + this.numBits = nb + (Byte.SIZE - (nb % Byte.SIZE)); this.numHashFunctions = Math.max(1, (int) Math.round((double) numBits / items * Math.log(2))); - this.bitSet = new BitSet(numBits); + this.bitSet = new BitSet(new byte[numBits / Byte.SIZE], 0); } public BloomFilter64(int numHashFunctions, BitSet bitSet) { this.numHashFunctions = numHashFunctions; - this.numBits = bitSet.size(); + this.numBits = bitSet.bitSize(); this.bitSet = bitSet; } @@ -81,4 +79,38 @@ public int getNumHashFunctions() { public BitSet getBitSet() { return bitSet; } + + /** Bit set used for bloom filter 64. 
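+     * Backed by a byte array starting at {@code offset}: bit {@code i} maps to bit {@code i & 7} of byte {@code (i >>> 3) + offset}.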
*/ + public static class BitSet { + + private static final byte MAST = 0x07; + + private final byte[] data; + private final int offset; + + public BitSet(byte[] data, int offset) { + assert data.length > 0 : "data length is zero!"; + assert offset >= 0 : "offset is negative!"; + this.data = data; + this.offset = offset; + } + + public void set(int index) { + data[(index >>> 3) + offset] |= (byte) ((byte) 1 << (index & MAST)); + } + + public boolean get(int index) { + return (data[(index >>> 3) + offset] & ((byte) 1 << (index & MAST))) != 0; + } + + public int bitSize() { + return (data.length - offset) * Byte.SIZE; + } + + public void toByteArray(byte[] bytes, int offset, int length) { + if (length >= 0) { + System.arraycopy(data, this.offset, bytes, offset, length); + } + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 186fd479922f..99efac540e80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -86,7 +86,8 @@ public RawFileSplitRead newRead() { schema, rowType, FileFormatDiscover.of(options), - pathFactory()); + pathFactory(), + options.fileIndexReadEnabled()); } @Override @@ -145,7 +146,8 @@ public void pushdown(Predicate predicate) { options.bucket(), forWrite, options.scanManifestParallelism(), - branchName); + branchName, + options.fileIndexReadEnabled()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 354cc6dda195..aeb30731dfcc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -138,7 +138,8 @@ public RawFileSplitRead newBatchRawFileRead() { schema, valueType, FileFormatDiscover.of(options), - pathFactory()); + pathFactory(), + options.fileIndexReadEnabled()); } public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 9d9f418979a1..0ae21e6b28ee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.CompactIncrement; @@ -78,6 +79,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private SinkWriter sinkWriter; private final FieldStatsCollector.Factory[] statsCollectors; private final IOManager ioManager; + private final FileIndexOptions fileIndexOptions; private MemorySegmentPool memorySegmentPool; private MemorySize maxDiskSize; @@ -100,7 +102,8 @@ public AppendOnlyWriter( String fileCompression, String spillCompression, FieldStatsCollector.Factory[] statsCollectors, - MemorySize maxDiskSize) { + MemorySize maxDiskSize, + FileIndexOptions fileIndexOptions) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -120,6 +123,7 @@ public AppendOnlyWriter( this.ioManager = ioManager; 
this.statsCollectors = statsCollectors; this.maxDiskSize = maxDiskSize; + this.fileIndexOptions = fileIndexOptions; this.sinkWriter = useWriteBuffer @@ -246,7 +250,8 @@ private RowDataRollingFileWriter createRollingRowWriter() { pathFactory, seqNumCounter, fileCompression, - statsCollectors); + statsCollectors, + fileIndexOptions); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 30dacd130155..fdf7266bd51e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -83,6 +83,9 @@ public class DataFileMeta { // We have to keep the compatibility. private final @Nullable Long deleteRowCount; + // file index filter bytes, if it is small, store in data file meta + private final @Nullable byte[] embeddedIndex; + public static DataFileMeta forAppend( String fileName, long fileSize, @@ -91,6 +94,28 @@ public static DataFileMeta forAppend( long minSequenceNumber, long maxSequenceNumber, long schemaId) { + return forAppend( + fileName, + fileSize, + rowCount, + rowStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + Collections.emptyList(), + null); + } + + public static DataFileMeta forAppend( + String fileName, + long fileSize, + long rowCount, + BinaryTableStats rowStats, + long minSequenceNumber, + long maxSequenceNumber, + long schemaId, + List extraFiles, + @Nullable byte[] embeddedIndex) { return new DataFileMeta( fileName, fileSize, @@ -103,7 +128,10 @@ public static DataFileMeta forAppend( maxSequenceNumber, schemaId, DUMMY_LEVEL, - 0L); + extraFiles, + Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), + 0L, + embeddedIndex); } public DataFileMeta( @@ -118,7 +146,8 @@ public DataFileMeta( long maxSequenceNumber, long schemaId, int level, - @Nullable Long deleteRowCount) { + @Nullable Long deleteRowCount, + @Nullable byte[] embeddedIndex) { this( fileName, fileSize, @@ -133,7 +162,8 @@ public DataFileMeta( level, Collections.emptyList(), Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), - deleteRowCount); + deleteRowCount, + embeddedIndex); } public DataFileMeta( @@ -150,12 +180,14 @@ public DataFileMeta( int level, List extraFiles, Timestamp creationTime, - @Nullable Long deleteRowCount) { + @Nullable Long deleteRowCount, + @Nullable byte[] embeddedIndex) { this.fileName = fileName; this.fileSize = fileSize; this.rowCount = rowCount; + this.embeddedIndex = embeddedIndex; this.minKey = minKey; this.maxKey = maxKey; this.keyStats = keyStats; @@ -191,6 +223,10 @@ public Optional deleteRowCount() { return Optional.ofNullable(deleteRowCount); } + public byte[] embeddedIndex() { + return embeddedIndex; + } + public BinaryRow minKey() { return minKey; } @@ -276,7 +312,8 @@ public DataFileMeta upgrade(int newLevel) { newLevel, extraFiles, creationTime, - deleteRowCount); + deleteRowCount, + embeddedIndex); } public List collectFiles(DataFilePathFactory pathFactory) { @@ -301,7 +338,8 @@ public DataFileMeta copy(List newExtraFiles) { level, newExtraFiles, creationTime, - deleteRowCount); + deleteRowCount, + embeddedIndex); } @Override @@ -316,6 +354,7 @@ public boolean equals(Object o) { return Objects.equals(fileName, that.fileName) && fileSize == that.fileSize && rowCount == that.rowCount + && Objects.equals(embeddedIndex, that.embeddedIndex) && Objects.equals(minKey, that.minKey) && 
Objects.equals(maxKey, that.maxKey) && Objects.equals(keyStats, that.keyStats) @@ -335,6 +374,7 @@ public int hashCode() { fileName, fileSize, rowCount, + embeddedIndex, minKey, maxKey, keyStats, @@ -358,6 +398,7 @@ public String toString() { fileName, fileSize, rowCount, + embeddedIndex, minKey, maxKey, keyStats, @@ -387,6 +428,7 @@ public static RowType schema() { fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false)))); fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS())); fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true))); + fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true))); return new RowType(fields); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index f91e3d293361..3de1fcac9f4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -54,7 +54,8 @@ public InternalRow toRow(DataFileMeta meta) { meta.level(), toStringArrayData(meta.extraFiles()), meta.creationTime(), - meta.deleteRowCount().orElse(null)); + meta.deleteRowCount().orElse(null), + meta.embeddedIndex()); } @Override @@ -73,6 +74,7 @@ public DataFileMeta fromRow(InternalRow row) { row.getInt(10), fromStringArrayData(row.getArray(11)), row.getTimestamp(12, 3), - row.isNullAt(13) ? null : row.getLong(13)); + row.isNullAt(13) ? null : row.getLong(13), + row.isNullAt(14) ? null : row.getBinary(14)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index 385609de3f84..ef83e8598790 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -34,6 +34,8 @@ public class DataFilePathFactory { public static final String CHANGELOG_FILE_PREFIX = "changelog-"; + public static final String INDEX_PATH_SUFFIX = ".index"; + private final Path parent; private final String uuid; @@ -70,6 +72,10 @@ public String uuid() { return uuid; } + public static Path toFileIndexPath(Path filePath) { + return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX); + } + public static String formatIdentifier(String fileName) { int index = fileName.lastIndexOf('.'); if (index == -1) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java new file mode 100644 index 000000000000..0a91d4f7f530 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexPredicate; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.mergetree.compact.ConcatRecordReader; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** File index reader, do the filter in the constructor. */ +public class FileIndexRecordReader implements RecordReader { + + private final RecordReader reader; + + public FileIndexRecordReader( + FileIO fileIO, + TableSchema dataSchema, + List dataFilter, + DataFilePathFactory dataFilePathFactory, + DataFileMeta file, + ConcatRecordReader.ReaderSupplier readerSupplier) + throws IOException { + boolean filterThisFile = false; + if (dataFilter != null && !dataFilter.isEmpty()) { + List indexFiles = + file.extraFiles().stream() + .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX)) + .collect(Collectors.toList()); + if (!indexFiles.isEmpty()) { + if (indexFiles.size() > 1) { + throw new RuntimeException( + "Found more than one index file for one data file: " + + String.join(" and ", indexFiles)); + } + // go to file index check + try (FileIndexPredicate predicate = + new FileIndexPredicate( + dataFilePathFactory.toPath(indexFiles.get(0)), + fileIO, + dataSchema.logicalRowType())) { + if (!predicate.testPredicate( + PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) { + filterThisFile = true; + } + } + } + } + + this.reader = filterThisFile ? null : readerSupplier.get(); + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + return reader == null ? null : reader.readBatch(); + } + + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java new file mode 100644 index 000000000000..6ba635c5ed39 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexFormat; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.fileindex.FileIndexer; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Index file writer. */ +public final class FileIndexWriter implements Closeable { + + public static final FileIndexResult EMPTY_RESULT = FileIndexResult.of(null, null); + + private final FileIO fileIO; + + private final Path path; + + // if the filter size greater than fileIndexInManifestThreshold, we put it in file + private final long inManifestThreshold; + + private final List indexMaintainers = new ArrayList<>(); + + private String resultFileName; + + private byte[] embeddedIndexBytes; + + public FileIndexWriter( + FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) { + this.fileIO = fileIO; + this.path = path; + List fields = rowType.getFields(); + Map map = new HashMap<>(); + Map index = new HashMap<>(); + fields.forEach( + dataField -> { + map.put(dataField.name(), dataField); + index.put(dataField.name(), rowType.getFieldIndex(dataField.name())); + }); + for (Map.Entry> entry : fileIndexOptions.entrySet()) { + String columnName = entry.getKey(); + DataField field = map.get(columnName); + if (field == null) { + throw new IllegalArgumentException(columnName + " does not exist in column fields"); + } + for (Map.Entry typeEntry : entry.getValue().entrySet()) { + String indexType = typeEntry.getKey(); + indexMaintainers.add( + new IndexMaintainer( + columnName, + indexType, + FileIndexer.create(indexType, field.type(), typeEntry.getValue()) + .createWriter(), + InternalRow.createFieldGetter( + field.type(), index.get(columnName)))); + } + } + this.inManifestThreshold = fileIndexOptions.fileIndexInManifestThreshold(); + } + + public void write(InternalRow row) { + indexMaintainers.forEach(indexMaintainer -> indexMaintainer.write(row)); + } + + @Override + public void close() throws IOException { + Map> indexMaps = new HashMap<>(); + + for (IndexMaintainer indexMaintainer : indexMaintainers) { + indexMaps + .computeIfAbsent(indexMaintainer.getColumnName(), k -> new HashMap<>()) + .put(indexMaintainer.getIndexType(), indexMaintainer.serializedBytes()); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) { + writer.writeColumnIndexes(indexMaps); + } + + if (baos.size() > inManifestThreshold) { + try (OutputStream outputStream = fileIO.newOutputStream(path, false)) { + outputStream.write(baos.toByteArray()); + } + resultFileName = path.getName(); + } else { + embeddedIndexBytes = baos.toByteArray(); + } + } + + public FileIndexResult result() { + return FileIndexResult.of(embeddedIndexBytes, resultFileName); + } + + @Nullable + public static FileIndexWriter create( + FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) { + return 
fileIndexOptions.isEmpty() + ? null + : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions); + } + + /** File index result. */ + public interface FileIndexResult { + + @Nullable + byte[] embeddedIndexBytes(); + + @Nullable + String independentIndexFile(); + + static FileIndexResult of(byte[] embeddedIndexBytes, String resultFileName) { + return new FileIndexResult() { + + @Override + public byte[] embeddedIndexBytes() { + return embeddedIndexBytes; + } + + @Override + public String independentIndexFile() { + return resultFileName; + } + }; + } + } + + /** One index maintainer for one column. */ + private static class IndexMaintainer { + + private final String columnName; + private final String indexType; + private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter; + private final InternalRow.FieldGetter getter; + + public IndexMaintainer( + String columnName, + String indexType, + org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter, + InternalRow.FieldGetter getter) { + this.columnName = columnName; + this.indexType = indexType; + this.fileIndexWriter = fileIndexWriter; + this.getter = getter; + } + + public void write(InternalRow row) { + fileIndexWriter.write(getter.getFieldOrNull(row)); + } + + public String getIndexType() { + return indexType; + } + + public String getColumnName() { + return columnName; + } + + public byte[] serializedBytes() { + return fileIndexWriter.serializedBytes(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java index 0eac2961a912..1e12025ba533 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java @@ -41,24 +41,6 @@ public class FileRecordReader implements RecordReader { @Nullable private final PartitionInfo partitionInfo; @Nullable private final CastFieldGetter[] castMapping; - public FileRecordReader( - FormatReaderFactory readerFactory, - FormatReaderFactory.Context context, - @Nullable int[] indexMapping, - @Nullable CastFieldGetter[] castMapping, - @Nullable PartitionInfo partitionInfo) - throws IOException { - try { - this.reader = readerFactory.createReader(context); - } catch (Exception e) { - FileUtils.checkExists(context.fileIO(), context.filePath()); - throw e; - } - this.indexMapping = indexMapping; - this.partitionInfo = partitionInfo; - this.castMapping = castMapping; - } - @Nullable @Override public RecordReader.RecordIterator readBatch() throws IOException { @@ -89,6 +71,24 @@ public RecordReader.RecordIterator readBatch() throws IOException { return iterator; } + public FileRecordReader( + FormatReaderFactory readerFactory, + FormatReaderFactory.Context context, + @Nullable int[] indexMapping, + @Nullable CastFieldGetter[] castMapping, + @Nullable PartitionInfo partitionInfo) + throws IOException { + try { + this.reader = readerFactory.createReader(context); + } catch (Exception e) { + FileUtils.checkExists(context.fileIO(), context.filePath()); + throw e; + } + this.indexMapping = indexMapping; + this.partitionInfo = partitionInfo; + this.castMapping = castMapping; + } + @Override public void close() throws IOException { reader.close(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index e2e5441f680e..0f1223ccac45 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -168,6 +168,8 @@ public DataFileMeta result() throws IOException { maxSeqNumber, schemaId, level, - deleteRecordCount); + deleteRecordCount, + // TODO: enable file filter for primary key table (e.g. deletion table). + null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 3b085c645091..ce441b279cc7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -19,6 +19,7 @@ package org.apache.paimon.io; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.format.TableStatsExtractor; import org.apache.paimon.fs.FileIO; @@ -32,8 +33,11 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Collections; import java.util.function.Function; +import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath; + /** * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link InternalRow}. * Also produces {@link DataFileMeta} after writing a file. @@ -43,6 +47,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter new RowDataFileWriter( @@ -54,7 +56,8 @@ public RowDataRollingFileWriter( schemaId, seqNumCounter, fileCompression, - statsCollectors), + statsCollectors, + fileIndexOptions), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 866c87d75f66..baa2e9f4ab4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -19,6 +19,7 @@ package org.apache.paimon.operation; import org.apache.paimon.AppendOnlyFileStore; +import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; @@ -31,15 +32,25 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan { private final FieldStatsConverters fieldStatsConverters; + private final boolean fileIndexReadEnabled; + private Predicate filter; + // just cache. 
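+     // Maps data schema id to the scan filter converted to that schema, so the conversion happens once per schema rather than once per file.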
+ private final Map dataFilterMapping = new HashMap<>(); + public AppendOnlyFileStoreScan( RowType partitionType, ScanBucketFilter bucketFilter, @@ -51,7 +62,8 @@ public AppendOnlyFileStoreScan( int numOfBuckets, boolean checkNumOfBuckets, Integer scanManifestParallelism, - String branchName) { + String branchName, + boolean fileIndexReadEnabled) { super( partitionType, bucketFilter, @@ -66,6 +78,7 @@ public AppendOnlyFileStoreScan( branchName); this.fieldStatsConverters = new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id()); + this.fileIndexReadEnabled = fileIndexReadEnabled; } public AppendOnlyFileStoreScan withFilter(Predicate predicate) { @@ -84,11 +97,13 @@ protected boolean filterByStats(ManifestEntry entry) { FieldStatsArraySerializer serializer = fieldStatsConverters.getOrCreate(entry.file().schemaId()); BinaryTableStats stats = entry.file().valueStats(); + return filter.test( - entry.file().rowCount(), - serializer.evolution(stats.minValues()), - serializer.evolution(stats.maxValues()), - serializer.evolution(stats.nullCounts(), entry.file().rowCount())); + entry.file().rowCount(), + serializer.evolution(stats.minValues()), + serializer.evolution(stats.maxValues()), + serializer.evolution(stats.nullCounts(), entry.file().rowCount())) + && (!fileIndexReadEnabled || testFileIndex(entry.file().embeddedIndex(), entry)); } @Override @@ -96,4 +111,24 @@ protected List filterWholeBucketByStats(List entri // We don't need to filter per-bucket entries here return entries; } + + private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) { + if (embeddedIndexBytes == null) { + return true; + } + + RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType(); + + Predicate dataPredicate = + dataFilterMapping.computeIfAbsent( + entry.file().schemaId(), + id -> fieldStatsConverters.convertFilter(entry.file().schemaId(), filter)); + + try (FileIndexPredicate predicate = + new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { + return predicate.testPredicate(dataPredicate); + } catch (IOException e) { + throw new RuntimeException("Exception happens while checking predicate.", e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index b3361b0df735..fc3f2a3d6d20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; @@ -74,6 +75,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite private final boolean spillable; private final MemorySize maxDiskSize; private final FieldStatsCollector.Factory[] statsCollectors; + private final FileIndexOptions fileIndexOptions; private boolean forceBufferSpill = false; private boolean skipCompaction; @@ -109,6 +111,7 @@ public AppendOnlyFileStoreWrite( this.maxDiskSize = options.writeBufferSpillDiskSize(); this.statsCollectors = StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames()); + this.fileIndexOptions = options.indexColumnsOptions(); 
} @Override @@ -155,7 +158,8 @@ protected RecordWriter createWriter( fileCompression, spillCompression, statsCollectors, - maxDiskSize); + maxDiskSize, + fileIndexOptions); } public AppendOnlyCompactManager.CompactRewriter compactRewriter( @@ -174,7 +178,8 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter( pathFactory.createDataFilePathFactory(partition, bucket), new LongCounter(toCompact.get(0).minSequenceNumber()), fileCompression, - statsCollectors); + statsCollectors, + fileIndexOptions); try { rewriter.write(bucketReader(partition, bucket).read(toCompact)); } finally { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index c801dcaa87d8..cf3b76e6736a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.casting.CastFieldGetter; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; @@ -25,9 +26,11 @@ import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatKey; import org.apache.paimon.format.FormatReaderContext; +import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.FileIndexRecordReader; import org.apache.paimon.io.FileRecordReader; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.partition.PartitionUtils; @@ -68,7 +71,8 @@ public class RawFileSplitRead implements SplitRead { private final TableSchema schema; private final FileFormatDiscover formatDiscover; private final FileStorePathFactory pathFactory; - private final Map bulkFormatMappings; + private final Map bulkFormatMappings; + private final boolean fileIndexReadEnabled; private int[][] projection; @@ -80,13 +84,15 @@ public RawFileSplitRead( TableSchema schema, RowType rowType, FileFormatDiscover formatDiscover, - FileStorePathFactory pathFactory) { + FileStorePathFactory pathFactory, + boolean fileIndexReadEnabled) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schema = schema; this.formatDiscover = formatDiscover; this.pathFactory = pathFactory; this.bulkFormatMappings = new HashMap<>(); + this.fileIndexReadEnabled = fileIndexReadEnabled; this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes(); } @@ -121,7 +127,7 @@ public RecordReader createReader(DataSplit split) throws IOExceptio for (DataFileMeta file : split.dataFiles()) { String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName()); - BulkFormatMapping bulkFormatMapping = + RawFileBulkFormatMapping bulkFormatMapping = bulkFormatMappings.computeIfAbsent( new FormatKey(file.schemaId(), formatIdentifier), this::createBulkFormatMapping); @@ -140,7 +146,7 @@ public RecordReader createReader(DataSplit split) throws IOExceptio return ConcatRecordReader.create(suppliers); } - private BulkFormatMapping createBulkFormatMapping(FormatKey key) { + private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) { TableSchema tableSchema = schema; TableSchema dataSchema = key.schemaId == schema.id() ? 
schema : schemaManager.schema(key.schemaId); @@ -180,37 +186,81 @@ private BulkFormatMapping createBulkFormatMapping(FormatKey key) { RowType projectedRowType = Projection.of(dataProjection).project(dataSchema.logicalRowType()); - return new BulkFormatMapping( + return new RawFileBulkFormatMapping( indexCastMapping.getIndexMapping(), indexCastMapping.getCastMapping(), partitionPair, formatDiscover .discover(key.format) - .createReaderFactory(projectedRowType, dataFilters)); + .createReaderFactory(projectedRowType, dataFilters), + dataSchema, + dataFilters); } private RecordReader createFileReader( BinaryRow partition, DataFileMeta file, DataFilePathFactory dataFilePathFactory, - BulkFormatMapping bulkFormatMapping, + RawFileBulkFormatMapping bulkFormatMapping, DeletionVector.Factory dvFactory) throws IOException { - FileRecordReader fileRecordReader = - new FileRecordReader( - bulkFormatMapping.getReaderFactory(), - new FormatReaderContext( - fileIO, - dataFilePathFactory.toPath(file.fileName()), - file.fileSize()), - bulkFormatMapping.getIndexMapping(), - bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); - - Optional deletionVector = dvFactory.create(file.fileName()); - if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { - return new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get()); + ConcatRecordReader.ReaderSupplier supplier = + () -> { + FileRecordReader fileRecordReader = + new FileRecordReader( + bulkFormatMapping.getReaderFactory(), + new FormatReaderContext( + fileIO, + dataFilePathFactory.toPath(file.fileName()), + file.fileSize()), + bulkFormatMapping.getIndexMapping(), + bulkFormatMapping.getCastMapping(), + PartitionUtils.create( + bulkFormatMapping.getPartitionPair(), partition)); + + Optional deletionVector = dvFactory.create(file.fileName()); + if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { + return new ApplyDeletionVectorReader<>( + fileRecordReader, deletionVector.get()); + } + return fileRecordReader; + }; + + return fileIndexReadEnabled + ? new FileIndexRecordReader( + fileIO, + bulkFormatMapping.getDataSchema(), + bulkFormatMapping.getDataFilters(), + dataFilePathFactory, + file, + supplier) + : supplier.get(); + } + + /** Bulk format mapping with data schema and data filters. 
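+     * Carries the data schema and the filters converted to it, so the file index of each data file can be tested before the file is read.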
*/ + private static class RawFileBulkFormatMapping extends BulkFormatMapping { + + private final TableSchema dataSchema; + private final List dataFilters; + + public RawFileBulkFormatMapping( + int[] indexMapping, + CastFieldGetter[] castMapping, + Pair partitionPair, + FormatReaderFactory bulkFormat, + TableSchema dataSchema, + List dataFilters) { + super(indexMapping, castMapping, partitionPair, bulkFormat); + this.dataSchema = dataSchema; + this.dataFilters = dataFilters; + } + + public TableSchema getDataSchema() { + return dataSchema; + } + + public List getDataFilters() { + return dataFilters; } - return fileRecordReader; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java index 6733e1fea171..d3eb0e41697a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsConverters.java @@ -18,13 +18,17 @@ package org.apache.paimon.stats; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.IndexCastMapping; +import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; @@ -71,6 +75,17 @@ public FieldStatsArraySerializer getOrCreate(long dataSchemaId) { }); } + public Predicate convertFilter(long dataSchemaId, Predicate filter) { + return tableSchemaId == dataSchemaId + ? filter + : Objects.requireNonNull( + SchemaEvolutionUtil.createDataFilters( + schemaFields.apply(tableSchemaId), + schemaFields.apply(dataSchemaId), + Collections.singletonList(filter))) + .get(0); + } + public List tableDataFields() { return tableDataFields; } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java index e6209e2234f1..21b8825f7a86 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java @@ -186,6 +186,7 @@ private DataFileMeta newFile(long fileSize) { 0, 0, 0, - 0L); + 0L, + null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 565be9f90f2e..9211cfe60591 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -27,6 +27,7 @@ import org.apache.paimon.disk.ExternalBuffer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FieldStats; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; @@ -604,7 +605,8 @@ private Pair> createWriter( CoreOptions.SPILL_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( options, AppendOnlyWriterTest.SCHEMA.getFieldNames()), - MemorySize.MAX_VALUE); + MemorySize.MAX_VALUE, + new FileIndexOptions()); 
writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index a96ee1429740..3eff9b7cdf33 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -138,7 +138,8 @@ private static DataFileMeta newFile(long timeMillis) { Instant.ofEpochMilli(timeMillis) .atZone(ZoneId.systemDefault()) .toLocalDateTime()), - 0L); + 0L, + null); } private Pair row(int pt, int col, int pk, int bucket) { diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index ffa290e0a53b..1a3503f7a789 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; @@ -89,7 +90,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception CoreOptions.SPILL_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( options, SCHEMA.getFieldNames()), - MemorySize.MAX_VALUE); + MemorySize.MAX_VALUE, + new FileIndexOptions()); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); appendOnlyWriter.write( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index aed01ca21857..3b4921c88026 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -161,7 +161,8 @@ private Data createDataFile(List kvs, int level, BinaryRow partition, maxSequenceNumber, 0, level, - kvs.stream().filter(kv -> kv.valueKind().isRetract()).count()), + kvs.stream().filter(kv -> kv.valueKind().isRetract()).count(), + null), kvs); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index b902ae967773..d5f6ceda32a9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -51,7 +51,8 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) { DataFileMeta.DUMMY_LEVEL, Collections.emptyList(), Timestamp.fromEpochMillis(100), - maxSeq - minSeq + 1); + maxSeq - minSeq + 1, + null); } public static DataFileMeta newFile() { @@ -67,7 +68,8 @@ public static DataFileMeta newFile() { 0, 0, 0, - 0L); + 0L, + null); } public static DataFileMeta newFile( @@ -89,7 +91,8 @@ public static DataFileMeta newFile( maxSequence, 0, level, - deleteRowCount); + deleteRowCount, + null); } public static BinaryRow row(int i) { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index d91b8a9a0b94..1801c2dd0125 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -87,7 +88,8 @@ public void initialize(String identifier) {
                                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                                         StatsCollectorFactories.createStatsFactories(
                                                 new CoreOptions(new HashMap<>()),
-                                                SCHEMA.getFieldNames())),
+                                                SCHEMA.getFieldNames()),
+                                        new FileIndexOptions()),
                         TARGET_FILE_SIZE);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index ad5fe2f800fd..473b333a6740 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -115,6 +115,7 @@ public static DataFileMeta newFile(int name, int level) {
                 1,
                 0,
                 level,
-                0L);
+                0L,
+                null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e066eeaf9ace..9120e7b6c900 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -82,7 +82,8 @@ protected ManifestEntry makeEntry(boolean isAdd, String fileName, Integer partit
                         0, // not used
                         Collections.emptyList(),
                         Timestamp.fromEpochMillis(200000),
-                        0L // not used
+                        0L, // not used
+                        null // not used
                         ));
     }
@@ -243,6 +244,7 @@ public static ManifestEntry makeEntry(
                         0, // not used
                         0, // not used
                         0, // not used
-                        0L));
+                        0L,
+                        null));
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index c424b6094277..4763585b00a4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -69,6 +69,18 @@ public void testLevel0WithSameSequenceNumbers() {
 
     public static DataFileMeta newFile(int level) {
         return new DataFileMeta(
-                UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level, 0L);
+                UUID.randomUUID().toString(),
+                0,
+                1,
+                row(0),
+                row(0),
+                null,
+                null,
+                0,
+                1,
+                0,
+                level,
+                0L,
+                null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index c4edd76e264e..89007a33a61c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -181,7 +181,8 @@ private DataFileMeta makeInterval(int left, int right) {
                 0,
                 Collections.emptyList(),
                 Timestamp.fromEpochMillis(100000),
-                0L);
+                0L,
+                null);
     }
 
     private List> toMultiset(List> sections) {
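Writers now receive a FileIndexOptions (see the AppendOnlyWriter and RollingFileWriter call sites above). It is derived from the table options by the new CoreOptions#indexColumnsOptions(), which groups keys of the form file-index.<type>.columns and file-index.<type>.<column>.<property>. A sketch of that derivation, using the Options#set(String, String) call the tests below rely on (the key values are illustrative):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.fileindex.FileIndexOptions;
    import org.apache.paimon.options.Options;

    public class FileIndexOptionsSketch {
        public static FileIndexOptions forBloomFilter() {
            Options options = new Options();
            options.set("file-index.bloom-filter.columns", "indexc");
            options.set("file-index.bloom-filter.indexc.items", "500");
            // Collects the per-column index settings consumed by the writers.
            return new CoreOptions(options).indexColumnsOptions();
        }
    }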
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 313f9799a5a8..0d891f2c78d3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -357,6 +357,6 @@ private LevelSortedRun level(int level, long size) {
     }
 
     static DataFileMeta file(long size) {
-        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L);
+        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index c8c7be7e12e3..de95819ad889 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -200,7 +200,8 @@ public void testExpireExtraFiles() throws IOException {
                         0,
                         extraFiles,
                         Timestamp.now(),
-                        0L);
+                        0L,
+                        null);
 
         ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);
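The two tests that follow exercise the bloom filter index end to end and differ only in the in-manifest threshold. Their option keys follow one fixed layout, illustrated here with the exact keys the tests use:

    public class IndexKeyLayout {
        // file-index.<index-type>.columns          -> which columns get an index
        // file-index.<index-type>.<column>.<prop>  -> per-column tuning
        static final String COLUMNS_KEY = "file-index.bloom-filter.columns";
        static final String ITEMS_KEY = "file-index.bloom-filter.index_column.items";
        static final String THRESHOLD_KEY = "file-index.in-manifest-threshold";
    }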
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 3846ab234bba..6893649d532d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -20,14 +20,19 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
@@ -41,6 +46,9 @@
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
@@ -348,6 +356,129 @@ public void testStreamingSplitInUnawareBucketMode() throws Exception {
         commit.close();
     }
 
+    @Test
+    public void testBloomFilterInMemory() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("index_column", DataTypes.STRING())
+                        .field("index_column2", DataTypes.INT())
+                        .field("index_column3", DataTypes.BIGINT())
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "index_column, index_column2, index_column3");
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column.items",
+                                    "150");
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column2.items",
+                                    "150");
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + ".index_column3.items",
+                                    "150");
+                            options.set(
+                                    CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "500 B");
+                        });
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+        write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+        write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        commit.commit(0, result);
+        result.clear();
+
+        TableScan.Plan plan =
+                table.newScan()
+                        .withFilter(
+                                new PredicateBuilder(rowType)
+                                        .equal(1, BinaryString.fromString("b")))
+                        .plan();
+        List<DataFileMeta> metas =
+                plan.splits().stream()
+                        .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+                        .collect(Collectors.toList());
+        assertThat(metas.size()).isEqualTo(1);
+    }
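testBloomFilterInMemory keeps the 500 B threshold, so the small bloom filters travel inside the manifest and planning alone prunes the scan down to the single file containing "b". The companion test below drops the threshold to 50 B: the index spills to separate files, planning keeps both candidate files, and the row-level check happens in the reader instead. A minimal sketch of that size decision, assuming a hypothetical helper (this is not the PR's code):

    import org.apache.paimon.options.MemorySize;

    public class IndexPlacementSketch {
        /** Hypothetical: embed index bytes in the manifest only when small enough. */
        static boolean embedInManifest(byte[] serializedIndex, MemorySize threshold) {
            return serializedIndex.length <= threshold.getBytes();
        }
    }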
+
+    @Test
+    public void testBloomFilterInDisk() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("index_column", DataTypes.STRING())
+                        .field("index_column2", DataTypes.INT())
+                        .field("index_column3", DataTypes.BIGINT())
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(
+                                    CoreOptions.FILE_INDEX
+                                            + "."
+                                            + BloomFilterFileIndex.BLOOM_FILTER
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "index_column, index_column2, index_column3");
+                            options.set(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "50 B");
+                        });
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+        write.write(GenericRow.of(1, BinaryString.fromString("a"), 2, 3L));
+        write.write(GenericRow.of(1, BinaryString.fromString("c"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(GenericRow.of(1, BinaryString.fromString("b"), 2, 3L));
+        result.addAll(write.prepareCommit(true, 0));
+        commit.commit(0, result);
+        result.clear();
+
+        TableScan.Plan plan =
+                table.newScan()
+                        .withFilter(
+                                new PredicateBuilder(rowType)
+                                        .equal(1, BinaryString.fromString("b")))
+                        .plan();
+        List<DataFileMeta> metas =
+                plan.splits().stream()
+                        .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+                        .collect(Collectors.toList());
+        assertThat(metas.size()).isEqualTo(2);
+
+        RecordReader<InternalRow> reader =
+                table.newRead()
+                        .withFilter(
+                                new PredicateBuilder(rowType)
+                                        .equal(1, BinaryString.fromString("b")))
+                        .createReader(plan.splits());
+        reader.forEachRemaining(row -> assertThat(row.getString(1).toString()).isEqualTo("b"));
+    }
+
     @Test
     public void testStreamingProjection() throws Exception {
         writeData();
@@ -596,4 +727,22 @@ protected FileStoreTable createUnawareBucketFileStoreTable(Consumer<Options> con
                         ""));
         return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
     }
+
+    protected FileStoreTable createUnawareBucketFileStoreTable(
+            RowType rowType, Consumer<Options> configure) throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.BUCKET, -1);
+        configure.accept(conf);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                conf.toMap(),
+                                ""));
+        return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index a10413005530..8223afa5f576 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -55,7 +55,8 @@ public static DataFileMeta newFileFromSequence(
                 maxSequence,
                 0,
                 0,
-                0L);
+                0L,
+                null);
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index d428bd29a4aa..c096371d30af 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -307,12 +307,22 @@ public void testLimit() {
         assertThat(sql("SELECT * FROM append_table LIMIT 1")).hasSize(1);
     }
 
+    @Test
+    public void testFileIndex() {
+        batchSql(
+                "INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')");
+
+        assertThat(batchSql("SELECT * FROM index_table WHERE indexc = 'c'"))
+                .containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3, "c", "BBB"));
+    }
+
     @Override
     protected List<String> ddl() {
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('bucket' = '-1')",
                 "CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
-                "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP) WITH ('bucket' = '-1')");
+                "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP) WITH ('bucket' = '-1')",
+                "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc STRING, data STRING) WITH ('bucket' = '-1', 'file-index.bloom-filter.columns'='indexc', 'file-index.bloom-filter.indexc.items' = '500')");
     }
 
     @Override
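The Flink IT case above configures the same index purely through table options in the DDL. A sketch of the equivalent user-facing SQL, issued through a TableEnvironment with a Paimon catalog already registered (catalog setup omitted; table and column names are illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FileIndexDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                    "CREATE TABLE t (id INT, indexc STRING) WITH ("
                            + "'bucket' = '-1', "
                            + "'file-index.bloom-filter.columns' = 'indexc', "
                            + "'file-index.bloom-filter.indexc.items' = '500')");
            // Equality filters on `indexc` can now skip files whose bloom
            // filter rules the value out.
            tEnv.executeSql("SELECT * FROM t WHERE indexc = 'c'").print();
        }
    }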
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index c607e7a8f4a4..42c125fbbf83 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -75,6 +75,7 @@ private DataFileMeta newFile() {
                 1,
                 0,
                 0,
-                0L);
+                0L,
+                null);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 4f53932a2877..ef364a1ed278 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -112,7 +112,8 @@ private DataSplit dataSplit(int partition, int bucket, String... fileNames) {
                             0, // not used
                             0, // not used
                             0, // not used
-                            0L // not used
+                            0L, // not used
+                            null // not used
                             ));
         }
         return DataSplit.builder()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index cbe6cb86d8f3..8d7d4c04e172 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -85,7 +85,8 @@ public static DataFileMeta newFile(int level) {
                 1,
                 0,
                 level,
-                0L);
+                0L,
+                null);
     }
 
     public static FileStoreSourceSplit newSourceSplit(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 7fd14b1adb86..edd6688da66d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -137,7 +137,8 @@ public TableRead createReadWithKey() {
                         schema,
                         VALUE_TYPE,
                         FileFormatDiscover.of(options),
-                        pathFactory);
+                        pathFactory,
+                        options.fileIndexReadEnabled());
         return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
     }
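Finally, the read path is gated by file-index.read.enabled (default true): the new boolean threaded into the raw-file read above comes from CoreOptions#fileIndexReadEnabled(), so index filtering can be switched off without rewriting any files. A sketch of the toggle, assuming only the accessor used in the hunk above:

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    public class ReadFlagSketch {
        public static boolean indexReadEnabled(Options tableOptions) {
            // Set "file-index.read.enabled" = "false" to bypass file index
            // filtering when reading.
            return new CoreOptions(tableOptions).fileIndexReadEnabled();
        }
    }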