diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 74e794828b406..0549a793628a4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation; import org.apache.paimon.annotation.Documentation.Immutable; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; import org.apache.paimon.lookup.LookupStrategy; @@ -1706,11 +1707,11 @@ public boolean deletionVectorsEnabled() { return options.get(DELETION_VECTORS_ENABLED); } - public Map> indexColumns() { + public FileIndexOptions indexColumns() { String fileIndexPrefix = FILE_INDEX + "."; String fileIndexColumnSuffix = "." + COLUMNS; - Map> indexes = new HashMap<>(); + FileIndexOptions fileIndexOptions = new FileIndexOptions(); for (Map.Entry entry : options.toMap().entrySet()) { String key = entry.getKey(); if (key.startsWith(fileIndexPrefix)) { @@ -1727,8 +1728,7 @@ public Map> indexColumns() { throw new IllegalArgumentException( "Wrong option in " + key + ", should not have empty column"); } - indexes.computeIfAbsent(name.trim(), n -> new HashMap<>()) - .computeIfAbsent(indexType, t -> new Options()); + fileIndexOptions.computeIfAbsent(name.trim(), indexType); } } else { // else, it must be an option @@ -1740,7 +1740,7 @@ public Map> indexColumns() { String cname = kv[1]; String opkey = kv[2]; - if (!indexes.containsKey(cname) || !indexes.get(cname).containsKey(indexType)) { + if (fileIndexOptions.get(cname, indexType) == null) { // if indexes have not set, find .column in options, then set them String columns = options.get(fileIndexPrefix + indexType + fileIndexColumnSuffix); @@ -1760,8 +1760,7 @@ public Map> indexColumns() { if (cname.equals(tname)) { foundTarget = true; } - indexes.computeIfAbsent(tname, n -> new HashMap<>()) - .computeIfAbsent(indexType, t -> new Options()); + fileIndexOptions.computeIfAbsent(name.trim(), indexType); } if (!foundTarget) { throw new IllegalArgumentException( @@ -1773,12 +1772,11 @@ public Map> indexColumns() { + columns); } } - - indexes.get(cname).get(indexType).set(opkey, entry.getValue()); + fileIndexOptions.get(cname, indexType).set(opkey, entry.getValue()); } } } - return indexes; + return fileIndexOptions; } public long fileIndexInManifestThreshold() { diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java new file mode 100644 index 0000000000000..b8ee48479733a --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.options.Options; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** Options of file index column. */ +public class FileIndexOptions { + + private final Map> indexTypeOptions; + + public FileIndexOptions() { + this.indexTypeOptions = new HashMap<>(); + } + + public void computeIfAbsent(String column, String indexType) { + indexTypeOptions + .computeIfAbsent(column, c -> new HashMap<>()) + .computeIfAbsent(indexType, i -> new Options()); + } + + public Options get(String column, String indexType) { + return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null)) + .map(x -> x.get(indexType)) + .orElse(null); + } + + public boolean isEmpty() { + return indexTypeOptions.isEmpty(); + } + + public Set>> entrySet() { + return indexTypeOptions.entrySet(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index dc7e94e17ef96..d7e2277eb8704 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.CompactIncrement; @@ -35,7 +36,6 @@ import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.options.MemorySize; -import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.types.RowType; @@ -52,7 +52,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; /** @@ -80,7 +79,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private SinkWriter sinkWriter; private final FieldStatsCollector.Factory[] statsCollectors; private final IOManager ioManager; - private final Map> fileIndexes; + private final FileIndexOptions fileIndexes; private final long inManifestThreshold; private MemorySegmentPool memorySegmentPool; @@ -105,7 +104,7 @@ public AppendOnlyWriter( String spillCompression, FieldStatsCollector.Factory[] statsCollectors, MemorySize maxDiskSize, - Map> fileIndexes, + FileIndexOptions fileIndexes, long inManifestThreshold) { this.fileIO = fileIO; this.schemaId = schemaId; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index 1bf5fc55b6496..ef83e8598790f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -34,9 +34,7 @@ public class DataFilePathFactory { public static final String CHANGELOG_FILE_PREFIX = "changelog-"; - public static final String INDEX_PATH_PREFIX = "index-"; - - public static final String INDEX_PATH_SUFFIX = "index"; + public static final String INDEX_PATH_SUFFIX = ".index"; private final Path parent; private final String uuid; @@ -74,10 +72,8 @@ public String uuid() { return uuid; } - public static Path toIndexPath(Path filePath) { - return new Path( - filePath.getParent(), - INDEX_PATH_PREFIX + filePath.getName() + "." + INDEX_PATH_SUFFIX); + public static Path toFileIndexPath(Path filePath) { + return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX); } public static String formatIdentifier(String fileName) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexFileReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexFileReader.java new file mode 100644 index 0000000000000..45eb242ee3246 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexFileReader.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexPredicate; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.mergetree.compact.ConcatRecordReader; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; + +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** File index reader, do the filter in the constructor. */ +public class FileIndexFileReader implements RecordReader { + + private final RecordReader reader; + + public FileIndexFileReader( + FileIO fileIO, + TableSchema dataSchema, + List dataFilter, + DataFilePathFactory dataFilePathFactory, + DataFileMeta file, + ConcatRecordReader.ReaderSupplier readerSupplier) + throws IOException { + boolean filterThisFile = false; + if (dataFilter != null && !dataFilter.isEmpty()) { + List indexFiles = + file.extraFiles().stream() + .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX)) + .collect(Collectors.toList()); + if (!indexFiles.isEmpty()) { + // go to file index check + try (FileIndexPredicate predicate = + new FileIndexPredicate( + dataFilePathFactory.toPath(indexFiles.get(0)), + fileIO, + dataSchema.logicalRowType())) { + if (!predicate.testPredicate( + PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) { + filterThisFile = true; + } + } + } + } + + this.reader = filterThisFile ? null : readerSupplier.get(); + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + return reader == null ? null : reader.readBatch(); + } + + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java similarity index 62% rename from paimon-core/src/main/java/org/apache/paimon/io/IndexWriter.java rename to paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java index 57989bbb0d3d1..f7b36d2fc6550 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/IndexWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java @@ -20,28 +20,28 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.fileindex.FileIndexFormat; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.fileindex.FileIndexer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.Pair; + +import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; /** Index file writer. */ -public final class IndexWriter { +public final class FileIndexWriter { - private static final Pair> EMPTY_RESULT = - Pair.of(null, Collections.emptyList()); + public static final FileIndexResult EMPTY_RESULT = FileIndexResult.of(null, null); private final FileIO fileIO; @@ -52,15 +52,15 @@ public final class IndexWriter { private final List indexMaintainers = new ArrayList<>(); - private final List resultFileNames = new ArrayList<>(); + private String resultFileName; private byte[] embeddedIndexBytes; - public IndexWriter( + public FileIndexWriter( FileIO fileIO, Path path, RowType rowType, - Map> fileIndexes, + FileIndexOptions fileIndexes, long inManifestThreshold) { this.fileIO = fileIO; this.path = path; @@ -98,10 +98,6 @@ public void write(InternalRow row) { } public void close() throws IOException { - if (indexMaintainers.isEmpty()) { - return; - } - Map> indexMaps = new HashMap<>(); for (IndexMaintainer indexMaintainer : indexMaintainers) { @@ -119,15 +115,86 @@ public void close() throws IOException { try (OutputStream outputStream = fileIO.newOutputStream(path, false)) { outputStream.write(baos.toByteArray()); } - resultFileNames.add(path.getName()); + resultFileName = path.getName(); } else { embeddedIndexBytes = baos.toByteArray(); } } - public Pair> result() { - return indexMaintainers.isEmpty() - ? EMPTY_RESULT - : Pair.of(embeddedIndexBytes, resultFileNames); + public FileIndexResult result() { + return FileIndexResult.of(embeddedIndexBytes, resultFileName); + } + + @Nullable + public static FileIndexWriter create( + FileIO fileIO, + Path path, + RowType rowType, + FileIndexOptions fileIndexOptions, + long inManifestThreshold) { + return fileIndexOptions.isEmpty() + ? null + : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, inManifestThreshold); + } + + /** File index result. */ + public interface FileIndexResult { + + @Nullable + byte[] embeddedIndexBytes(); + + @Nullable + String independentIndexFile(); + + static FileIndexResult of(byte[] embeddedIndexBytes, String resultFileName) { + return new FileIndexResult() { + + @Override + public byte[] embeddedIndexBytes() { + return embeddedIndexBytes; + } + + @Override + public String independentIndexFile() { + return resultFileName; + } + }; + } + } + + /** One index maintainer for one column. */ + private static class IndexMaintainer { + + private final String columnName; + private final String indexType; + private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter; + private final InternalRow.FieldGetter getter; + + public IndexMaintainer( + String columnName, + String indexType, + org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter, + InternalRow.FieldGetter getter) { + this.columnName = columnName; + this.indexType = indexType; + this.fileIndexWriter = fileIndexWriter; + this.getter = getter; + } + + public void write(InternalRow row) { + fileIndexWriter.write(getter.getFieldOrNull(row)); + } + + public String getIndexType() { + return indexType; + } + + public String getColumnName() { + return columnName; + } + + public byte[] serializedBytes() { + return fileIndexWriter.serializedBytes(); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/io/IndexMaintainer.java deleted file mode 100644 index 5cf145d7f45b3..0000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/io/IndexMaintainer.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.io; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fileindex.FileIndexWriter; - -/** One index maintainer for one column. */ -public class IndexMaintainer { - - private final String columnName; - private final String indexType; - private final FileIndexWriter fileIndexWriter; - private final InternalRow.FieldGetter getter; - - public IndexMaintainer( - String columnName, - String indexType, - FileIndexWriter fileIndexWriter, - InternalRow.FieldGetter getter) { - this.columnName = columnName; - this.indexType = indexType; - this.fileIndexWriter = fileIndexWriter; - this.getter = getter; - } - - public void write(InternalRow row) { - fileIndexWriter.write(getter.getFieldOrNull(row)); - } - - public String getIndexType() { - return indexType; - } - - public String getColumnName() { - return columnName; - } - - public byte[] serializedBytes() { - return fileIndexWriter.serializedBytes(); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 1d50ed48768a6..23b817a9ddabd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -19,27 +19,24 @@ package org.apache.paimon.io; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.format.TableStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.options.Options; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.LongCounter; -import org.apache.paimon.utils.Pair; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collections; -import java.util.List; -import java.util.Map; import java.util.function.Function; -import static org.apache.paimon.io.DataFilePathFactory.toIndexPath; +import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath; /** * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link InternalRow}. @@ -50,7 +47,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter> indexExpr, + FileIndexOptions fileIndexOptions, long inManifestThreshold) { super( fileIO, @@ -76,29 +73,38 @@ public RowDataFileWriter( this.schemaId = schemaId; this.seqNumCounter = seqNumCounter; this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema); - this.indexWriter = - new IndexWriter( - fileIO, toIndexPath(path), writeSchema, indexExpr, inManifestThreshold); + this.fileIndexWriter = + FileIndexWriter.create( + fileIO, + toFileIndexPath(path), + writeSchema, + fileIndexOptions, + inManifestThreshold); } @Override public void write(InternalRow row) throws IOException { super.write(row); // add row to index if needed - indexWriter.write(row); + if (fileIndexWriter != null) { + fileIndexWriter.write(row); + } seqNumCounter.add(1L); } @Override public void close() throws IOException { - indexWriter.close(); + if (fileIndexWriter != null) { + fileIndexWriter.close(); + } super.close(); } @Override public DataFileMeta result() throws IOException { BinaryTableStats stats = statsArraySerializer.toBinary(fieldStats()); - Pair> indexResult = indexWriter.result(); + FileIndexWriter.FileIndexResult indexResult = + fileIndexWriter == null ? FileIndexWriter.EMPTY_RESULT : fileIndexWriter.result(); return DataFileMeta.forAppend( path.getName(), fileIO.getFileSize(path), @@ -107,7 +113,9 @@ public DataFileMeta result() throws IOException { seqNumCounter.getValue() - super.recordCount(), seqNumCounter.getValue() - 1, schemaId, - indexResult.getRight() == null ? Collections.emptyList() : indexResult.getRight(), - indexResult.getLeft()); + indexResult.independentIndexFile() == null + ? Collections.emptyList() + : Collections.singletonList(indexResult.independentIndexFile()), + indexResult.embeddedIndexBytes()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java index 42369e5b19aa8..c1131dcd728d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java @@ -19,16 +19,14 @@ package org.apache.paimon.io; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.avro.AvroFileFormat; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.options.Options; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.LongCounter; -import java.util.Map; - /** {@link RollingFileWriter} for data files containing {@link InternalRow}. */ public class RowDataRollingFileWriter extends RollingFileWriter { @@ -42,7 +40,7 @@ public RowDataRollingFileWriter( LongCounter seqNumCounter, String fileCompression, FieldStatsCollector.Factory[] statsCollectors, - Map> fileIndexes, + FileIndexOptions fileIndexes, long inManifestThreshold) { super( () -> diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java index dc9fca79aeb18..c001fdfb8bad2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java @@ -21,18 +21,17 @@ import org.apache.paimon.AppendOnlyFileStore; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatKey; import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.FileIndexFileReader; import org.apache.paimon.io.FileRecordReader; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; @@ -56,7 +55,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; @@ -188,31 +186,9 @@ public RecordReader createReader(DataSplit split) throws IOExceptio TableSchema dataSchema = formatMappingTriple.f0; List dataFilter = formatMappingTriple.f1; BulkFormatMapping bulkFormatMapping = formatMappingTriple.f2; - if (dataFilter != null && !dataFilter.isEmpty()) { - List indexFiles = - file.extraFiles().stream() - .filter( - name -> - name.startsWith( - DataFilePathFactory.INDEX_PATH_PREFIX)) - .collect(Collectors.toList()); - if (fileIndexReadEnabled && !indexFiles.isEmpty()) { - // go to file index check - try (FileIndexPredicate predicate = - new FileIndexPredicate( - dataFilePathFactory.toPath(indexFiles.get(0)), - fileIO, - dataSchema.logicalRowType())) { - if (!predicate.testPredicate( - PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) { - continue; - } - } - } - } final BinaryRow partition = split.partition(); - suppliers.add( + ConcatRecordReader.ReaderSupplier supplier = () -> new FileRecordReader( bulkFormatMapping.getReaderFactory(), @@ -223,7 +199,19 @@ public RecordReader createReader(DataSplit split) throws IOExceptio bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), PartitionUtils.create( - bulkFormatMapping.getPartitionPair(), partition))); + bulkFormatMapping.getPartitionPair(), partition)); + + suppliers.add( + fileIndexReadEnabled + ? () -> + new FileIndexFileReader( + fileIO, + dataSchema, + dataFilter, + dataFilePathFactory, + file, + supplier) + : supplier); } return ConcatRecordReader.create(suppliers); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index e9923856ef43b..baa2e9f4ab4ac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -48,6 +48,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan { private Predicate filter; + // just cache. private final Map dataFilterMapping = new HashMap<>(); public AppendOnlyFileStoreScan( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 6f359eb1626ce..b97ff88479b1b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -27,13 +27,13 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.RowDataRollingFileWriter; import org.apache.paimon.options.MemorySize; -import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.table.BucketMode; @@ -75,7 +75,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite private final boolean spillable; private final MemorySize maxDiskSize; private final FieldStatsCollector.Factory[] statsCollectors; - private final Map> fileIndexes; + private final FileIndexOptions fileIndexes; private final long inManifestThreshold; private boolean forceBufferSpill = false; diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index a9b9fdb377d00..51289c3336fc1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -27,6 +27,7 @@ import org.apache.paimon.disk.ExternalBuffer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FieldStats; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; @@ -605,7 +606,7 @@ private Pair> createWriter( StatsCollectorFactories.createStatsFactories( options, AppendOnlyWriterTest.SCHEMA.getFieldNames()), MemorySize.MAX_VALUE, - Collections.emptyMap(), + new FileIndexOptions(), 0); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 71f62041b2345..6e8083f40a641 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; @@ -43,7 +44,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -91,7 +91,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception StatsCollectorFactories.createStatsFactories( options, SCHEMA.getFieldNames()), MemorySize.MAX_VALUE, - Collections.emptyMap(), + new FileIndexOptions(), 0); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index d468e92235c13..3a849b2542594 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -39,7 +40,6 @@ import java.io.File; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import static org.apache.paimon.CoreOptions.FileFormatType; @@ -89,7 +89,7 @@ public void initialize(String identifier) { StatsCollectorFactories.createStatsFactories( new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames()), - Collections.emptyMap(), + new FileIndexOptions(), 0), TARGET_FILE_SIZE); }