[core] Trim key field in reading, map it to value field (#4651)
This closes #4651.

---------

Co-authored-by: tsreaper <[email protected]>
leaves12138 and tsreaper authored Dec 10, 2024
1 parent b6fb471 commit 3691419
Showing 9 changed files with 257 additions and 17 deletions.
@@ -95,7 +95,7 @@ public ColumnarRowIterator mapping(
vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors);
}
if (indexMapping != null) {
-vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
+vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors);
}
return copy(vectors);
}
@@ -99,6 +99,10 @@ public static boolean isSystemField(String field) {
return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field);
}

+public static boolean isKeyField(String field) {
+    return field.startsWith(KEY_FIELD_PREFIX);
+}

// ----------------------------------------------------------------------------------------
// Structured type fields
// ----------------------------------------------------------------------------------------
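
The new isKeyField helper pairs with an id convention the rest of this commit relies on: a key column such as _KEY_g carries the id of its value field plus KEY_FIELD_ID_START. A minimal sketch (not part of the commit) of mapping a key field back to its value field; the DataField instance here is purely illustrative:

    import org.apache.paimon.table.SpecialFields;
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;

    public class KeyFieldSketch {
        public static void main(String[] args) {
            // Illustrative key field: value field "g" (id 7) duplicated under the _KEY_ prefix.
            DataField keyField =
                    new DataField(
                            SpecialFields.KEY_FIELD_ID_START + 7,
                            SpecialFields.KEY_FIELD_PREFIX + "g",
                            DataTypes.STRING());
            if (SpecialFields.isKeyField(keyField.name())) {
                // Strip the id offset to locate the value field the key duplicates.
                int valueFieldId = keyField.id() - SpecialFields.KEY_FIELD_ID_START;
                System.out.println("read value field " + valueFieldId + " instead"); // prints 7
            }
        }
    }
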
@@ -97,8 +97,7 @@ public static ColumnVector createFixedVector(
return dataType.accept(visitor);
}

-public static ColumnVector[] createIndexMappedVectors(
-        int[] indexMapping, ColumnVector[] vectors) {
+public static ColumnVector[] createMappedVectors(int[] indexMapping, ColumnVector[] vectors) {
ColumnVector[] newVectors = new ColumnVector[indexMapping.length];
for (int i = 0; i < indexMapping.length; i++) {
int realIndex = indexMapping[i];
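
For context, createMappedVectors (renamed from createIndexMappedVectors above) only re-arranges references, so several output columns may share one physical vector; that is what lets a trimmed _KEY_ column and its value column be served by a single read. A minimal sketch under that assumption; readVectors() is a hypothetical stand-in for however the vectors were actually produced:

    // indexMapping[i] names the source vector for output column i; duplicates are allowed.
    int[] indexMapping = new int[] {0, 1, 0};
    ColumnVector[] vectors = readVectors(); // hypothetical: two vectors read from a file
    ColumnVector[] mapped = VectorMappingUtils.createMappedVectors(indexMapping, vectors);
    // mapped[0] == mapped[2] == vectors[0]: only references move, no data is copied.
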
@@ -82,7 +82,7 @@ public void testCreateIndexMappedVectors() {
int[] mapping = new int[] {0, 2, 1, 3, 2, 3, 1, 0, 4};

ColumnVector[] newColumnVectors =
-        VectorMappingUtils.createIndexMappedVectors(mapping, columnVectors);
+        VectorMappingUtils.createMappedVectors(mapping, columnVectors);

for (int i = 0; i < mapping.length; i++) {
Assertions.assertThat(newColumnVectors[i]).isEqualTo(columnVectors[mapping[i]]);
@@ -76,6 +76,7 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
PartitionSettedRow.from(partitionInfo);
iterator = iterator.transform(partitionSettedRow::replaceRow);
}
+
if (indexMapping != null) {
final ProjectedRow projectedRow = ProjectedRow.from(indexMapping);
iterator = iterator.transform(projectedRow::replaceRow);
@@ -26,6 +26,7 @@
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -35,17 +36,25 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;

import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
+import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;

/** Class with index mapping and bulk format. */
public class BulkFormatMapping {

+// Index mapping from data schema fields to table schema fields; this is used to realize
+// Paimon schema evolution. It combines trimmedKeyMapping, which maps key fields to the
+// value fields.
@Nullable private final int[] indexMapping;
+// helps indexMapping cast fields to different data types
@Nullable private final CastFieldGetter[] castMapping;
+// partition fields mapping: adds partition fields back to the read fields
@Nullable private final Pair<int[], RowType> partitionPair;
private final FormatReaderFactory bulkFormat;
private final TableSchema dataSchema;
@@ -54,18 +63,39 @@ public class BulkFormatMapping {
public BulkFormatMapping(
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
+        @Nullable int[] trimmedKeyMapping,
@Nullable Pair<int[], RowType> partitionPair,
FormatReaderFactory bulkFormat,
TableSchema dataSchema,
List<Predicate> dataFilters) {
-this.indexMapping = indexMapping;
+this.indexMapping = combine(indexMapping, trimmedKeyMapping);
this.castMapping = castMapping;
this.bulkFormat = bulkFormat;
this.partitionPair = partitionPair;
this.dataSchema = dataSchema;
this.dataFilters = dataFilters;
}

+private int[] combine(@Nullable int[] indexMapping, @Nullable int[] trimmedKeyMapping) {
+    if (indexMapping == null) {
+        return trimmedKeyMapping;
+    }
+    if (trimmedKeyMapping == null) {
+        return indexMapping;
+    }
+
+    int[] combined = new int[indexMapping.length];
+
+    for (int i = 0; i < indexMapping.length; i++) {
+        if (indexMapping[i] < 0) {
+            combined[i] = indexMapping[i];
+        } else {
+            combined[i] = trimmedKeyMapping[indexMapping[i]];
+        }
+    }
+    return combined;
+}

@Nullable
public int[] getIndexMapping() {
return indexMapping;
@@ -112,24 +142,46 @@ public BulkFormatMappingBuilder(
this.filters = filters;
}

+/**
+ * There are three steps to build the BulkFormatMapping:
+ *
+ * <p>1. Calculate the readDataFields, which is what we intend to read from the data schema.
+ * Meanwhile, generate the indexCastMapping, which maps the index of the readDataFields to
+ * the index of the data schema.
+ *
+ * <p>2. Calculate the mapping that trims _KEY_ fields. For example: we want _KEY_a, _KEY_b,
+ * _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g from the data, but we never need to read
+ * both _KEY_a and a (or _KEY_b and b) at the same time, so we trim the key fields.
+ * Read before: _KEY_a, _KEY_b, _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g.
+ * Read after: a, b, _FIELD_SEQUENCE, _ROW_KIND, c, d, e, f, g.
+ * The mapping is [0, 1, 2, 3, 0, 1, 4, 5, 6, 7, 8]; it converts the [read after] columns
+ * back to the [read before] columns.
+ *
+ * <p>3. We want to read far fewer fields than readDataFields, so we kick out the partition
+ * fields. We generate partitionMappingAndFieldsWithoutPartitionPair, which reduces the
+ * fields actually read and tells us how to map them back.
+ */
public BulkFormatMapping build(
String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) {

-List<DataField> readDataFields = readDataFields(dataSchema);
-
+// extract the full list of data fields in the data schema.
+List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
+List<DataField> readDataFields = readDataFields(allDataFields);
// build index cast mapping
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);

+// map key-field reads onto value-field reads
+Pair<int[], RowType> trimmedKeyPair = trimKeyFields(readDataFields, allDataFields);

// build partition mapping and filter partition fields
Pair<Pair<int[], RowType>, List<DataField>>
partitionMappingAndFieldsWithoutPartitionPair =
-PartitionUtils.constructPartitionMapping(dataSchema, readDataFields);
+PartitionUtils.constructPartitionMapping(
+        dataSchema, trimmedKeyPair.getRight().getFields());
Pair<int[], RowType> partitionMapping =
partitionMappingAndFieldsWithoutPartitionPair.getLeft();

// build read row type
-RowType readDataRowType =
+RowType readRowType =
new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight());

// build read filters
@@ -138,18 +190,55 @@ public BulkFormatMapping build(
return new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
+        trimmedKeyPair.getLeft(),
partitionMapping,
formatDiscover
.discover(formatIdentifier)
-        .createReaderFactory(readDataRowType, readFilters),
+        .createReaderFactory(readRowType, readFilters),
dataSchema,
readFilters);
}

-private List<DataField> readDataFields(TableSchema dataSchema) {
-    List<DataField> dataFields = fieldsExtractor.apply(dataSchema);
+static Pair<int[], RowType> trimKeyFields(
+        List<DataField> fieldsWithoutPartition, List<DataField> fields) {
+    int[] map = new int[fieldsWithoutPartition.size()];
+    List<DataField> trimmedFields = new ArrayList<>();
+    Map<Integer, DataField> fieldMap = new HashMap<>();
+    Map<Integer, Integer> positionMap = new HashMap<>();
+
+    for (DataField field : fields) {
+        fieldMap.put(field.id(), field);
+    }
+
+    for (int i = 0; i < fieldsWithoutPartition.size(); i++) {
+        DataField field = fieldsWithoutPartition.get(i);
+        boolean keyField = SpecialFields.isKeyField(field.name());
+        int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id();
+        // field in data schema
+        DataField f = fieldMap.get(id);
+
+        if (f != null) {
+            if (positionMap.containsKey(id)) {
+                map[i] = positionMap.get(id);
+            } else {
+                map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size());
+                // If the target field is not a key field, keep it as is, because it
+                // may be projected. Example: the target field is a row type, but we
+                // only read a few nested fields from it. If we simply did
+                // trimmedFields.add(f), we would read more fields than we need.
+                trimmedFields.add(keyField ? f : field);
+            }
+        } else {
+            throw new RuntimeException("Can't find field with id: " + id + " in fields.");
+        }
+    }
+
+    return Pair.of(map, new RowType(trimmedFields));
+}

+private List<DataField> readDataFields(List<DataField> allDataFields) {
List<DataField> readDataFields = new ArrayList<>();
-for (DataField dataField : dataFields) {
+for (DataField dataField : allDataFields) {
readTableFields.stream()
.filter(f -> f.id() == dataField.id())
.findFirst()
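
To see how the two mappings compose, here is a worked sketch (not part of the commit) using the shapes from testTrimKeyWithIndexMapping below, where the table wants _KEY_a, 0, a, 2, 3 and the data schema holds _KEY_a, 0, a, 3:

    // indexMapping: table position -> read-field position; -1 marks field 2,
    // which no longer exists in the data schema (schema evolution).
    int[] indexMapping = new int[] {0, 1, 2, -1, 3};
    // trimmedKeyMapping: read-field position -> trimmed column; _KEY_a and a
    // both point at trimmed column 0, so the key column is read only once.
    int[] trimmedKeyMapping = new int[] {0, 1, 0, 2};
    // combine() keeps negative entries and chains the rest:
    //   combined[i] = indexMapping[i] < 0 ? indexMapping[i] : trimmedKeyMapping[indexMapping[i]]
    int[] combined = new int[] {0, 1, 0, -1, 2}; // what getIndexMapping() returns in the test
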
@@ -452,7 +452,7 @@ public void testIdentifierAfterFullCompaction() throws Exception {
containSameIdentifyEntryFile(fullCompacted, entryIdentifierExpected);
}

-@RepeatedTest(1000)
+@RepeatedTest(10)
public void testRandomFullCompaction() throws Exception {
List<ManifestFileMeta> input = new ArrayList<>();
Set<FileEntry.Identifier> manifestEntrySet = new HashSet<>();
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

/** Test for {@link BulkFormatMapping.BulkFormatMappingBuilder}. */
public class BulkFormatMappingTest {

@Test
public void testTrimKeyFields() {
List<DataField> keyFields = new ArrayList<>();
List<DataField> allFields = new ArrayList<>();
List<DataField> testFields = new ArrayList<>();

for (int i = 0; i < 10; i++) {
keyFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + i,
SpecialFields.KEY_FIELD_PREFIX + i,
DataTypes.STRING()));
}

allFields.addAll(keyFields);
for (int i = 0; i < 20; i++) {
allFields.add(new DataField(i, String.valueOf(i), DataTypes.STRING()));
}

testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 1,
SpecialFields.KEY_FIELD_PREFIX + 1,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 3,
SpecialFields.KEY_FIELD_PREFIX + 3,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 5,
SpecialFields.KEY_FIELD_PREFIX + 5,
DataTypes.STRING()));
testFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 7,
SpecialFields.KEY_FIELD_PREFIX + 7,
DataTypes.STRING()));
testFields.add(new DataField(3, String.valueOf(3), DataTypes.STRING()));
testFields.add(new DataField(4, String.valueOf(4), DataTypes.STRING()));
testFields.add(new DataField(5, String.valueOf(5), DataTypes.STRING()));
testFields.add(new DataField(1, String.valueOf(1), DataTypes.STRING()));
testFields.add(new DataField(6, String.valueOf(6), DataTypes.STRING()));

Pair<int[], RowType> res =
BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(testFields, allFields);

Assertions.assertThat(res.getKey()).containsExactly(0, 1, 2, 3, 1, 4, 2, 0, 5);

List<DataField> fields = res.getRight().getFields();
Assertions.assertThat(fields.size()).isEqualTo(6);
Assertions.assertThat(fields.get(0).id()).isEqualTo(1);
Assertions.assertThat(fields.get(1).id()).isEqualTo(3);
Assertions.assertThat(fields.get(2).id()).isEqualTo(5);
Assertions.assertThat(fields.get(3).id()).isEqualTo(7);
Assertions.assertThat(fields.get(4).id()).isEqualTo(4);
Assertions.assertThat(fields.get(5).id()).isEqualTo(6);
}

@Test
public void testTrimKeyWithIndexMapping() {
List<DataField> readTableFields = new ArrayList<>();
List<DataField> readDataFields = new ArrayList<>();

readTableFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 1,
SpecialFields.KEY_FIELD_PREFIX + "a",
DataTypes.STRING()));
readTableFields.add(new DataField(0, "0", DataTypes.STRING()));
readTableFields.add(new DataField(1, "a", DataTypes.STRING()));
readTableFields.add(new DataField(2, "2", DataTypes.STRING()));
readTableFields.add(new DataField(3, "3", DataTypes.STRING()));

readDataFields.add(
new DataField(
SpecialFields.KEY_FIELD_ID_START + 1,
SpecialFields.KEY_FIELD_PREFIX + "a",
DataTypes.STRING()));
readDataFields.add(new DataField(0, "0", DataTypes.STRING()));
readDataFields.add(new DataField(1, "a", DataTypes.STRING()));
readDataFields.add(new DataField(3, "3", DataTypes.STRING()));

// build index cast mapping
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);

// map from key fields reading to value fields reading
Pair<int[], RowType> trimmedKeyPair =
BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(
readDataFields, readDataFields);

BulkFormatMapping bulkFormatMapping =
new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
trimmedKeyPair.getLeft(),
null,
null,
null,
null);

Assertions.assertThat(bulkFormatMapping.getIndexMapping()).containsExactly(0, 1, 0, -1, 2);
List<DataField> trimmed = trimmedKeyPair.getRight().getFields();
Assertions.assertThat(trimmed.get(0).id()).isEqualTo(1);
Assertions.assertThat(trimmed.get(1).id()).isEqualTo(0);
Assertions.assertThat(trimmed.get(2).id()).isEqualTo(3);
Assertions.assertThat(trimmed.size()).isEqualTo(3);
}
}