[core] Trim key field in reading, map it to value field #4651

Merged · 17 commits · Dec 10, 2024
@@ -87,15 +87,20 @@ public ColumnarRowIterator copy(ColumnVector[] vectors) {
}

public ColumnarRowIterator mapping(
@Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {
if (partitionInfo != null || indexMapping != null) {
@Nullable int[] trimmedKeyMapping,
@Nullable PartitionInfo partitionInfo,
@Nullable int[] indexMapping) {
if (trimmedKeyMapping != null || partitionInfo != null || indexMapping != null) {
VectorizedColumnBatch vectorizedColumnBatch = row.batch();
ColumnVector[] vectors = vectorizedColumnBatch.columns;
if (trimmedKeyMapping != null) {
vectors = VectorMappingUtils.createMappedVectors(trimmedKeyMapping, vectors);
}
if (partitionInfo != null) {
vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors);
}
if (indexMapping != null) {
vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors);
}
return copy(vectors);
}
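Note on the mapping chain above: all three steps are pure index projections over the batch's column vectors. createMappedVectors fills slot i of the output with vectors[indexMapping[i]], so the same ColumnVector instance may appear several times; that aliasing is exactly what lets one physically read column back both a _KEY_ field and its value field. A minimal sketch of the semantics, assuming v0 and v1 are any two populated ColumnVector instances:

ColumnVector[] vectors = new ColumnVector[] {v0, v1};
int[] trimmedKeyMapping = new int[] {0, 0, 1};
// mapped == [v0, v0, v1]: trimmed column 0 now serves two logical fields
ColumnVector[] mapped = VectorMappingUtils.createMappedVectors(trimmedKeyMapping, vectors);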
@@ -99,6 +99,10 @@ public static boolean isSystemField(String field) {
return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field);
}

public static boolean isKeyField(String field) {
return field.startsWith(KEY_FIELD_PREFIX);
}

// ----------------------------------------------------------------------------------------
// Structured type fields
// ----------------------------------------------------------------------------------------
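Key columns are name-mangled with KEY_FIELD_PREFIX ("_KEY_", as the _KEY_xxx naming elsewhere in this PR shows), so the new predicate is a plain prefix check. A small illustration with a hypothetical column orderId:

SpecialFields.isKeyField("_KEY_orderId");    // true
SpecialFields.isKeyField("orderId");         // false
SpecialFields.isSystemField("_KEY_orderId"); // also true: every key field is a system field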
@@ -97,8 +97,7 @@ public static ColumnVector createFixedVector(
return dataType.accept(visitor);
}

public static ColumnVector[] createIndexMappedVectors(
int[] indexMapping, ColumnVector[] vectors) {
public static ColumnVector[] createMappedVectors(int[] indexMapping, ColumnVector[] vectors) {
ColumnVector[] newVectors = new ColumnVector[indexMapping.length];
for (int i = 0; i < indexMapping.length; i++) {
int realIndex = indexMapping[i];
@@ -82,7 +82,7 @@ public void testCreateIndexMappedVectors() {
int[] mapping = new int[] {0, 2, 1, 3, 2, 3, 1, 0, 4};

ColumnVector[] newColumnVectors =
VectorMappingUtils.createIndexMappedVectors(mapping, columnVectors);
VectorMappingUtils.createMappedVectors(mapping, columnVectors);

for (int i = 0; i < mapping.length; i++) {
Assertions.assertThat(newColumnVectors[i]).isEqualTo(columnVectors[mapping[i]]);
@@ -41,13 +41,15 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
@Nullable private final int[] trimmedKeyMapping;

public DataFileRecordReader(
FormatReaderFactory readerFactory,
FormatReaderFactory.Context context,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo)
@Nullable PartitionInfo partitionInfo,
@Nullable int[] trimmedKeyMapping)
throws IOException {
try {
this.reader = readerFactory.createReader(context);
@@ -58,6 +60,7 @@ public DataFileRecordReader(
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
this.trimmedKeyMapping = trimmedKeyMapping;
}

@Nullable
@@ -69,8 +72,14 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
}

if (iterator instanceof ColumnarRowIterator) {
iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping);
iterator =
((ColumnarRowIterator) iterator)
.mapping(trimmedKeyMapping, partitionInfo, indexMapping);
} else {
if (trimmedKeyMapping != null) {
final ProjectedRow projectedRow = ProjectedRow.from(trimmedKeyMapping);
iterator = iterator.transform(projectedRow::replaceRow);
}
if (partitionInfo != null) {
final PartitionSettedRow partitionSettedRow =
PartitionSettedRow.from(partitionInfo);
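For readers that do not produce columnar batches, the same trimming happens row by row: ProjectedRow.from(mapping) builds a reusable view whose field accesses are re-routed through the mapping, so duplicated indices read the same underlying field. A sketch, assuming sourceRow is any InternalRow with at least two fields:

// View fields 0 and 1 both read underlying field 0; view field 2 reads field 1.
ProjectedRow projected = ProjectedRow.from(new int[] {0, 0, 1});
InternalRow mapped = projected.replaceRow(sourceRow); // the wrapper is reused per record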
@@ -144,7 +144,8 @@ private FileRecordReader<KeyValue> createRecordReader(
fileIO, filePath, fileSize, orcPoolSize),
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
bulkFormatMapping.getTrimmedKeyMapping());

Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
@@ -221,7 +221,8 @@ private FileRecordReader<InternalRow> createFileReader(
formatReaderContext,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
bulkFormatMapping.getTrimmedKeyMapping());

if (fileIndexResult instanceof BitmapIndexResult) {
fileRecordReader =
@@ -26,6 +26,7 @@
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -35,18 +36,28 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;

/** Class with index mapping and bulk format. */
public class BulkFormatMapping {

// Index mapping from data schema fields to table schema fields; used to realize Paimon
// schema evolution.
@Nullable private final int[] indexMapping;
// Helps indexMapping cast between different data types.
@Nullable private final CastFieldGetter[] castMapping;
// Partition fields mapping: adds the partition fields back to the read fields.
@Nullable private final Pair<int[], RowType> partitionPair;
// Key fields mapping: maps the trimmed _KEY_ fields back onto the value fields they share.
@Nullable private final int[] trimmedKeyMapping;
private final FormatReaderFactory bulkFormat;
private final TableSchema dataSchema;
private final List<Predicate> dataFilters;
@@ -55,13 +66,15 @@ public BulkFormatMapping(
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable Pair<int[], RowType> partitionPair,
@Nullable int[] trimmedKeyMapping,
FormatReaderFactory bulkFormat,
TableSchema dataSchema,
List<Predicate> dataFilters) {
this.indexMapping = indexMapping;
this.castMapping = castMapping;
this.bulkFormat = bulkFormat;
this.partitionPair = partitionPair;
this.trimmedKeyMapping = trimmedKeyMapping;
this.dataSchema = dataSchema;
this.dataFilters = dataFilters;
}
@@ -81,6 +94,11 @@ public Pair<int[], RowType> getPartitionPair() {
return partitionPair;
}

@Nullable
public int[] getTrimmedKeyMapping() {
return trimmedKeyMapping;
}

public FormatReaderFactory getReaderFactory() {
return bulkFormat;
}
@@ -112,11 +130,27 @@ public BulkFormatMappingBuilder(
this.filters = filters;
}

/**
* There are three steps here to build the BulkFormatMapping:
*
* <p>1. Calculate the readDataFields, which is what we intend to read from the data schema.
* Meanwhile, generate the indexCastMapping, which maps the index of the readDataFields to
* the index of the data schema.
*
* <p>2. We want to read far fewer fields than readDataFields, so we kick out the partition
* fields. We generate partitionMappingAndFieldsWithoutPartitionPair, which reduces the
* fields actually read and tells us how to map them back.
*
* <p>3. We still want to read fewer fields, so we merge the _KEY_xxx fields into the xxx
* fields. Their values are always identical, so we only need to read them once. We generate
* trimmedKeyPair to reduce the fields actually read once more; it also tells us how to map
* them back.
*/
public BulkFormatMapping build(
String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) {

List<DataField> readDataFields = readDataFields(dataSchema);

// extract all data fields of the data schema
List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
List<DataField> readDataFields = readDataFields(allDataFields);
// build index cast mapping
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);
@@ -128,9 +162,12 @@ public BulkFormatMapping build(
Pair<int[], RowType> partitionMapping =
partitionMappingAndFieldsWithoutPartitionPair.getLeft();

// build read row type
RowType readDataRowType =
new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight());
List<DataField> fieldsWithoutPartition =
partitionMappingAndFieldsWithoutPartitionPair.getRight();

// map key field reads onto the corresponding value field reads
Pair<int[], RowType> trimmedKeyPair =
trimKeyFields(fieldsWithoutPartition, allDataFields);

// build read filters
List<Predicate> readFilters = readFilters(filters, tableSchema, dataSchema);
@@ -139,17 +176,51 @@ public BulkFormatMapping build(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionMapping,
trimmedKeyPair.getLeft(),
formatDiscover
.discover(formatIdentifier)
.createReaderFactory(readDataRowType, readFilters),
.createReaderFactory(trimmedKeyPair.getRight(), readFilters),
dataSchema,
readFilters);
}

private List<DataField> readDataFields(TableSchema dataSchema) {
List<DataField> dataFields = fieldsExtractor.apply(dataSchema);
static Pair<int[], RowType> trimKeyFields(
List<DataField> fieldsWithoutPartition, List<DataField> fields) {
int[] map = new int[fieldsWithoutPartition.size()];
List<DataField> trimmedFields = new ArrayList<>();
Map<Integer, DataField> fieldMap = new HashMap<>();
Map<Integer, Integer> positionMap = new HashMap<>();

for (DataField field : fields) {
fieldMap.put(field.id(), field);
}

AtomicInteger index = new AtomicInteger();
for (int i = 0; i < fieldsWithoutPartition.size(); i++) {
DataField field = fieldsWithoutPartition.get(i);
boolean keyField = SpecialFields.isKeyField(field.name());
int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id();
// field in data schema
DataField f = fieldMap.get(id);

if (f != null) {
if (positionMap.containsKey(id)) {
map[i] = positionMap.get(id);
} else {
trimmedFields.add(keyField ? f : field);
map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement());
}
} else {
throw new RuntimeException("Can't find field with id: " + id + " in fields.");
}
}

return Pair.of(map, new RowType(trimmedFields));
}

private List<DataField> readDataFields(List<DataField> allDataFields) {
List<DataField> readDataFields = new ArrayList<>();
for (DataField dataField : dataFields) {
for (DataField dataField : allDataFields) {
readTableFields.stream()
.filter(f -> f.id() == dataField.id())
.findFirst()
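To make step 3 of the builder javadoc concrete, here is a worked example of trimKeyFields with hypothetical field names and types. With a read schema of [_KEY_a, a, b] and a data schema containing both the mangled key field and its value field, the key column collapses onto the value column:

List<DataField> all = new ArrayList<>();
all.add(new DataField(SpecialFields.KEY_FIELD_ID_START, "_KEY_a", DataTypes.INT()));
all.add(new DataField(0, "a", DataTypes.INT()));
all.add(new DataField(1, "b", DataTypes.STRING()));

// Read every field, including the mangled key, and trim against the same data schema.
Pair<int[], RowType> pair =
        BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(all, all);
// pair.getLeft()  -> [0, 0, 1]: _KEY_a and a both map to trimmed column 0
// pair.getRight() -> RowType(a, b): only the value fields are physically read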
@@ -452,7 +452,7 @@ public void testIdentifierAfterFullCompaction() throws Exception {
containSameIdentifyEntryFile(fullCompacted, entryIdentifierExpected);
}

@RepeatedTest(1000)
@RepeatedTest(10)
public void testRandomFullCompaction() throws Exception {
List<ManifestFileMeta> input = new ArrayList<>();
Set<FileEntry.Identifier> manifestEntrySet = new HashSet<>();
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

/** Test for {@link BulkFormatMapping.BulkFormatMappingBuilder}. */
public class BulkFormatMappingTest {

@Test
public void testTrimKeyFields() {

List<DataField> keyFields = new ArrayList<>();
List<DataField> allFields = new ArrayList<>();
List<DataField> testFields = new ArrayList<>();

for (int i = 0; i < 10; i++) {
keyFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + i,
SpecialFields.KEY_FIELD_PREFIX + i,
DataTypes.STRING()));
}

allFields.addAll(keyFields);
for (int i = 0; i < 20; i++) {
allFields.add(new DataField(i, String.valueOf(i), DataTypes.STRING()));
}

testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 1,
SpecialFields.KEY_FIELD_PREFIX + 1,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 3,
SpecialFields.KEY_FIELD_PREFIX + 3,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 5,
SpecialFields.KEY_FIELD_PREFIX + 5,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 7,
SpecialFields.KEY_FIELD_PREFIX + 7,
DataTypes.STRING()));
testFields.add(new DataField(3, String.valueOf(3), DataTypes.STRING()));
testFields.add(new DataField(4, String.valueOf(4), DataTypes.STRING()));
testFields.add(new DataField(5, String.valueOf(5), DataTypes.STRING()));
testFields.add(new DataField(1, String.valueOf(1), DataTypes.STRING()));
testFields.add(new DataField(6, String.valueOf(6), DataTypes.STRING()));

Pair<int[], RowType> res =
BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(testFields, allFields);

Assertions.assertThat(res.getKey()).containsExactly(0, 1, 2, 3, 1, 4, 2, 0, 5);

List<DataField> fields = res.getRight().getFields();
Assertions.assertThat(fields.size()).isEqualTo(6);
Assertions.assertThat(fields.get(0).id()).isEqualTo(1);
Assertions.assertThat(fields.get(1).id()).isEqualTo(3);
Assertions.assertThat(fields.get(2).id()).isEqualTo(5);
Assertions.assertThat(fields.get(3).id()).isEqualTo(7);
Assertions.assertThat(fields.get(4).id()).isEqualTo(4);
Assertions.assertThat(fields.get(5).id()).isEqualTo(6);
}
}