From 4d2ceed5ff9f47add0b5d86e4e395137bb9d9e3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 6 Dec 2024 13:34:14 +0800 Subject: [PATCH] [core] Trim key field in reading, map it to value field --- .../data/columnar/ColumnarRowIterator.java | 11 ++- .../apache/paimon/table/SpecialFields.java | 6 ++ .../paimon/utils/VectorMappingUtils.java | 3 +- .../paimon/io/DataFileRecordReader.java | 13 ++- .../paimon/io/KeyValueFileReaderFactory.java | 3 +- .../paimon/operation/RawFileSplitRead.java | 3 +- .../paimon/utils/BulkFormatMapping.java | 84 +++++++++++++++++-- 7 files changed, 105 insertions(+), 18 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 27e3d1c1ddad..a0d7d876658e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -87,15 +87,20 @@ public ColumnarRowIterator copy(ColumnVector[] vectors) { } public ColumnarRowIterator mapping( - @Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) { - if (partitionInfo != null || indexMapping != null) { + @Nullable int[] trimmedKeyMapping, + @Nullable PartitionInfo partitionInfo, + @Nullable int[] indexMapping) { + if (trimmedKeyMapping != null || partitionInfo != null || indexMapping != null) { VectorizedColumnBatch vectorizedColumnBatch = row.batch(); ColumnVector[] vectors = vectorizedColumnBatch.columns; + if (trimmedKeyMapping != null) { + vectors = VectorMappingUtils.createMappedVectors(trimmedKeyMapping, vectors); + } if (partitionInfo != null) { vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors); } if (indexMapping != null) { - vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors); + vectors = 
VectorMappingUtils.createMappedVectors(indexMapping, vectors); } return copy(vectors); } diff --git a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java index d438bfb0ffe9..68f7554ab406 100644 --- a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java +++ b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java @@ -72,6 +72,8 @@ public class SpecialFields { public static final String KEY_FIELD_PREFIX = "_KEY_"; public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START; + // reserve 1000 for other system fields + public static final int KEY_FIELD_ID_END = Integer.MAX_VALUE - 1_000; public static final DataField SEQUENCE_NUMBER = new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull()); @@ -99,6 +101,10 @@ public static boolean isSystemField(String field) { return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field); } + public static boolean isKeyField(int id) { + return id >= KEY_FIELD_ID_START && id < KEY_FIELD_ID_END; + } + // ---------------------------------------------------------------------------------------- // Structured type fields // ---------------------------------------------------------------------------------------- diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java index 8b01e644de57..02b011a2f1cf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java @@ -97,8 +97,7 @@ public static ColumnVector createFixedVector( return dataType.accept(visitor); } - public static ColumnVector[] createIndexMappedVectors( - int[] indexMapping, ColumnVector[] vectors) { + public static ColumnVector[] createMappedVectors(int[] indexMapping, ColumnVector[] 
vectors) { ColumnVector[] newVectors = new ColumnVector[indexMapping.length]; for (int i = 0; i < indexMapping.length; i++) { int realIndex = indexMapping[i]; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java index d2559fe6240b..8548ec2bc8f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java @@ -41,13 +41,15 @@ public class DataFileRecordReader implements FileRecordReader { @Nullable private final int[] indexMapping; @Nullable private final PartitionInfo partitionInfo; @Nullable private final CastFieldGetter[] castMapping; + @Nullable private final int[] trimmedKeyMapping; public DataFileRecordReader( FormatReaderFactory readerFactory, FormatReaderFactory.Context context, @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, - @Nullable PartitionInfo partitionInfo) + @Nullable PartitionInfo partitionInfo, + @Nullable int[] trimmedKeyMapping) throws IOException { try { this.reader = readerFactory.createReader(context); @@ -58,6 +60,7 @@ public DataFileRecordReader( this.indexMapping = indexMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; + this.trimmedKeyMapping = trimmedKeyMapping; } @Nullable @@ -69,8 +72,14 @@ public FileRecordIterator readBatch() throws IOException { } if (iterator instanceof ColumnarRowIterator) { - iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping); + iterator = + ((ColumnarRowIterator) iterator) + .mapping(trimmedKeyMapping, partitionInfo, indexMapping); } else { + if (trimmedKeyMapping != null) { + final ProjectedRow projectedRow = ProjectedRow.from(trimmedKeyMapping); + iterator = iterator.transform(projectedRow::replaceRow); + } if (partitionInfo != null) { final PartitionSettedRow partitionSettedRow = PartitionSettedRow.from(partitionInfo); 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 7d3acd729c55..26b272d772e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -144,7 +144,8 @@ private FileRecordReader createRecordReader( fileIO, filePath, fileSize, orcPoolSize), bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), + bulkFormatMapping.getTrimmedKeyMapping()); Optional deletionVector = dvFactory.create(fileName); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 46977457c4be..6cc3006175b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -221,7 +221,8 @@ private FileRecordReader createFileReader( formatReaderContext, bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), + bulkFormatMapping.getTrimmedKeyMapping()); if (fileIndexResult instanceof BitmapIndexResult) { fileRecordReader = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 037622f95f1e..ecf509462179 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -26,6 +26,7 @@ import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -35,11 +36,15 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields; +import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START; /** Class with index mapping and bulk format. */ public class BulkFormatMapping { @@ -47,6 +52,7 @@ public class BulkFormatMapping { @Nullable private final int[] indexMapping; @Nullable private final CastFieldGetter[] castMapping; @Nullable private final Pair partitionPair; + @Nullable private final int[] trimmedKeyMapping; private final FormatReaderFactory bulkFormat; private final TableSchema dataSchema; private final List dataFilters; @@ -55,6 +61,7 @@ public BulkFormatMapping( @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, @Nullable Pair partitionPair, + @Nullable int[] trimmedKeyMapping, FormatReaderFactory bulkFormat, TableSchema dataSchema, List dataFilters) { @@ -62,6 +69,7 @@ public BulkFormatMapping( this.castMapping = castMapping; this.bulkFormat = bulkFormat; this.partitionPair = partitionPair; + this.trimmedKeyMapping = trimmedKeyMapping; this.dataSchema = dataSchema; this.dataFilters = dataFilters; } @@ -81,6 +89,11 @@ public Pair getPartitionPair() { return partitionPair; } + @Nullable + public int[] getTrimmedKeyMapping() { + return trimmedKeyMapping; + } + public FormatReaderFactory 
getReaderFactory() { return bulkFormat; } @@ -112,11 +125,27 @@ public BulkFormatMappingBuilder( this.filters = filters; } + /** + * There are three steps here to build BulkFormatMapping: + * + *

1. Calculate the readDataFields, which is what we intend to read from the data schema. + * Meanwhile, generate the indexCastMapping, which is used to map the index of the + * readDataFields to the index of the data schema. + * + *

2. We want to read far fewer fields than readDataFields, so we kick out the + * partition fields. We generate the partitionMappingAndFieldsWithoutPartitionPair, which + * helps reduce the real read fields and tells us how to map them back. + * + *

3. We still want read fewer fields, so we combine the _KEY_xxx fields to xxx fields. + * They are always the same, we just need to get once. We generate trimmedKeyPair to reduce + * the real read fields again, also it tells us how to map it back. + */ public BulkFormatMapping build( String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) { - List readDataFields = readDataFields(dataSchema); - + // extract the whole data fields in logic. + List allDataFields = fieldsExtractor.apply(dataSchema); + List readDataFields = readDataFields(allDataFields); // build index cast mapping IndexCastMapping indexCastMapping = SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields); @@ -128,9 +157,12 @@ public BulkFormatMapping build( Pair partitionMapping = partitionMappingAndFieldsWithoutPartitionPair.getLeft(); - // build read row type - RowType readDataRowType = - new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight()); + List fieldsWithoutPartition = + partitionMappingAndFieldsWithoutPartitionPair.getRight(); + + // map from key fields reading to value fields reading + Pair trimmedKeyPair = + trimKeyFields(fieldsWithoutPartition, allDataFields); // build read filters List readFilters = readFilters(filters, tableSchema, dataSchema); @@ -139,17 +171,51 @@ public BulkFormatMapping build( indexCastMapping.getIndexMapping(), indexCastMapping.getCastMapping(), partitionMapping, + trimmedKeyPair.getLeft(), formatDiscover .discover(formatIdentifier) - .createReaderFactory(readDataRowType, readFilters), + .createReaderFactory(trimmedKeyPair.getRight(), readFilters), dataSchema, readFilters); } - private List readDataFields(TableSchema dataSchema) { - List dataFields = fieldsExtractor.apply(dataSchema); + private Pair trimKeyFields( + List fieldsWithoutPartition, List fields) { + int[] map = new int[fieldsWithoutPartition.size()]; + List trimmedFields = new ArrayList<>(); + Map fieldMap = new HashMap<>(); + Map positionMap = new 
HashMap<>(); + + for (DataField field : fields) { + fieldMap.put(field.id(), field); + } + + AtomicInteger index = new AtomicInteger(); + for (int i = 0; i < fieldsWithoutPartition.size(); i++) { + DataField field = fieldsWithoutPartition.get(i); + boolean keyField = SpecialFields.isKeyField(field.id()); + int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id(); + // field in data schema + DataField f = fieldMap.get(id); + + if (f != null) { + if (positionMap.containsKey(id)) { + map[i] = positionMap.get(id); + } else { + trimmedFields.add(keyField ? f : field); + map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement()); + } + } else { + throw new RuntimeException("Can't find field with id: " + id + " in fields."); + } + } + + return Pair.of(map, new RowType(trimmedFields)); + } + + private List readDataFields(List allDataFields) { List readDataFields = new ArrayList<>(); - for (DataField dataField : dataFields) { + for (DataField dataField : allDataFields) { readTableFields.stream() .filter(f -> f.id() == dataField.id()) .findFirst()