diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index c0341c208757..21e8c52118b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -33,13 +33,17 @@ import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.KeyValueFieldsExtractor; +import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.AsyncRecordReader; import org.apache.paimon.utils.BulkFormatMapping; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Projection; import javax.annotation.Nullable; @@ -61,7 +65,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory { private final RowType keyType; private final RowType valueType; - private final BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder; + private final BulkFormatMappingBuilder bulkFormatMappingBuilder; private final DataFilePathFactory pathFactory; private final long asyncThreshold; @@ -75,7 +79,7 @@ private KeyValueFileReaderFactory( TableSchema schema, RowType keyType, RowType valueType, - BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder, + BulkFormatMappingBuilder bulkFormatMappingBuilder, DataFilePathFactory pathFactory, long asyncThreshold, BinaryRow partition, @@ -272,7 +276,7 @@ public KeyValueFileReaderFactory build( schema, projectedKeyType, projectedValueType, - BulkFormatMapping.newBuilder( + new 
BulkFormatMappingBuilder( formatDiscover, extractor, keyProjection, valueProjection, filters), pathFactory.createDataFilePathFactory(partition, bucket), options.fileReaderAsyncThreshold().getBytes(), @@ -289,4 +293,111 @@ public FileIO fileIO() { return fileIO; } } + + /** Builder to build {@link BulkFormatMapping}. */ + private static class BulkFormatMappingBuilder { + + private final FileFormatDiscover formatDiscover; + private final KeyValueFieldsExtractor extractor; + private final int[][] keyProjection; + private final int[][] valueProjection; + @Nullable private final List filters; + + private BulkFormatMappingBuilder( + FileFormatDiscover formatDiscover, + KeyValueFieldsExtractor extractor, + int[][] keyProjection, + int[][] valueProjection, + @Nullable List filters) { + this.formatDiscover = formatDiscover; + this.extractor = extractor; + this.keyProjection = keyProjection; + this.valueProjection = valueProjection; + this.filters = filters; + } + + public BulkFormatMapping build( + String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) { + List tableKeyFields = extractor.keyFields(tableSchema); + List tableValueFields = extractor.valueFields(tableSchema); + int[][] tableProjection = + KeyValue.project(keyProjection, valueProjection, tableKeyFields.size()); + + List dataKeyFields = extractor.keyFields(dataSchema); + List dataValueFields = extractor.valueFields(dataSchema); + + RowType keyType = new RowType(dataKeyFields); + RowType valueType = new RowType(dataValueFields); + RowType dataRecordType = KeyValue.schema(keyType, valueType); + + int[][] dataKeyProjection = + SchemaEvolutionUtil.createDataProjection( + tableKeyFields, dataKeyFields, keyProjection); + int[][] dataValueProjection = + SchemaEvolutionUtil.createDataProjection( + tableValueFields, dataValueFields, valueProjection); + int[][] dataProjection = + KeyValue.project(dataKeyProjection, dataValueProjection, dataKeyFields.size()); + + /* + * We need to create index mapping on 
projection instead of key and value separately + * here, for example + * + *
<ul> + *
  <li>the table key fields: 1->d, 3->a, 4->b, 5->c + *
  <li>the data key fields: 1->a, 2->b, 3->c + *
</ul> + * + *
<p>The value fields of table and data are 0->value_count, the key and value + * projections are as follows + * + *
<ul> + *
  <li>table key projection: [0, 1, 2, 3], value projection: [0], data projection: [0, + * 1, 2, 3, 4, 5, 6] which 4/5 is seq/kind and 6 is value + *
  <li>data key projection: [0, 1, 2], value projection: [0], data projection: [0, 1, + * 2, 3, 4, 5] where 3/4 is seq/kind and 5 is value + *
</ul> + * + *
<p>
We will get value index mapping null from above and we can't create projection + * index mapping based on key and value index mapping any more. + */ + IndexCastMapping indexCastMapping = + SchemaEvolutionUtil.createIndexCastMapping( + Projection.of(tableProjection).toTopLevelIndexes(), + tableKeyFields, + tableValueFields, + Projection.of(dataProjection).toTopLevelIndexes(), + dataKeyFields, + dataValueFields); + + List dataFilters = + tableSchema.id() == dataSchema.id() + ? filters + : SchemaEvolutionUtil.createDataFilters( + tableSchema.fields(), dataSchema.fields(), filters); + + Pair partitionPair = null; + if (!dataSchema.partitionKeys().isEmpty()) { + Pair partitionMapping = + PartitionUtils.constructPartitionMapping( + dataRecordType, dataSchema.partitionKeys(), dataProjection); + // is partition fields are not selected, we just do nothing. + if (partitionMapping != null) { + dataProjection = partitionMapping.getRight(); + partitionPair = + Pair.of( + partitionMapping.getLeft(), + dataSchema.projectedLogicalRowType(dataSchema.partitionKeys())); + } + } + RowType projectedRowType = Projection.of(dataProjection).project(dataRecordType); + return new BulkFormatMapping( + indexCastMapping.getIndexMapping(), + indexCastMapping.getCastMapping(), + partitionPair, + formatDiscover + .discover(formatIdentifier) + .createReaderFactory(projectedRowType, dataFilters)); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 97c18d769f0d..0c84988d0be0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -18,23 +18,12 @@ package org.apache.paimon.utils; -import org.apache.paimon.KeyValue; import org.apache.paimon.casting.CastFieldGetter; -import org.apache.paimon.format.FileFormatDiscover; import 
org.apache.paimon.format.FormatReaderFactory; -import org.apache.paimon.partition.PartitionUtils; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.schema.IndexCastMapping; -import org.apache.paimon.schema.KeyValueFieldsExtractor; -import org.apache.paimon.schema.SchemaEvolutionUtil; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import javax.annotation.Nullable; -import java.util.List; - /** Class with index mapping and bulk format. */ public class BulkFormatMapping { @@ -72,121 +61,4 @@ public Pair getPartitionPair() { public FormatReaderFactory getReaderFactory() { return bulkFormat; } - - public static BulkFormatMappingBuilder newBuilder( - FileFormatDiscover formatDiscover, - KeyValueFieldsExtractor extractor, - int[][] keyProjection, - int[][] valueProjection, - @Nullable List filters) { - return new BulkFormatMappingBuilder( - formatDiscover, extractor, keyProjection, valueProjection, filters); - } - - /** Builder to build {@link BulkFormatMapping}. 
*/ - public static class BulkFormatMappingBuilder { - - private final FileFormatDiscover formatDiscover; - private final KeyValueFieldsExtractor extractor; - private final int[][] keyProjection; - private final int[][] valueProjection; - @Nullable private final List filters; - - private BulkFormatMappingBuilder( - FileFormatDiscover formatDiscover, - KeyValueFieldsExtractor extractor, - int[][] keyProjection, - int[][] valueProjection, - @Nullable List filters) { - this.formatDiscover = formatDiscover; - this.extractor = extractor; - this.keyProjection = keyProjection; - this.valueProjection = valueProjection; - this.filters = filters; - } - - public BulkFormatMapping build( - String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) { - List tableKeyFields = extractor.keyFields(tableSchema); - List tableValueFields = extractor.valueFields(tableSchema); - int[][] tableProjection = - KeyValue.project(keyProjection, valueProjection, tableKeyFields.size()); - - List dataKeyFields = extractor.keyFields(dataSchema); - List dataValueFields = extractor.valueFields(dataSchema); - - RowType keyType = new RowType(dataKeyFields); - RowType valueType = new RowType(dataValueFields); - RowType dataRecordType = KeyValue.schema(keyType, valueType); - - int[][] dataKeyProjection = - SchemaEvolutionUtil.createDataProjection( - tableKeyFields, dataKeyFields, keyProjection); - int[][] dataValueProjection = - SchemaEvolutionUtil.createDataProjection( - tableValueFields, dataValueFields, valueProjection); - int[][] dataProjection = - KeyValue.project(dataKeyProjection, dataValueProjection, dataKeyFields.size()); - - /* - * We need to create index mapping on projection instead of key and value separately - * here, for example - * - *

<ul> - *
  <li>the table key fields: 1->d, 3->a, 4->b, 5->c - *
  <li>the data key fields: 1->a, 2->b, 3->c - *
</ul> - * - *
<p>The value fields of table and data are 0->value_count, the key and value - * projections are as follows - * - *
<ul> - *
  <li>table key projection: [0, 1, 2, 3], value projection: [0], data projection: [0, - * 1, 2, 3, 4, 5, 6] which 4/5 is seq/kind and 6 is value - *
  <li>data key projection: [0, 1, 2], value projection: [0], data projection: [0, 1, - * 2, 3, 4, 5] where 3/4 is seq/kind and 5 is value - *
</ul> - * - *
<p>
We will get value index mapping null from above and we can't create projection - * index mapping based on key and value index mapping any more. - */ - IndexCastMapping indexCastMapping = - SchemaEvolutionUtil.createIndexCastMapping( - Projection.of(tableProjection).toTopLevelIndexes(), - tableKeyFields, - tableValueFields, - Projection.of(dataProjection).toTopLevelIndexes(), - dataKeyFields, - dataValueFields); - - List dataFilters = - tableSchema.id() == dataSchema.id() - ? filters - : SchemaEvolutionUtil.createDataFilters( - tableSchema.fields(), dataSchema.fields(), filters); - - Pair partitionPair = null; - if (!dataSchema.partitionKeys().isEmpty()) { - Pair partitionMapping = - PartitionUtils.constructPartitionMapping( - dataRecordType, dataSchema.partitionKeys(), dataProjection); - // is partition fields are not selected, we just do nothing. - if (partitionMapping != null) { - dataProjection = partitionMapping.getRight(); - partitionPair = - Pair.of( - partitionMapping.getLeft(), - dataSchema.projectedLogicalRowType(dataSchema.partitionKeys())); - } - } - RowType projectedRowType = Projection.of(dataProjection).project(dataRecordType); - return new BulkFormatMapping( - indexCastMapping.getIndexMapping(), - indexCastMapping.getCastMapping(), - partitionPair, - formatDiscover - .discover(formatIdentifier) - .createReaderFactory(projectedRowType, dataFilters)); - } - } }