[core] Trim key field in reading, map it to value field #4651

Merged · 17 commits · Dec 10, 2024
@@ -95,7 +95,7 @@ public ColumnarRowIterator mapping(
vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors);
}
if (indexMapping != null) {
- vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
+ vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors);
}
return copy(vectors);
}
@@ -99,6 +99,10 @@ public static boolean isSystemField(String field) {
return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field);
}

public static boolean isKeyField(String field) {
return field.startsWith(KEY_FIELD_PREFIX);
}

// ----------------------------------------------------------------------------------------
// Structured type fields
// ----------------------------------------------------------------------------------------
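For context, a key field mirrors its value field by name prefix and id offset, which is what the new isKeyField helper and the trimming logic below rely on. A minimal sketch, constructed the same way as the tests in this PR (the concrete constant values are not assumed, only the prefix/offset relationship):

DataField value = new DataField(1, "a", DataTypes.STRING());
DataField key =
        new DataField(
                SpecialFields.KEY_FIELD_ID_START + value.id(),
                SpecialFields.KEY_FIELD_PREFIX + value.name(),
                value.type());
// isKeyField recognizes the prefix; subtracting KEY_FIELD_ID_START recovers
// the id of the matching value field.
boolean isKey = SpecialFields.isKeyField(key.name());                        // true
boolean matches = key.id() - SpecialFields.KEY_FIELD_ID_START == value.id(); // true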
@@ -97,8 +97,7 @@ public static ColumnVector createFixedVector(
return dataType.accept(visitor);
}

- public static ColumnVector[] createIndexMappedVectors(
-         int[] indexMapping, ColumnVector[] vectors) {
+ public static ColumnVector[] createMappedVectors(int[] indexMapping, ColumnVector[] vectors) {
ColumnVector[] newVectors = new ColumnVector[indexMapping.length];
for (int i = 0; i < indexMapping.length; i++) {
int realIndex = indexMapping[i];
@@ -82,7 +82,7 @@ public void testCreateIndexMappedVectors() {
int[] mapping = new int[] {0, 2, 1, 3, 2, 3, 1, 0, 4};

ColumnVector[] newColumnVectors =
-                 VectorMappingUtils.createIndexMappedVectors(mapping, columnVectors);
+                 VectorMappingUtils.createMappedVectors(mapping, columnVectors);

for (int i = 0; i < mapping.length; i++) {
Assertions.assertThat(newColumnVectors[i]).isEqualTo(columnVectors[mapping[i]]);
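Note the repeated indices in the mapping above: createMappedVectors deliberately allows one source vector to back several output positions, which is exactly what key-field trimming exploits. The trace for the test values:

// mapping          = {0, 2, 1, 3, 2, 3, 1, 0, 4}
// newColumnVectors = {v0, v2, v1, v3, v2, v3, v1, v0, v4}  (shared references, not copies)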
@@ -76,6 +76,7 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
PartitionSettedRow.from(partitionInfo);
iterator = iterator.transform(partitionSettedRow::replaceRow);
}

if (indexMapping != null) {
final ProjectedRow projectedRow = ProjectedRow.from(indexMapping);
iterator = iterator.transform(projectedRow::replaceRow);
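The row-oriented path above applies per row the same combined index mapping that the columnar path applies per vector. A minimal sketch, assuming only the ProjectedRow.from/replaceRow API visible in this diff (sourceRow stands for any InternalRow):

int[] indexMapping = {0, 1, 0, 2}; // position 2 re-reads field 0: a trimmed _KEY_ column
ProjectedRow projectedRow = ProjectedRow.from(indexMapping);
InternalRow mapped = projectedRow.replaceRow(sourceRow);
// mapped positions 0 and 2 now read the same underlying field of sourceRow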
@@ -26,6 +26,7 @@
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -35,17 +36,25 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;

/** Class with index mapping and bulk format. */
public class BulkFormatMapping {

// Index mapping from data schema fields to table schema fields; this is used to realize
// Paimon schema evolution. It is combined with trimmedKeyMapping, which maps key fields to
// their corresponding value fields.
@Nullable private final int[] indexMapping;
// helps indexMapping cast between different data types
@Nullable private final CastFieldGetter[] castMapping;
// partition fields mapping; adds partition fields back into the read fields
@Nullable private final Pair<int[], RowType> partitionPair;
private final FormatReaderFactory bulkFormat;
private final TableSchema dataSchema;
@@ -54,18 +63,39 @@ public class BulkFormatMapping {
public BulkFormatMapping(
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable int[] trimmedKeyMapping,
@Nullable Pair<int[], RowType> partitionPair,
FormatReaderFactory bulkFormat,
TableSchema dataSchema,
List<Predicate> dataFilters) {
-         this.indexMapping = indexMapping;
+         this.indexMapping = combine(indexMapping, trimmedKeyMapping);
this.castMapping = castMapping;
this.bulkFormat = bulkFormat;
this.partitionPair = partitionPair;
this.dataSchema = dataSchema;
this.dataFilters = dataFilters;
}

private int[] combine(@Nullable int[] indexMapping, @Nullable int[] trimmedKeyMapping) {
if (indexMapping == null) {
return trimmedKeyMapping;
}
if (trimmedKeyMapping == null) {
return indexMapping;
}

int[] combined = new int[indexMapping.length];

for (int i = 0; i < indexMapping.length; i++) {
if (indexMapping[i] < 0) {
combined[i] = indexMapping[i];
} else {
combined[i] = trimmedKeyMapping[indexMapping[i]];
}
}
return combined;
}
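    // Worked example (illustrative values): with indexMapping = [-1, 0, 2],
    // where -1 marks a field added by schema evolution, and trimmedKeyMapping
    // = [1, 0, 2], combine returns [-1, 1, 2]: negative entries pass through
    // unchanged, all other entries are rewritten through trimmedKeyMapping.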

@Nullable
public int[] getIndexMapping() {
return indexMapping;
@@ -112,24 +142,46 @@ public BulkFormatMappingBuilder(
this.filters = filters;
}

/**
 * There are three steps here to build the BulkFormatMapping:
 *
 * <p>1. Calculate the readDataFields, which is what we intend to read from the data schema.
 * Meanwhile, generate the indexCastMapping, which is used to map the index of the
 * readDataFields to the index of the data schema.
 *
 * <p>2. Calculate the mapping to trim _KEY_ fields. For example: we want _KEY_a, _KEY_b,
 * _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g from the data, but we don't actually need
 * to read both _KEY_a and a (or _KEY_b and b) at the same time, so we trim the key fields.
 * Read before: _KEY_a, _KEY_b, _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g.
 * Read after: a, b, _FIELD_SEQUENCE, _ROW_KIND, c, d, e, f, g.
 * The mapping is [0,1,2,3,0,1,4,5,6,7,8]; it converts the [read after] columns back to the
 * [read before] columns.
 *
 * <p>3. We want to read far fewer fields than readDataFields, so we kick out the partition
 * fields. We generate partitionMappingAndFieldsWithoutPartitionPair, which helps reduce the
 * fields actually read and tells us how to map them back.
 */
public BulkFormatMapping build(
String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) {

- List<DataField> readDataFields = readDataFields(dataSchema);

+ // extract all data fields of the data schema
+ List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
+ List<DataField> readDataFields = readDataFields(allDataFields);
// build index cast mapping
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);

// map from key fields reading to value fields reading
Pair<int[], RowType> trimmedKeyPair = trimKeyFields(readDataFields, allDataFields);

// build partition mapping and filter partition fields
Pair<Pair<int[], RowType>, List<DataField>>
partitionMappingAndFieldsWithoutPartitionPair =
- PartitionUtils.constructPartitionMapping(dataSchema, readDataFields);
+ PartitionUtils.constructPartitionMapping(
+         dataSchema, trimmedKeyPair.getRight().getFields());
Pair<int[], RowType> partitionMapping =
partitionMappingAndFieldsWithoutPartitionPair.getLeft();

// build read row type
- RowType readDataRowType =
+ RowType readRowType =
new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight());

// build read filters
@@ -138,18 +190,55 @@ public BulkFormatMapping build(
return new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
trimmedKeyPair.getLeft(),
partitionMapping,
formatDiscover
.discover(formatIdentifier)
- .createReaderFactory(readDataRowType, readFilters),
+ .createReaderFactory(readRowType, readFilters),
dataSchema,
readFilters);
}

- private List<DataField> readDataFields(TableSchema dataSchema) {
-     List<DataField> dataFields = fieldsExtractor.apply(dataSchema);
static Pair<int[], RowType> trimKeyFields(
List<DataField> fieldsWithoutPartition, List<DataField> fields) {
int[] map = new int[fieldsWithoutPartition.size()];
List<DataField> trimmedFields = new ArrayList<>();
Map<Integer, DataField> fieldMap = new HashMap<>();
Map<Integer, Integer> positionMap = new HashMap<>();

for (DataField field : fields) {
fieldMap.put(field.id(), field);
}

for (int i = 0; i < fieldsWithoutPartition.size(); i++) {
DataField field = fieldsWithoutPartition.get(i);
boolean keyField = SpecialFields.isKeyField(field.name());
int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id();
// field in data schema
DataField f = fieldMap.get(id);

if (f != null) {
if (positionMap.containsKey(id)) {
map[i] = positionMap.get(id);
} else {
map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size());
// If the target field is not a key field, we keep it as it is, because it
// may be projected. Example: the target field is a row type, but we only
// read a few nested fields in it. If we simply did trimmedFields.add(f),
// we would read more fields than we need.
trimmedFields.add(keyField ? f : field);
}
} else {
throw new RuntimeException("Can't find field with id: " + id + " in fields.");
}
}

return Pair.of(map, new RowType(trimmedFields));
}
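    // Invariant of trimKeyFields (a sketch; map/trimmedFields are its outputs):
    // for every read position i, the field requested at i, with key fields
    // resolved to their value-field id, is served by trimmedFields.get(map[i]).
    // The javadoc example above yields map = [0,1,2,3,0,1,4,5,6,7,8], so
    // _KEY_a (i = 0) and a (i = 4) both resolve to trimmed position 0.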

private List<DataField> readDataFields(List<DataField> allDataFields) {
List<DataField> readDataFields = new ArrayList<>();
- for (DataField dataField : dataFields) {
+ for (DataField dataField : allDataFields) {
readTableFields.stream()
.filter(f -> f.id() == dataField.id())
.findFirst()
@@ -452,7 +452,7 @@ public void testIdentifierAfterFullCompaction() throws Exception {
containSameIdentifyEntryFile(fullCompacted, entryIdentifierExpected);
}

- @RepeatedTest(1000)
+ @RepeatedTest(10)
public void testRandomFullCompaction() throws Exception {
List<ManifestFileMeta> input = new ArrayList<>();
Set<FileEntry.Identifier> manifestEntrySet = new HashSet<>();
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

/** Test for {@link BulkFormatMapping.BulkFormatMappingBuilder}. */
public class BulkFormatMappingTest {

@Test
public void testTrimKeyFields() {
List<DataField> keyFields = new ArrayList<>();
List<DataField> allFields = new ArrayList<>();
List<DataField> testFields = new ArrayList<>();

for (int i = 0; i < 10; i++) {
keyFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + i,
SpecialFields.KEY_FIELD_PREFIX + i,
DataTypes.STRING()));
}

allFields.addAll(keyFields);
for (int i = 0; i < 20; i++) {
allFields.add(new DataField(i, String.valueOf(i), DataTypes.STRING()));
}

testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 1,
SpecialFields.KEY_FIELD_PREFIX + 1,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 3,
SpecialFields.KEY_FIELD_PREFIX + 3,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 5,
SpecialFields.KEY_FIELD_PREFIX + 5,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 7,
SpecialFields.KEY_FIELD_PREFIX + 7,
DataTypes.STRING()));
testFields.add(new DataField(3, String.valueOf(3), DataTypes.STRING()));
testFields.add(new DataField(4, String.valueOf(4), DataTypes.STRING()));
testFields.add(new DataField(5, String.valueOf(5), DataTypes.STRING()));
testFields.add(new DataField(1, String.valueOf(1), DataTypes.STRING()));
testFields.add(new DataField(6, String.valueOf(6), DataTypes.STRING()));

Pair<int[], RowType> res =
BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(testFields, allFields);

Assertions.assertThat(res.getKey()).containsExactly(0, 1, 2, 3, 1, 4, 2, 0, 5);

List<DataField> fields = res.getRight().getFields();
Assertions.assertThat(fields.size()).isEqualTo(6);
Assertions.assertThat(fields.get(0).id()).isEqualTo(1);
Assertions.assertThat(fields.get(1).id()).isEqualTo(3);
Assertions.assertThat(fields.get(2).id()).isEqualTo(5);
Assertions.assertThat(fields.get(3).id()).isEqualTo(7);
Assertions.assertThat(fields.get(4).id()).isEqualTo(4);
Assertions.assertThat(fields.get(5).id()).isEqualTo(6);
}

@Test
public void testTrimKeyWithIndexMapping() {
List<DataField> readTableFields = new ArrayList<>();
List<DataField> readDataFields = new ArrayList<>();

readTableFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 1,
SpecialFields.KEY_FIELD_PREFIX + "a",
DataTypes.STRING()));
readTableFields.add(new DataField(0, "0", DataTypes.STRING()));
readTableFields.add(new DataField(1, "a", DataTypes.STRING()));
readTableFields.add(new DataField(2, "2", DataTypes.STRING()));
readTableFields.add(new DataField(3, "3", DataTypes.STRING()));

readDataFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 1,
SpecialFields.KEY_FIELD_PREFIX + "a",
DataTypes.STRING()));
readDataFields.add(new DataField(0, "0", DataTypes.STRING()));
readDataFields.add(new DataField(1, "a", DataTypes.STRING()));
readDataFields.add(new DataField(3, "3", DataTypes.STRING()));

// build index cast mapping
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);

// map from key fields reading to value fields reading
Pair<int[], RowType> trimmedKeyPair =
BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(
readDataFields, readDataFields);

BulkFormatMapping bulkFormatMapping =
new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
trimmedKeyPair.getLeft(),
null,
null,
null,
null);

Assertions.assertThat(bulkFormatMapping.getIndexMapping()).containsExactly(0, 1, 0, -1, 2);
List<DataField> trimmed = trimmedKeyPair.getRight().getFields();
Assertions.assertThat(trimmed.get(0).id()).isEqualTo(1);
Assertions.assertThat(trimmed.get(1).id()).isEqualTo(0);
Assertions.assertThat(trimmed.get(2).id()).isEqualTo(3);
Assertions.assertThat(trimmed.size()).isEqualTo(3);
}
}
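For readers tracing the final assertions, the two mappings in testTrimKeyWithIndexMapping compose as follows (derived from the code above):

// schema evolution (table fields -> data positions): [0, 1, 2, -1, 3]  (field id 2 absent)
// key trimming (data positions -> trimmed vectors):  [0, 1, 0, 2]
// combined (table fields -> trimmed vectors):        [0, 1, 0, -1, 2]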