diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 6623b20b8e76..b03ac056f161 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -40,7 +40,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields; @@ -148,10 +147,16 @@ public BulkFormatMappingBuilder( * *

1. Calculate the readDataFields, which is what we intend to read from the data schema. * Meanwhile, generate the indexCastMapping, which is used to map the index of the - * readDataFields to the index of the data schema. Also, this mapping combined - * trimmedKeyMapping(whith maps the _KEY_xxx fields to xxx fields.) + * readDataFields to the index of the data schema. * - *

2. We want read much fewer fields than readDataFields, so we kick out the partition + *

2. Calculate the mapping to trim _KEY_ fields. For example: we want _KEY_a, _KEY_b, + * _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g from the data, but actually we don't need + * to read _KEY_a and a, _KEY_b and b at the same time, so we need to trim them. So we map + * them: read before: _KEY_a, _KEY_b, _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g; read + * after: a, b, _FIELD_SEQUENCE, _ROW_KIND, c, d, e, f, g; and the mapping is + * [0,1,2,3,0,1,4,5,6,7,8]; it converts the [read after] columns to [read before] columns. + * + *

3. We want to read far fewer fields than readDataFields, so we kick out the partition * fields. We generate the partitionMappingAndFieldsWithoutPartitionPair which helps reduce * the real read fields and tell us how to map it back. */ @@ -205,7 +210,6 @@ static Pair trimKeyFields( fieldMap.put(field.id(), field); } - AtomicInteger index = new AtomicInteger(); for (int i = 0; i < fieldsWithoutPartition.size(); i++) { DataField field = fieldsWithoutPartition.get(i); boolean keyField = SpecialFields.isKeyField(field.name()); @@ -217,8 +221,12 @@ static Pair trimKeyFields( if (positionMap.containsKey(id)) { map[i] = positionMap.get(id); } else { + map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size()); + // If the target field is not a key field, we keep it as it is, because it + // may be projected. Ex: the target field is a row type, but we only read a + // few fields in it. If we simply called + // trimmedFields.add(f), we would read more fields than we need. trimmedFields.add(keyField ? f : field); - map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement()); } } else { throw new RuntimeException("Can't find field with id: " + id + " in fields.");