Skip to content

Commit

Permalink
minus
Browse files: browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Apr 9, 2024
1 parent 4e415a8 commit 6218970
Showing 1 changed file with 22 additions and 21 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.Triple;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -69,7 +70,8 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final Map<FormatKey, Triple<TableSchema, List<Predicate>, BulkFormatMapping>>
formatKeyTripleMap;
private final boolean fileIndexReadEnabled;

private int[][] projection;
Expand All @@ -89,7 +91,7 @@ public AppendOnlyFileStoreRead(
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.bulkFormatMappings = new HashMap<>();
this.formatKeyTripleMap = new HashMap<>();
this.fileIndexReadEnabled = fileIndexReadEnabled;

this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes();
Expand All @@ -114,14 +116,12 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
if (split.beforeFiles().size() > 0) {
LOG.info("Ignore split before files: " + split.beforeFiles());
}
// use this to cache evolved predicates
Map<FormatKey, List<Predicate>> filePredicates = new HashMap<>();
Map<FormatKey, TableSchema> fileSchema = new HashMap<>();

for (DataFileMeta file : split.dataFiles()) {
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
FormatKey formatKey = new FormatKey(file.schemaId(), formatIdentifier);
BulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
Triple<TableSchema, List<Predicate>, BulkFormatMapping> formatMappingTriple =
formatKeyTripleMap.computeIfAbsent(
formatKey,
key -> {
TableSchema tableSchema = schema;
Expand Down Expand Up @@ -172,21 +172,22 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
Projection.of(dataProjection)
.project(dataSchema.logicalRowType());

fileSchema.put(key, dataSchema);
if (dataFilters != null) {
filePredicates.put(key, dataFilters);
}
return new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionPair,
formatDiscover
.discover(formatIdentifier)
.createReaderFactory(
projectedRowType, dataFilters));
return Triple.of(
dataSchema,
dataFilters,
new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionPair,
formatDiscover
.discover(formatIdentifier)
.createReaderFactory(
projectedRowType, dataFilters)));
});

List<Predicate> dataFilter = filePredicates.getOrDefault(formatKey, null);
TableSchema dataSchema = formatMappingTriple.f0;
List<Predicate> dataFilter = formatMappingTriple.f1;
BulkFormatMapping bulkFormatMapping = formatMappingTriple.f2;
if (dataFilter != null && !dataFilter.isEmpty()) {
List<String> indexFiles =
file.extraFiles().stream()
Expand All @@ -201,7 +202,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
new FileIndexPredicate(
dataFilePathFactory.toPath(indexFiles.get(0)),
fileIO,
fileSchema.get(formatKey).logicalRowType())) {
dataSchema.logicalRowType())) {
if (!predicate.testPredicate(
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) {
continue;
Expand Down

0 comments on commit 6218970

Please sign in to comment.