[core] Trim key field in reading, map it to value field
leaves12138 committed Dec 6, 2024
1 parent 812ef05 commit 4d2ceed
Showing 7 changed files with 105 additions and 18 deletions.
@@ -87,15 +87,20 @@ public ColumnarRowIterator copy(ColumnVector[] vectors) {
}

public ColumnarRowIterator mapping(
@Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {
if (partitionInfo != null || indexMapping != null) {
@Nullable int[] trimmedKeyMapping,
@Nullable PartitionInfo partitionInfo,
@Nullable int[] indexMapping) {
if (trimmedKeyMapping != null || partitionInfo != null || indexMapping != null) {
VectorizedColumnBatch vectorizedColumnBatch = row.batch();
ColumnVector[] vectors = vectorizedColumnBatch.columns;
if (trimmedKeyMapping != null) {
vectors = VectorMappingUtils.createMappedVectors(trimmedKeyMapping, vectors);
}
if (partitionInfo != null) {
vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors);
}
if (indexMapping != null) {
vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors);
}
return copy(vectors);
}
@@ -72,6 +72,8 @@ public class SpecialFields {

public static final String KEY_FIELD_PREFIX = "_KEY_";
public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START;
// reserve 1000 for other system fields
public static final int KEY_FIELD_ID_END = Integer.MAX_VALUE - 1_000;

public static final DataField SEQUENCE_NUMBER =
new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull());
@@ -99,6 +101,10 @@ public static boolean isSystemField(String field) {
return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field);
}

public static boolean isKeyField(int id) {
return id >= KEY_FIELD_ID_START && id < KEY_FIELD_ID_END;
}

// ----------------------------------------------------------------------------------------
// Structured type fields
// ----------------------------------------------------------------------------------------
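For context, a key field _KEY_xxx carries the id of its value field xxx shifted up by KEY_FIELD_ID_START, and the new isKeyField check simply tests whether an id falls in that reserved range; the trimming added later in this commit subtracts the offset to get back to the value field. A minimal sketch of the id arithmetic, assuming SYSTEM_FIELD_ID_START is Integer.MAX_VALUE / 2 (the actual constant lives in SpecialFields; this snippet is illustrative and not part of the commit):

// Illustrative sketch of the key-field id range, not Paimon code.
public class KeyFieldIdSketch {
    // assumption: mirrors SpecialFields.SYSTEM_FIELD_ID_START
    static final int KEY_FIELD_ID_START = Integer.MAX_VALUE / 2;
    // the top 1000 ids stay reserved for other system fields such as _SEQUENCE_NUMBER
    static final int KEY_FIELD_ID_END = Integer.MAX_VALUE - 1_000;

    static boolean isKeyField(int id) {
        return id >= KEY_FIELD_ID_START && id < KEY_FIELD_ID_END;
    }

    public static void main(String[] args) {
        int valueFieldId = 3;                                // id of value field "x"
        int keyFieldId = KEY_FIELD_ID_START + valueFieldId;  // id of "_KEY_x"
        System.out.println(isKeyField(keyFieldId));          // true
        System.out.println(keyFieldId - KEY_FIELD_ID_START); // 3 -> back to the value field id
    }
}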
@@ -97,8 +97,7 @@ public static ColumnVector createFixedVector(
return dataType.accept(visitor);
}

public static ColumnVector[] createIndexMappedVectors(
int[] indexMapping, ColumnVector[] vectors) {
public static ColumnVector[] createMappedVectors(int[] indexMapping, ColumnVector[] vectors) {
ColumnVector[] newVectors = new ColumnVector[indexMapping.length];
for (int i = 0; i < indexMapping.length; i++) {
int realIndex = indexMapping[i];
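The renamed createMappedVectors is reused unchanged for the trimmed-key mapping: position i of the logical layout is served by physical column indexMapping[i], and the same physical column may back several logical positions, which is how one read of the key/value column can serve both the _KEY_xxx slot and the xxx slot. A small self-contained sketch of that contract, with String[] standing in for ColumnVector[] (illustrative only, not part of the commit):

import java.util.Arrays;

// Sketch of the createMappedVectors contract: newVectors[i] = vectors[indexMapping[i]].
public class MappedVectorsSketch {
    static String[] createMappedVectors(int[] indexMapping, String[] vectors) {
        String[] newVectors = new String[indexMapping.length];
        for (int i = 0; i < indexMapping.length; i++) {
            newVectors[i] = vectors[indexMapping[i]];
        }
        return newVectors;
    }

    public static void main(String[] args) {
        // Physically read columns after key trimming: only "k" and "v".
        String[] physical = {"k", "v"};
        // Simplified logical layout [_KEY_k, k, v]; column 0 is reused twice.
        int[] mapping = {0, 0, 1};
        System.out.println(Arrays.toString(createMappedVectors(mapping, physical))); // [k, k, v]
    }
}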
@@ -41,13 +41,15 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
@Nullable private final int[] trimmedKeyMapping;

public DataFileRecordReader(
FormatReaderFactory readerFactory,
FormatReaderFactory.Context context,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo)
@Nullable PartitionInfo partitionInfo,
@Nullable int[] trimmedKeyMapping)
throws IOException {
try {
this.reader = readerFactory.createReader(context);
@@ -58,6 +60,7 @@ public DataFileRecordReader(
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
this.trimmedKeyMapping = trimmedKeyMapping;
}

@Nullable
@@ -69,8 +72,14 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
}

if (iterator instanceof ColumnarRowIterator) {
iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping);
iterator =
((ColumnarRowIterator) iterator)
.mapping(trimmedKeyMapping, partitionInfo, indexMapping);
} else {
if (trimmedKeyMapping != null) {
final ProjectedRow projectedRow = ProjectedRow.from(trimmedKeyMapping);
iterator = iterator.transform(projectedRow::replaceRow);
}
if (partitionInfo != null) {
final PartitionSettedRow partitionSettedRow =
PartitionSettedRow.from(partitionInfo);
@@ -144,7 +144,8 @@ private FileRecordReader<KeyValue> createRecordReader(
fileIO, filePath, fileSize, orcPoolSize),
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
bulkFormatMapping.getTrimmedKeyMapping());

Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
@@ -221,7 +221,8 @@ private FileRecordReader<InternalRow> createFileReader(
formatReaderContext,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
bulkFormatMapping.getTrimmedKeyMapping());

if (fileIndexResult instanceof BitmapIndexResult) {
fileRecordReader =
@@ -26,6 +26,7 @@
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -35,18 +36,23 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;

/** Class with index mapping and bulk format. */
public class BulkFormatMapping {

@Nullable private final int[] indexMapping;
@Nullable private final CastFieldGetter[] castMapping;
@Nullable private final Pair<int[], RowType> partitionPair;
@Nullable private final int[] trimmedKeyMapping;
private final FormatReaderFactory bulkFormat;
private final TableSchema dataSchema;
private final List<Predicate> dataFilters;
@@ -55,13 +61,15 @@ public BulkFormatMapping(
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable Pair<int[], RowType> partitionPair,
@Nullable int[] trimmedKeyMapping,
FormatReaderFactory bulkFormat,
TableSchema dataSchema,
List<Predicate> dataFilters) {
this.indexMapping = indexMapping;
this.castMapping = castMapping;
this.bulkFormat = bulkFormat;
this.partitionPair = partitionPair;
this.trimmedKeyMapping = trimmedKeyMapping;
this.dataSchema = dataSchema;
this.dataFilters = dataFilters;
}
@@ -81,6 +89,11 @@ public Pair<int[], RowType> getPartitionPair() {
return partitionPair;
}

@Nullable
public int[] getTrimmedKeyMapping() {
return trimmedKeyMapping;
}

public FormatReaderFactory getReaderFactory() {
return bulkFormat;
}
@@ -112,11 +125,27 @@ public BulkFormatMappingBuilder(
this.filters = filters;
}

/**
* There are three steps to build the BulkFormatMapping:
*
* <p>1. Calculate the readDataFields, i.e. what we intend to read from the data schema.
* Meanwhile, generate the indexCastMapping, which is used to map indexes of the
* readDataFields to indexes of the data schema.
*
* <p>2. We want to read far fewer fields than readDataFields, so we kick out the partition
* fields. We generate partitionMappingAndFieldsWithoutPartitionPair, which reduces the
* fields actually read and tells us how to map them back.
*
* <p>3. To read still fewer fields, we merge the _KEY_xxx fields into the corresponding xxx
* value fields. Their values are always identical, so we only need to read them once. We
* generate trimmedKeyPair to reduce the fields actually read again; it also tells us how to
* map them back.
*/
public BulkFormatMapping build(
String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) {

List<DataField> readDataFields = readDataFields(dataSchema);

// extract the complete list of logical data fields of the data schema.
List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
List<DataField> readDataFields = readDataFields(allDataFields);
// build index cast mapping
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);
@@ -128,9 +157,12 @@ public BulkFormatMapping build(
Pair<int[], RowType> partitionMapping =
partitionMappingAndFieldsWithoutPartitionPair.getLeft();

// build read row type
RowType readDataRowType =
new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight());
List<DataField> fieldsWithoutPartition =
partitionMappingAndFieldsWithoutPartitionPair.getRight();

// map reads of key fields onto reads of the corresponding value fields
Pair<int[], RowType> trimmedKeyPair =
trimKeyFields(fieldsWithoutPartition, allDataFields);

// build read filters
List<Predicate> readFilters = readFilters(filters, tableSchema, dataSchema);
@@ -139,17 +171,51 @@
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionMapping,
trimmedKeyPair.getLeft(),
formatDiscover
.discover(formatIdentifier)
.createReaderFactory(readDataRowType, readFilters),
.createReaderFactory(trimmedKeyPair.getRight(), readFilters),
dataSchema,
readFilters);
}

private List<DataField> readDataFields(TableSchema dataSchema) {
List<DataField> dataFields = fieldsExtractor.apply(dataSchema);
private Pair<int[], RowType> trimKeyFields(
List<DataField> fieldsWithoutPartition, List<DataField> fields) {
int[] map = new int[fieldsWithoutPartition.size()];
List<DataField> trimmedFields = new ArrayList<>();
Map<Integer, DataField> fieldMap = new HashMap<>();
Map<Integer, Integer> positionMap = new HashMap<>();

for (DataField field : fields) {
fieldMap.put(field.id(), field);
}

AtomicInteger index = new AtomicInteger();
for (int i = 0; i < fieldsWithoutPartition.size(); i++) {
DataField field = fieldsWithoutPartition.get(i);
boolean keyField = SpecialFields.isKeyField(field.id());
int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id();
// field in data schema
DataField f = fieldMap.get(id);

if (f != null) {
if (positionMap.containsKey(id)) {
map[i] = positionMap.get(id);
} else {
trimmedFields.add(keyField ? f : field);
map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement());
}
} else {
throw new RuntimeException("Can't find field with id: " + id + " in fields.");
}
}

return Pair.of(map, new RowType(trimmedFields));
}

private List<DataField> readDataFields(List<DataField> allDataFields) {
List<DataField> readDataFields = new ArrayList<>();
for (DataField dataField : dataFields) {
for (DataField dataField : allDataFields) {
readTableFields.stream()
.filter(f -> f.id() == dataField.id())
.findFirst()
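To make the new mapping concrete, below is a simplified, self-contained sketch of the bookkeeping trimKeyFields performs, working on bare field ids (the ids, names, and KEY_OFFSET constant are invented for illustration; the real method works on DataField objects, uses KEY_FIELD_ID_START, and also keeps system columns such as _SEQUENCE_NUMBER in the trimmed schema):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of trimKeyFields: deduplicate key/value fields that share an id.
public class TrimKeyFieldsSketch {
    static final int KEY_OFFSET = 1_000_000; // stand-in for KEY_FIELD_ID_START

    // Returns mapping[i] = position of logical field i in the trimmed (physically read) list.
    static int[] trim(int[] logicalIds, List<Integer> trimmedIds) {
        Map<Integer, Integer> positionOf = new HashMap<>();
        int[] mapping = new int[logicalIds.length];
        for (int i = 0; i < logicalIds.length; i++) {
            int id = logicalIds[i] >= KEY_OFFSET ? logicalIds[i] - KEY_OFFSET : logicalIds[i];
            Integer pos = positionOf.get(id);
            if (pos == null) {
                pos = trimmedIds.size();
                trimmedIds.add(id);
                positionOf.put(id, pos);
            }
            mapping[i] = pos;
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Logical read schema: [_KEY_k (id 0 + offset), k (id 0), v (id 1)].
        int[] logicalIds = {KEY_OFFSET, 0, 1};
        List<Integer> trimmed = new ArrayList<>();
        int[] mapping = trim(logicalIds, trimmed);
        System.out.println(trimmed);                  // [0, 1]  -> only k and v are read
        System.out.println(Arrays.toString(mapping)); // [0, 0, 1] -> _KEY_k is served by k
    }
}

The resulting mapping array is what DataFileRecordReader hands to ColumnarRowIterator.mapping in the columnar path, or to ProjectedRow.from in the row path, to rebuild the logical layout from the trimmed read.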
