diff --git a/docs/content/append-table/file-index.md b/docs/content/append-table/file-index.md new file mode 100644 index 0000000000000..4bf769d6eea31 --- /dev/null +++ b/docs/content/append-table/file-index.md @@ -0,0 +1,61 @@ +--- +title: "File Index" +weight: 4 +type: docs +aliases: +- /append-table/file-index.html +--- + + +# Data File Index + +Define `file-index.bloom-filter.columns`, Paimon will create its corresponding index file for each file. If the index +file is too small, it will be stored directly in the manifest, or in the directory of the data file. Each data file +corresponds to an index file, which has a separate file definition and can contain different types of indexes with +multiple columns. + +## Concept + +Data file index is an external index file corresponding to a certain data file. If the index file is too small, it will +be stored directly in the manifest, otherwise in the directory of the data file. Each data file corresponds to an index file, +which has a separate file definition and can contain different types of indexes with multiple columns. + +## Usage + +Different file index may be efficient in different scenario. For example bloom filter may speed up query in point lookup +scenario. Using a bitmap may consume more space but can result in greater accuracy. Though we only realize bloom filter +currently, but other types of index will be supported in the future. + +Currently, file index is only supported in append-only table. + +`Bloom Filter`: +* `file-index.bloom-filter.columns`: specify the columns that need bloom filter index. +* `file-index.bloom-filter..fpp` to config false positive probability. +* `file-index.bloom-filter..items` to config the expected distinct items in one data file. + +More filter types will be supported... + +## Procedure + +If you want to add file index to existing table, without any rewrite, you can use `rewrite_file_index` procedure. Before +we use the procedure, you should config appropriate configurations in target table. You can use ALTER clause to config +`file-index..columns` to the table. + +How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}}) diff --git a/docs/content/concepts/specification.md b/docs/content/concepts/specification.md index 8463b51ed2ff0..3116f2eed8395 100644 --- a/docs/content/concepts/specification.md +++ b/docs/content/concepts/specification.md @@ -247,33 +247,7 @@ Global Index is in the index directory, currently, only two places will use glob ## Data File Index -### Concept - -Data file index is an external index file corresponding to a certain data file. If the index file is too small, it will -be stored directly in the manifest, otherwise in the directory of the data file. Each data file corresponds to an index file, -which has a separate file definition and can contain different types of indexes with multiple columns. - -### Usage - -Different file index may be efficient in different scenario. For example bloom filter may speed up query in point lookup -scenario. Using a bitmap may consume more space but can result in greater accuracy. Though we only realize bloom filter -currently, but other types of index will be supported in the future. - -Currently, file index is only supported in append-only table. - -`Bloom Filter`: -* `file-index.bloom-filter.columns`: specify the columns that need bloom filter index. -* `file-index.bloom-filter..fpp` to config false positive probability. -* `file-index.bloom-filter..items` to config the expected distinct items in one data file. - - -More filter types will be supported... - -### Procedure - -If you want to add file index to existing table, without any rewrite, you can use `file_index_rewrite` procedure. Before -we use the procedure, you should config appropriate configurations in target table. You can use ALTER clause to config -`file-index..columns` to the table. - -How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}}) - +Define `file-index.bloom-filter.columns`, Paimon will create its corresponding index file for each file. If the index +file is too small, it will be stored directly in the manifest, or in the directory of the data file. Each data file +corresponds to an index file, which has a separate file definition and can contain different types of indexes with +multiple columns. diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index c855ee1bf7eb7..da7b7ff5b7cf8 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -275,20 +275,20 @@ All available procedures are listed below. CALL sys.repair('test_db.T') - file_index_rewrite + rewrite_file_index - CALL sys.file_index_rewrite(<identifier> [, <partitions>])

+ CALL sys.rewrite_file_index(<identifier> [, <partitions>])

Rewrite the file index for the table. Argument:
  • identifier: <databaseName>.<tableName>.
  • -
  • partitions : partition filter.
  • +
  • partitions : specific partitions.
  • -- rewrite the file index for the whole table
    - CALL sys.file_index_rewrite('test_db.T')

    + CALL sys.rewrite_file_index('test_db.T')

    -- repair all tables in a specific partition
    - CALL sys.file_index_rewrite('test_db.T', 'pt=a')

    + CALL sys.rewrite_file_index('test_db.T', 'pt=a')

    diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java similarity index 78% rename from paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java rename to paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java index c21d5418a67a6..83d0cf078f3ab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java @@ -24,6 +24,7 @@ import org.apache.paimon.fileindex.FileIndexCommon; import org.apache.paimon.fileindex.FileIndexFormat; import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.fileindex.FileIndexWriter; import org.apache.paimon.fileindex.FileIndexer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -45,8 +46,8 @@ import java.util.List; import java.util.Map; -/** Index file writer. */ -public final class FileIndexWriter implements Closeable { +/** Index file writer for a data file. */ +public final class DataFileIndexWriter implements Closeable { public static final FileIndexResult EMPTY_RESULT = FileIndexResult.of(null, null); @@ -63,12 +64,12 @@ public final class FileIndexWriter implements Closeable { private byte[] embeddedIndexBytes; - public FileIndexWriter( + public DataFileIndexWriter( FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions, - @Nullable Map evolutionMap) { + @Nullable Map colNameMapping) { this.fileIO = fileIO; this.path = path; List fields = rowType.getFields(); @@ -82,15 +83,15 @@ public FileIndexWriter( for (Map.Entry> entry : fileIndexOptions.entrySet()) { FileIndexOptions.Column entryColumn = entry.getKey(); - String tempName = entryColumn.getColumnName(); - if (evolutionMap != null) { - tempName = evolutionMap.getOrDefault(tempName, null); - if (tempName == null) { + String colName = entryColumn.getColumnName(); + if (colNameMapping != null) { + colName = colNameMapping.getOrDefault(colName, null); + if (colName == null) { continue; } } - final String columnName = tempName; + String columnName = colName; DataField field = map.get(columnName); if (field == null) { throw new IllegalArgumentException(columnName + " does not exist in column fields"); @@ -98,6 +99,7 @@ public FileIndexWriter( for (Map.Entry typeEntry : entry.getValue().entrySet()) { String indexType = typeEntry.getKey(); + IndexMaintainer maintainer = indexMaintainers.get(columnName); if (entryColumn.isNestedColumn()) { if (field.type().getTypeRoot() != DataTypeRoot.MAP) { throw new IllegalArgumentException( @@ -105,34 +107,36 @@ public FileIndexWriter( + columnName + " is nested column, but is not map type. Only should map type yet."); } - MapType mapType = (MapType) field.type(); - ((MapFileIndexMaintainer) - indexMaintainers.computeIfAbsent( - columnName, - name -> - new MapFileIndexMaintainer( - columnName, - indexType, - mapType.getKeyType(), - mapType.getValueType(), - fileIndexOptions.getMapTopLevelOptions( - columnName, typeEntry.getKey()), - index.get(columnName)))) - .add(entryColumn.getNestedColumnName(), typeEntry.getValue()); + MapFileIndexMaintainer mapMaintainer = (MapFileIndexMaintainer) maintainer; + if (mapMaintainer == null) { + MapType mapType = (MapType) field.type(); + mapMaintainer = + new MapFileIndexMaintainer( + columnName, + indexType, + mapType.getKeyType(), + mapType.getValueType(), + fileIndexOptions.getMapTopLevelOptions( + columnName, typeEntry.getKey()), + index.get(columnName)); + indexMaintainers.put(columnName, mapMaintainer); + } + mapMaintainer.add(entryColumn.getNestedColumnName(), typeEntry.getValue()); } else { - indexMaintainers.computeIfAbsent( - columnName, - name -> - new FileIndexMaintainer( - columnName, - indexType, - FileIndexer.create( - indexType, - field.type(), - typeEntry.getValue()) - .createWriter(), - InternalRow.createFieldGetter( - field.type(), index.get(columnName)))); + if (maintainer == null) { + maintainer = + new FileIndexMaintainer( + columnName, + indexType, + FileIndexer.create( + indexType, + field.type(), + typeEntry.getValue()) + .createWriter(), + InternalRow.createFieldGetter( + field.type(), index.get(columnName))); + indexMaintainers.put(columnName, maintainer); + } } } } @@ -149,18 +153,18 @@ public void write(InternalRow row) { public void close() throws IOException { Map> indexMaps = serializeMaintainers(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(baos)) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (FileIndexFormat.Writer writer = FileIndexFormat.createWriter(out)) { writer.writeColumnIndexes(indexMaps); } - if (baos.size() > inManifestThreshold) { + if (out.size() > inManifestThreshold) { try (OutputStream outputStream = fileIO.newOutputStream(path, true)) { - outputStream.write(baos.toByteArray()); + outputStream.write(out.toByteArray()); } resultFileName = path.getName(); } else { - embeddedIndexBytes = baos.toByteArray(); + embeddedIndexBytes = out.toByteArray(); } } @@ -182,21 +186,21 @@ public FileIndexResult result() { } @Nullable - public static FileIndexWriter create( + public static DataFileIndexWriter create( FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions) { return create(fileIO, path, rowType, fileIndexOptions, null); } @Nullable - public static FileIndexWriter create( + public static DataFileIndexWriter create( FileIO fileIO, Path path, RowType rowType, FileIndexOptions fileIndexOptions, - @Nullable Map evolutionMap) { + @Nullable Map colNameMapping) { return fileIndexOptions.isEmpty() ? null - : new FileIndexWriter(fileIO, path, rowType, fileIndexOptions, evolutionMap); + : new DataFileIndexWriter(fileIO, path, rowType, fileIndexOptions, colNameMapping); } /** File index result. */ @@ -238,13 +242,13 @@ private static class FileIndexMaintainer implements IndexMaintainer { private final String columnName; private final String indexType; - private final org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter; + private final FileIndexWriter fileIndexWriter; private final InternalRow.FieldGetter getter; public FileIndexMaintainer( String columnName, String indexType, - org.apache.paimon.fileindex.FileIndexWriter fileIndexWriter, + FileIndexWriter fileIndexWriter, InternalRow.FieldGetter getter) { this.columnName = columnName; this.indexType = indexType; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index f7da2760ee044..6c7090f3d8eb0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -72,11 +72,11 @@ public String uuid() { return uuid; } - public static Path toFileIndexPath(Path filePath) { - return new Path(filePath.getParent(), filePath.getName() + INDEX_PATH_SUFFIX); + public static Path dataFileToFileIndexPath(Path dataFilePath) { + return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX); } - public static Path fileIndexPathIncrease(Path filePath) { + public static Path createNewFileIndexFilePath(Path filePath) { String fileName = filePath.getName(); int dot = fileName.lastIndexOf("."); int dash = fileName.lastIndexOf("-"); @@ -87,8 +87,8 @@ public static Path fileIndexPathIncrease(Path filePath) { return new Path( filePath.getParent(), fileName.substring(0, dash + 1) + (num + 1) + INDEX_PATH_SUFFIX); - } catch (NumberFormatException e) { - // ignore + } catch (NumberFormatException ignore) { + // it is the first index file, has no number } } return new Path( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 3da9098174cfb..ae18768c953a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -37,7 +37,7 @@ import java.util.Collections; import java.util.function.Function; -import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath; +import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath; /** * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link InternalRow}. @@ -48,7 +48,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter partitionGetter() { return row -> deserializeBinaryRow(row.getBinary(2)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java index 329dee2301034..a83d805491ee5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java @@ -112,6 +112,19 @@ public final List deserializeList(byte[] bytes) throws IOException { return deserializeList(view); } + public byte[] serializeToBytes(T record) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + serialize(record, view); + return out.toByteArray(); + } + + public T deserializeFromBytes(byte[] bytes) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(bytes); + DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in); + return deserialize(view); + } + /** Convert a {@link T} to {@link InternalRow}. */ public abstract InternalRow toRow(T record); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java similarity index 83% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java index 21582ae287da9..a85094b9aa25e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FileIndexProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java @@ -20,9 +20,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.flink.sink.FileIndexSink; import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy; -import org.apache.paimon.flink.source.FileIndexScanSource; +import org.apache.paimon.flink.sink.RewriteFileIndexSink; +import org.apache.paimon.flink.source.RewriteFileIndexSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestEntrySerializer; import org.apache.paimon.predicate.Predicate; @@ -46,12 +46,12 @@ import static org.apache.paimon.utils.ParameterUtils.getPartitions; -/** Migrate procedure to migrate hive table to paimon table. */ -public class FileIndexProcedure extends ProcedureBase { +/** Rewrite file index procedure to re-generated all file index. */ +public class RewriteFileIndexProcedure extends ProcedureBase { @Override public String identifier() { - return "file_index_rewrite"; + return "rewrite_file_index"; } public String[] call(ProcedureContext procedureContext, String sourceTablePath) @@ -89,30 +89,20 @@ public String[] call( partitionPredicate = null; } - TopoBuilder.build(env, (FileStoreTable) table, partitionPredicate); - + FileStoreTable storeTable = (FileStoreTable) table; + DataStreamSource source = + env.fromSource( + new RewriteFileIndexSource(storeTable, partitionPredicate), + WatermarkStrategy.noWatermarks(), + "index source", + new ManifestEntryTypeInfo()); + new RewriteFileIndexSink(storeTable).sinkFrom(source); return execute(env, "Add file index for table: " + sourceTablePath); } - private static class TopoBuilder { - - public static void build( - StreamExecutionEnvironment env, - FileStoreTable table, - Predicate partitionPredicate) { - DataStreamSource source = - env.fromSource( - new FileIndexScanSource(table, partitionPredicate), - WatermarkStrategy.noWatermarks(), - "index source", - new TypeInfo()); - new FileIndexSink(table).sinkFrom(source); - } - } - - private static class TypeInfo extends GenericTypeInfo { + private static class ManifestEntryTypeInfo extends GenericTypeInfo { - public TypeInfo() { + public ManifestEntryTypeInfo() { super(ManifestEntry.class); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java similarity index 78% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index a69f3f23cf01a..39dcca03c6aab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -23,14 +23,14 @@ import org.apache.paimon.fileindex.FileIndexCommon; import org.apache.paimon.fileindex.FileIndexFormat; import org.apache.paimon.fileindex.FileIndexOptions; -import org.apache.paimon.flink.procedure.FileIndexProcedure; +import org.apache.paimon.flink.procedure.RewriteFileIndexProcedure; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileIndexWriter; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.FileIndexWriter; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReader; @@ -45,13 +45,14 @@ import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import javax.annotation.Nullable; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -64,13 +65,13 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.paimon.io.DataFilePathFactory.fileIndexPathIncrease; -import static org.apache.paimon.io.DataFilePathFactory.toFileIndexPath; +import static org.apache.paimon.io.DataFilePathFactory.createNewFileIndexFilePath; +import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath; -/** File index sink for {@link FileIndexProcedure}. */ -public class FileIndexSink extends FlinkWriteSink { +/** File index sink for {@link RewriteFileIndexProcedure}. */ +public class RewriteFileIndexSink extends FlinkWriteSink { - public FileIndexSink(FileStoreTable table) { + public RewriteFileIndexSink(FileStoreTable table) { super(table, null); } @@ -84,6 +85,8 @@ protected OneInputStreamOperator createWriteOperator private static class FileIndexModificationOperator extends PrepareCommitOperator { + private static final long serialVersionUID = 1L; + private final FileStoreTable table; private transient FileIndexProcessor fileIndexProcessor; @@ -145,7 +148,7 @@ public static class FileIndexProcessor { private final FileIO fileIO; private final FileStorePathFactory pathFactory; private final Map, DataFilePathFactory> dataFilePathFactoryMap; - private final SchemaCache schemaCache; + private final SchemaCache schemaInfoCache; private final long sizeInMeta; public FileIndexProcessor(FileStoreTable table) { @@ -154,7 +157,7 @@ public FileIndexProcessor(FileStoreTable table) { this.fileIO = table.fileIO(); this.pathFactory = table.store().pathFactory(); this.dataFilePathFactoryMap = new HashMap<>(); - this.schemaCache = + this.schemaInfoCache = new SchemaCache(fileIndexOptions, new SchemaManager(fileIO, table.location())); this.sizeInMeta = table.coreOptions().fileIndexInManifestThreshold(); } @@ -166,13 +169,7 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi Pair.of(partition, bucket), p -> pathFactory.createDataFilePathFactory(partition, bucket)); - Tuple4, int[], Map>> t4 = - schemaCache.schemaInfo(dataFileMeta.schemaId()); - RowType fileSchema = t4.f0; - Map evolutionNameMap = t4.f1; - int[] projection = t4.f2; - Map> expectedFileIndex = t4.f3; - + SchemaInfo schemaInfo = schemaInfoCache.schemaInfo(dataFileMeta.schemaId()); List extras = new ArrayList<>(dataFileMeta.extraFiles()); List indexFiles = dataFileMeta.extraFiles().stream() @@ -188,20 +185,22 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi try (FileIndexFormat.Reader indexReader = FileIndexFormat.createReader( fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)), - fileSchema)) { + schemaInfo.fileSchema)) { maintainers = indexReader.readAll(); } - newIndexPath = fileIndexPathIncrease(dataFilePathFactory.toPath(indexFile)); + newIndexPath = createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile)); } else { maintainers = new HashMap<>(); - newIndexPath = toFileIndexPath(dataFilePathFactory.toPath(dataFileMeta.fileName())); + newIndexPath = + dataFileToFileIndexPath( + dataFilePathFactory.toPath(dataFileMeta.fileName())); } // remove unnecessary for (Map.Entry> entry : new HashSet<>(maintainers.entrySet())) { String name = entry.getKey(); - if (!expectedFileIndex.containsKey(name)) { + if (!schemaInfo.projectedColFullNames.contains(name)) { maintainers.remove(name); } else { Map indexTypeBytes = maintainers.get(name); @@ -213,18 +212,19 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi } } - // ignore close - FileIndexWriter fileIndexWriter = - FileIndexWriter.create( + // ignore close, do not close to write file, only collect serialized maintainers + @SuppressWarnings("resource") + DataFileIndexWriter dataFileIndexWriter = + DataFileIndexWriter.create( fileIO, newIndexPath, - fileSchema.project(projection), + schemaInfo.fileSchema.project(schemaInfo.projectedIndexCols), fileIndexOptions, - evolutionNameMap); - if (fileIndexWriter != null) { + schemaInfo.colNameMapping); + if (dataFileIndexWriter != null) { try (RecordReader reader = table.newReadBuilder() - .withProjection(projection) + .withProjection(schemaInfo.projectedIndexCols) .newRead() .createReader( DataSplit.builder() @@ -238,10 +238,10 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi Collections.singletonList(dataFileMeta)) .rawConvertible(true) .build())) { - reader.forEachRemaining(fileIndexWriter::write); + reader.forEachRemaining(dataFileIndexWriter::write); } - fileIndexWriter + dataFileIndexWriter .serializeMaintainers() .forEach( (key, value) -> @@ -276,38 +276,37 @@ private static class SchemaCache { private final FileIndexOptions fileIndexOptions; private final SchemaManager schemaManager; - private final TableSchema latest; - private final Map< - Long, Tuple4, int[], Map>>> - schemaInfos; - + private final TableSchema currentSchema; + private final Map schemaInfos; private final Set fileSchemaIds; public SchemaCache(FileIndexOptions fileIndexOptions, SchemaManager schemaManager) { this.fileIndexOptions = fileIndexOptions; this.schemaManager = schemaManager; - this.latest = schemaManager.latest().orElseThrow(RuntimeException::new); + this.currentSchema = schemaManager.latest().orElseThrow(RuntimeException::new); this.schemaInfos = new HashMap<>(); this.fileSchemaIds = new HashSet<>(); } - public Tuple4, int[], Map>> schemaInfo( - long schemaId) { + public SchemaInfo schemaInfo(long schemaId) { if (!fileSchemaIds.contains(schemaId)) { RowType fileSchema = schemaManager.schema(schemaId).logicalRowType(); - Map evolutionmap = - schemaId == latest.id() + + @Nullable + Map colNameMapping = + schemaId == currentSchema.id() ? null - : createIndexNameMapping(latest.fields(), fileSchema.getFields()); + : createIndexNameMapping( + currentSchema.fields(), fileSchema.getFields()); - List projectedColumnNames = new ArrayList<>(); - Map> expectedIndexNameType = new HashMap<>(); + List projectedColNames = new ArrayList<>(); + Set projectedColFullNames = new HashSet<>(); for (Map.Entry> entry : fileIndexOptions.entrySet()) { FileIndexOptions.Column column = entry.getKey(); String columnName; - if (evolutionmap != null) { - columnName = evolutionmap.getOrDefault(column.getColumnName(), null); + if (colNameMapping != null) { + columnName = colNameMapping.getOrDefault(column.getColumnName(), null); // if column name has no corresponding field, then we just skip it if (columnName == null) { continue; @@ -315,26 +314,24 @@ public Tuple4, int[], Map>> sch } else { columnName = column.getColumnName(); } - projectedColumnNames.add(columnName); + projectedColNames.add(columnName); String fullColumnName = column.isNestedColumn() ? FileIndexCommon.toMapKey( columnName, column.getNestedColumnName()) : column.getColumnName(); - expectedIndexNameType - .computeIfAbsent(fullColumnName, name -> new HashSet<>()) - .addAll(entry.getValue().keySet()); + projectedColFullNames.add(fullColumnName); } schemaInfos.put( schemaId, - Tuple4.of( + new SchemaInfo( fileSchema, - evolutionmap, - projectedColumnNames.stream() + colNameMapping, + projectedColNames.stream() .mapToInt(fileSchema::getFieldIndex) .toArray(), - expectedIndexNameType)); + projectedColFullNames)); fileSchemaIds.add(schemaId); } @@ -359,4 +356,23 @@ private static Map createIndexNameMapping( return indexMapping; } } + + private static class SchemaInfo { + + private final RowType fileSchema; + private final Map colNameMapping; + private final int[] projectedIndexCols; + private final Set projectedColFullNames; + + private SchemaInfo( + RowType fileSchema, + Map colNameMapping, + int[] projectedIndexCols, + Set projectedColFullNames) { + this.fileSchema = fileSchema; + this.colNameMapping = colNameMapping; + this.projectedIndexCols = projectedIndexCols; + this.projectedColFullNames = projectedColFullNames; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java index fa83aad83640b..9a488613f51ab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileIndexScanSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RewriteFileIndexSource.java @@ -51,16 +51,18 @@ import java.util.stream.Collectors; /** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */ -public class FileIndexScanSource +public class RewriteFileIndexSource implements Source< - ManifestEntry, FileIndexScanSource.Split, FileIndexScanSource.CheckpointState> { + ManifestEntry, + RewriteFileIndexSource.Split, + RewriteFileIndexSource.CheckpointState> { - private static final long serialVersionUID = 2319102734891237489L; + private static final long serialVersionUID = 1L; private final FileStoreTable table; @Nullable private final Predicate partitionPredicate; - public FileIndexScanSource(FileStoreTable table, @Nullable Predicate partitionPredicate) { + public RewriteFileIndexSource(FileStoreTable table, @Nullable Predicate partitionPredicate) { this.table = table; this.partitionPredicate = partitionPredicate; } diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 2fe8403dc835b..c41539091f0ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -32,7 +32,7 @@ org.apache.paimon.flink.action.QueryServiceActionFactory ### procedure factories org.apache.paimon.flink.procedure.CompactDatabaseProcedure org.apache.paimon.flink.procedure.CompactProcedure -org.apache.paimon.flink.procedure.FileIndexProcedure +org.apache.paimon.flink.procedure.RewriteFileIndexProcedure org.apache.paimon.flink.procedure.CreateTagProcedure org.apache.paimon.flink.procedure.DeleteTagProcedure org.apache.paimon.flink.procedure.CreateBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java index 1fbbb586992e4..d465287ffcadd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FileIndexProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java @@ -40,8 +40,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -/** IT Case for {@link FileIndexProcedure}. */ -public class FileIndexProcedureITCase extends CatalogITCaseBase { +/** IT Case for {@link RewriteFileIndexProcedure}. */ +public class RewriteFileIndexProcedureITCase extends CatalogITCaseBase { @Test public void testFileIndexProcedureAddIndex() throws Exception { @@ -61,7 +61,7 @@ public void testFileIndexProcedureAddIndex() throws Exception { tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')"); - sql("CALL sys.file_index_rewrite('default.T')"); + sql("CALL sys.rewrite_file_index('default.T')"); FileStoreTable table = paimonTable("T"); List list = table.store().newScan().plan().files(); @@ -156,7 +156,7 @@ public void testFileIndexProcedureSchemaEvolution() throws Exception { tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='order_id,v')"); - sql("CALL sys.file_index_rewrite('default.T')"); + sql("CALL sys.rewrite_file_index('default.T')"); reader = table.newRead() @@ -188,7 +188,7 @@ public void testFileIndexProcedureDropIndex() throws Exception { tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k')"); - sql("CALL sys.file_index_rewrite('default.T')"); + sql("CALL sys.rewrite_file_index('default.T')"); FileStoreTable table = paimonTable("T"); List list = table.store().newScan().plan().files(); @@ -219,7 +219,7 @@ public void testFileIndexProcedureDropIndex() throws Exception { sql("ALTER TABLE T RESET ('file-index.bloom-filter.columns')"); - sql("CALL sys.file_index_rewrite('default.T')"); + sql("CALL sys.rewrite_file_index('default.T')"); table = paimonTable("T"); list = table.store().newScan().plan().files();