
Commit

leaves12138 committed Apr 8, 2024
1 parent 5ffbfb2 commit 4e415a8
Showing 3 changed files with 107 additions and 76 deletions.
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -236,6 +236,24 @@
<td>Map</td>
<td>Define a different file format for each level. You can add a conf like this: 'file.format.per.level' = '0:avro,3:parquet'. If the file format for a level is not provided, the default format set by `file.format` will be used.</td>
</tr>
<tr>
<td><h5>file.index.columns</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The secondary index columns.</td>
</tr>
<tr>
<td><h5>file.index.read.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable reading the file index.</td>
</tr>
<tr>
<td><h5>file.index.size-in-meta</h5></td>
<td style="word-wrap: break-word;">500 bytes</td>
<td>MemorySize</td>
<td>If the file index is no larger than this size, it is stored in the file's metadata; otherwise it is written to a separate index file.</td>
</tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
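Taken together, the three new options above configure the file index feature: which columns get an index, whether the read path consults it, and up to what size the index is embedded in the file's metadata rather than written as a separate index file (the last point is my reading of this commit). A minimal sketch of setting them, using a plain properties map as a stand-in for however table options are actually supplied (Flink SQL WITH clause, catalog API, etc.); the column name user_id is illustrative:

import java.util.HashMap;
import java.util.Map;

public class FileIndexOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        // build a file index on the user_id column (hypothetical column name)
        tableOptions.put("file.index.columns", "user_id");
        // consult the index on the read path; true is already the default
        tableOptions.put("file.index.read.enabled", "true");
        // indexes up to this size stay embedded in file metadata; larger
        // ones go to separate index files (assumption, see lead-in)
        tableOptions.put("file.index.size-in-meta", "500 bytes");
        tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}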
@@ -64,7 +64,9 @@ public DataFileMeta fromRow(InternalRow row) {
     row.getString(0).toString(),
     row.getLong(1),
     row.getLong(2),
-    deserializeBinaryRow(row.getBinary(14)),
+    row.isNullAt(14)
+            ? DataFileMeta.EMPTY_FILTER
+            : deserializeBinaryRow(row.getBinary(14)),
     deserializeBinaryRow(row.getBinary(3)),
     deserializeBinaryRow(row.getBinary(4)),
     BinaryTableStats.fromRow(row.getRow(5, 3)),
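The serializer change above makes the embedded index column at position 14 nullable, so manifest entries written before this commit still deserialize: old rows report isNullAt(14) and fall back to DataFileMeta.EMPTY_FILTER instead of reading a missing value. A sketch of that guard in isolation (the class and method names are mine; the import paths are Paimon's usual ones, and EMPTY_FILTER's BinaryRow type is inferred from its use in the diff):

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;

import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;

class EmbeddedIndexReadSketch {
    // Rows written by older versions carry no embedded index, so a
    // non-null default keeps downstream code free of null checks.
    static BinaryRow readEmbeddedIndex(InternalRow row) {
        return row.isNullAt(14)
                ? DataFileMeta.EMPTY_FILTER
                : deserializeBinaryRow(row.getBinary(14));
    }
}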
@@ -115,89 +115,100 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOException
LOG.info("Ignore split before files: " + split.beforeFiles());
}
// use this to cache evolved predicates
Map<Long, List<Predicate>> filePredicates = new HashMap<>();
Map<FormatKey, List<Predicate>> filePredicates = new HashMap<>();
Map<FormatKey, TableSchema> fileSchema = new HashMap<>();
for (DataFileMeta file : split.dataFiles()) {
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
FormatKey key = new FormatKey(file.schemaId(), formatIdentifier);

if (!bulkFormatMappings.containsKey(key)) {
TableSchema tableSchema = schema;
TableSchema dataSchema = schemaManager.schema(key.schemaId);

// projection to data schema
int[][] dataProjection =
SchemaEvolutionUtil.createDataProjection(
tableSchema.fields(), dataSchema.fields(), projection);
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(
Projection.of(projection).toTopLevelIndexes(),
tableSchema.fields(),
Projection.of(dataProjection).toTopLevelIndexes(),
dataSchema.fields());

List<Predicate> dataFilters =
this.schema.id() == key.schemaId
? filters
: filePredicates.computeIfAbsent(
key.schemaId,
k ->
SchemaEvolutionUtil.createDataFilters(
FormatKey formatKey = new FormatKey(file.schemaId(), formatIdentifier);
BulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
formatKey,
key -> {
TableSchema tableSchema = schema;
TableSchema dataSchema =
key.schemaId == schema.id()
? schema
: schemaManager.schema(key.schemaId);

// projection to data schema
int[][] dataProjection =
SchemaEvolutionUtil.createDataProjection(
tableSchema.fields(),
dataSchema.fields(),
projection);

IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(
Projection.of(projection).toTopLevelIndexes(),
tableSchema.fields(),
Projection.of(dataProjection).toTopLevelIndexes(),
dataSchema.fields());

List<Predicate> dataFilters =
this.schema.id() == key.schemaId
? filters
: SchemaEvolutionUtil.createDataFilters(
tableSchema.fields(),
dataSchema.fields(),
filters));

if (dataFilters != null && !dataFilters.isEmpty()) {
List<String> indexFiles =
file.extraFiles().stream()
.filter(
name ->
name.startsWith(
DataFilePathFactory.INDEX_PATH_PREFIX))
.collect(Collectors.toList());
if (fileIndexReadEnabled && !indexFiles.isEmpty()) {
// go to secondary index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
dataFilePathFactory.toPath(indexFiles.get(0)),
fileIO,
dataSchema.logicalRowType())) {
if (!predicate.testPredicate(
PredicateBuilder.and(dataFilters.toArray(new Predicate[0])))) {
continue;
}
filters);

Pair<int[], RowType> partitionPair = null;
if (!dataSchema.partitionKeys().isEmpty()) {
Pair<int[], int[][]> partitionMapping =
PartitionUtils.constructPartitionMapping(
dataSchema, dataProjection);
// if partition fields are not selected, we just do nothing
if (partitionMapping != null) {
dataProjection = partitionMapping.getRight();
partitionPair =
Pair.of(
partitionMapping.getLeft(),
dataSchema.projectedLogicalRowType(
dataSchema.partitionKeys()));
}
}

RowType projectedRowType =
Projection.of(dataProjection)
.project(dataSchema.logicalRowType());

fileSchema.put(key, dataSchema);
if (dataFilters != null) {
filePredicates.put(key, dataFilters);
}
return new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionPair,
formatDiscover
.discover(formatIdentifier)
.createReaderFactory(
projectedRowType, dataFilters));
});

List<Predicate> dataFilter = filePredicates.getOrDefault(formatKey, null);
if (dataFilter != null && !dataFilter.isEmpty()) {
List<String> indexFiles =
file.extraFiles().stream()
.filter(
name ->
name.startsWith(
DataFilePathFactory.INDEX_PATH_PREFIX))
.collect(Collectors.toList());
if (fileIndexReadEnabled && !indexFiles.isEmpty()) {
// go to file index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
dataFilePathFactory.toPath(indexFiles.get(0)),
fileIO,
fileSchema.get(formatKey).logicalRowType())) {
if (!predicate.testPredicate(
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) {
continue;
}
}
}

Pair<int[], RowType> partitionPair = null;
if (!dataSchema.partitionKeys().isEmpty()) {
Pair<int[], int[][]> partitionMapping =
PartitionUtils.constructPartitionMapping(dataSchema, dataProjection);
// if partition fields are not selected, we just do nothing
if (partitionMapping != null) {
dataProjection = partitionMapping.getRight();
partitionPair =
Pair.of(
partitionMapping.getLeft(),
dataSchema.projectedLogicalRowType(
dataSchema.partitionKeys()));
}
}

RowType projectedRowType =
Projection.of(dataProjection).project(dataSchema.logicalRowType());

bulkFormatMappings.put(
key,
new BulkFormatMapping(
indexCastMapping.getIndexMapping(),
indexCastMapping.getCastMapping(),
partitionPair,
formatDiscover
.discover(formatIdentifier)
.createReaderFactory(projectedRowType, dataFilters)));
}
BulkFormatMapping bulkFormatMapping = bulkFormatMappings.get(key);

final BinaryRow partition = split.partition();
suppliers.add(
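The main refactor above replaces a containsKey/put sequence with Map.computeIfAbsent keyed by FormatKey, a (schema id, format identifier) pair, so the projection, cast mapping, and reader factory are built once per distinct pair and reused for every later file in the split. The shape of that pattern, reduced to a standalone sketch (all names are illustrative, not Paimon's):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class FormatKeyCacheSketch {
    // stand-in for Paimon's (schemaId, formatIdentifier) cache key
    static final class FormatKey {
        final long schemaId;
        final String format;

        FormatKey(long schemaId, String format) {
            this.schemaId = schemaId;
            this.format = format;
        }

        // value-based equals/hashCode are what make the HashMap cache work
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FormatKey)) {
                return false;
            }
            FormatKey other = (FormatKey) o;
            return schemaId == other.schemaId && format.equals(other.format);
        }

        @Override
        public int hashCode() {
            return Objects.hash(schemaId, format);
        }
    }

    public static void main(String[] args) {
        Map<FormatKey, String> bulkFormatMappings = new HashMap<>();
        for (String file : new String[] {"f1.parquet", "f2.parquet", "f3.orc"}) {
            String format = file.substring(file.lastIndexOf('.') + 1);
            // the expensive mapping (projection, cast mapping, reader factory
            // in the real code) is computed once per key, then reused
            String mapping =
                    bulkFormatMappings.computeIfAbsent(
                            new FormatKey(0L, format),
                            key -> "reader factory for " + key.format);
            System.out.println(file + " -> " + mapping);
        }
    }
}

Note that the file-skipping index check now runs after the lookup for every file, not only when a mapping is first built; besides being correct for already-cached formats, this placement is forced by the lambda, since a continue cannot jump out of the computeIfAbsent body.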
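Inside the mapping construction, filters written against the current table schema are rewritten against the older data schema the file was written with (SchemaEvolutionUtil.createDataFilters above), and the result is cached in filePredicates for the index check. A toy illustration of why the rewrite is needed: the same column can sit at a different position in the two schemas, and columns the file never had cannot be filtered at all (Paimon's real mapping works on field ids; this sketch matches by name purely for brevity):

import java.util.List;

public class FilterEvolutionSketch {
    public static void main(String[] args) {
        // current table schema vs. the older schema a data file was written with
        List<String> tableFields = List.of("id", "name", "age", "city");
        List<String> dataFields = List.of("id", "age", "name");

        // predicates referencing table-schema field indexes 2 ("age") and 3 ("city")
        for (int tableIndex : new int[] {2, 3}) {
            String field = tableFields.get(tableIndex);
            int dataIndex = dataFields.indexOf(field);
            if (dataIndex < 0) {
                // the file predates the column: drop the pushed-down filter
                System.out.println(field + ": not in data schema, filter dropped");
            } else {
                System.out.println(field + ": index remapped " + tableIndex + " -> " + dataIndex);
            }
        }
    }
}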
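Finally, the read-path payoff: when evolved filters exist and the file carries index files (extra files prefixed with DataFilePathFactory.INDEX_PATH_PREFIX), FileIndexPredicate is evaluated against the conjunction of those filters, and a file whose index proves no row can match is dropped from the split with continue before any reader is created. A self-contained sketch of that pruning loop, with a simple min/max summary standing in for the real index (all names and the Java 16+ record are illustrative):

import java.util.ArrayList;
import java.util.List;

public class FileIndexSkipSketch {
    // stand-in for a data file plus its optional index summary
    record IndexedFile(String name, Integer minUserId, Integer maxUserId) {}

    public static void main(String[] args) {
        List<IndexedFile> files =
                List.of(
                        new IndexedFile("f1", 0, 99),
                        new IndexedFile("f2", 100, 199),
                        new IndexedFile("f3", null, null)); // no index: must be read

        int wanted = 150; // query predicate: user_id = 150
        List<String> toRead = new ArrayList<>();
        for (IndexedFile file : files) {
            // mirror of the commit's control flow: only when an index exists
            // and proves the predicate cannot match does the loop skip the file
            if (file.minUserId() != null
                    && (wanted < file.minUserId() || wanted > file.maxUserId())) {
                continue;
            }
            toRead.add(file.name());
        }
        System.out.println(toRead); // prints [f2, f3]
    }
}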
