Skip to content

Commit

Permalink
[core] Rename BulkFormatMapping to FormatReaderMapping
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Dec 12, 2024
1 parent 30dcfb5 commit 8415397
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.BulkFormatMapping.BulkFormatMappingBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.BulkFormatMappingBuilder;

import javax.annotation.Nullable;

Expand All @@ -68,7 +68,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
private final DataFilePathFactory pathFactory;
private final long asyncThreshold;

private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final Map<FormatKey, FormatReaderMapping> bulkFormatMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;

Expand Down Expand Up @@ -120,14 +120,14 @@ private FileRecordReader<KeyValue> createRecordReader(
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);

Supplier<BulkFormatMapping> formatSupplier =
Supplier<FormatReaderMapping> formatSupplier =
() ->
bulkFormatMappingBuilder.build(
formatIdentifier,
schema,
schemaId == schema.id() ? schema : schemaManager.schema(schemaId));

BulkFormatMapping bulkFormatMapping =
FormatReaderMapping formatReaderMapping =
reuseFormat
? bulkFormatMappings.computeIfAbsent(
new FormatKey(schemaId, formatIdentifier),
Expand All @@ -137,14 +137,14 @@ private FileRecordReader<KeyValue> createRecordReader(

FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
bulkFormatMapping.getReaderFactory(),
formatReaderMapping.getReaderFactory(),
orcPoolSize == null
? new FormatReaderContext(fileIO, filePath, fileSize)
: new OrcFormatReaderContext(
fileIO, filePath, fileSize, orcPoolSize),
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));

Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.BulkFormatMapping.BulkFormatMappingBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.BulkFormatMappingBuilder;
import org.apache.paimon.utils.IOExceptionSupplier;

import org.slf4j.Logger;
Expand All @@ -75,7 +75,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final Map<FormatKey, FormatReaderMapping> bulkFormatMappings;
private final boolean fileIndexReadEnabled;

private RowType readRowType;
Expand Down Expand Up @@ -159,7 +159,7 @@ public RecordReader<InternalRow> createReader(
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
long schemaId = file.schemaId();

Supplier<BulkFormatMapping> formatSupplier =
Supplier<FormatReaderMapping> formatSupplier =
() ->
bulkFormatMappingBuilder.build(
formatIdentifier,
Expand All @@ -168,7 +168,7 @@ public RecordReader<InternalRow> createReader(
? schema
: schemaManager.schema(schemaId));

BulkFormatMapping bulkFormatMapping =
FormatReaderMapping formatReaderMapping =
bulkFormatMappings.computeIfAbsent(
new FormatKey(file.schemaId(), formatIdentifier),
key -> formatSupplier.get());
Expand All @@ -181,7 +181,7 @@ public RecordReader<InternalRow> createReader(
partition,
file,
dataFilePathFactory,
bulkFormatMapping,
formatReaderMapping,
dvFactory));
}

Expand All @@ -192,16 +192,16 @@ private FileRecordReader<InternalRow> createFileReader(
BinaryRow partition,
DataFileMeta file,
DataFilePathFactory dataFilePathFactory,
BulkFormatMapping bulkFormatMapping,
FormatReaderMapping formatReaderMapping,
IOExceptionSupplier<DeletionVector> dvFactory)
throws IOException {
FileIndexResult fileIndexResult = null;
if (fileIndexReadEnabled) {
fileIndexResult =
FileIndexEvaluator.evaluate(
fileIO,
bulkFormatMapping.getDataSchema(),
bulkFormatMapping.getDataFilters(),
formatReaderMapping.getDataSchema(),
formatReaderMapping.getDataFilters(),
dataFilePathFactory,
file);
if (!fileIndexResult.remain()) {
Expand All @@ -217,11 +217,11 @@ private FileRecordReader<InternalRow> createFileReader(
fileIndexResult);
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
bulkFormatMapping.getReaderFactory(),
formatReaderMapping.getReaderFactory(),
formatReaderContext,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));

if (fileIndexResult instanceof BitmapIndexResult) {
fileRecordReader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;

/** Class with index mapping and bulk format. */
public class BulkFormatMapping {
/** Class with index mapping and format reader. */
public class FormatReaderMapping {

// Index mapping from data schema fields to table schema fields; this is used to realize Paimon
// schema evolution. It combines trimmedKeyMapping, which maps key fields to the value
Expand All @@ -56,21 +56,21 @@ public class BulkFormatMapping {
@Nullable private final CastFieldGetter[] castMapping;
// partition fields mapping, add partition fields to the read fields
@Nullable private final Pair<int[], RowType> partitionPair;
private final FormatReaderFactory bulkFormat;
private final FormatReaderFactory readerFactory;
private final TableSchema dataSchema;
private final List<Predicate> dataFilters;

public BulkFormatMapping(
public FormatReaderMapping(
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable int[] trimmedKeyMapping,
@Nullable Pair<int[], RowType> partitionPair,
FormatReaderFactory bulkFormat,
FormatReaderFactory readerFactory,
TableSchema dataSchema,
List<Predicate> dataFilters) {
this.indexMapping = combine(indexMapping, trimmedKeyMapping);
this.castMapping = castMapping;
this.bulkFormat = bulkFormat;
this.readerFactory = readerFactory;
this.partitionPair = partitionPair;
this.dataSchema = dataSchema;
this.dataFilters = dataFilters;
Expand Down Expand Up @@ -112,7 +112,7 @@ public Pair<int[], RowType> getPartitionPair() {
}

public FormatReaderFactory getReaderFactory() {
return bulkFormat;
return readerFactory;
}

public TableSchema getDataSchema() {
Expand All @@ -123,7 +123,7 @@ public List<Predicate> getDataFilters() {
return dataFilters;
}

/** Builder for {@link BulkFormatMapping}. */
/** Builder for {@link FormatReaderMapping}. */
public static class BulkFormatMappingBuilder {

private final FileFormatDiscover formatDiscover;
Expand Down Expand Up @@ -160,7 +160,7 @@ public BulkFormatMappingBuilder(
* fields. We generate the partitionMappingAndFieldsWithoutPartitionPair, which helps reduce
* the fields actually read and tells us how to map them back.
*/
public BulkFormatMapping build(
public FormatReaderMapping build(
String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) {

// extract the whole data fields in logic.
Expand All @@ -187,7 +187,7 @@ public BulkFormatMapping build(
// build read filters
List<Predicate> readFilters = readFilters(filters, tableSchema, dataSchema);

return new BulkFormatMapping(
return new FormatReaderMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
trimmedKeyPair.getLeft(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import java.util.ArrayList;
import java.util.List;

/** Test for {@link BulkFormatMapping.BulkFormatMappingBuilder}. */
public class BulkFormatMappingTest {
/** Test for {@link FormatReaderMapping.BulkFormatMappingBuilder}. */
public class FormatReaderMappingTest {

@Test
public void testTrimKeyFields() {
Expand Down Expand Up @@ -80,7 +80,7 @@ public void testTrimKeyFields() {
testFields.add(new DataField(6, String.valueOf(6), DataTypes.STRING()));

Pair<int[], RowType> res =
BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(testFields, allFields);
FormatReaderMapping.BulkFormatMappingBuilder.trimKeyFields(testFields, allFields);

Assertions.assertThat(res.getKey()).containsExactly(0, 1, 2, 3, 1, 4, 2, 0, 5);

Expand Down Expand Up @@ -124,11 +124,11 @@ public void testTrimKeyWithIndexMapping() {

// map from key fields reading to value fields reading
Pair<int[], RowType> trimmedKeyPair =
BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(
FormatReaderMapping.BulkFormatMappingBuilder.trimKeyFields(
readDataFields, readDataFields);

BulkFormatMapping bulkFormatMapping =
new BulkFormatMapping(
FormatReaderMapping formatReaderMapping =
new FormatReaderMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
trimmedKeyPair.getLeft(),
Expand All @@ -137,7 +137,8 @@ public void testTrimKeyWithIndexMapping() {
null,
null);

Assertions.assertThat(bulkFormatMapping.getIndexMapping()).containsExactly(0, 1, 0, -1, 2);
Assertions.assertThat(formatReaderMapping.getIndexMapping())
.containsExactly(0, 1, 0, -1, 2);
List<DataField> trimmed = trimmedKeyPair.getRight().getFields();
Assertions.assertThat(trimmed.get(0).id()).isEqualTo(1);
Assertions.assertThat(trimmed.get(1).id()).isEqualTo(0);
Expand Down

0 comments on commit 8415397

Please sign in to comment.