Skip to content

Commit

Permalink
[bug] [core] FilesTable splits too big to distribute.
Browse files Browse the repository at this point in the history
  • Loading branch information
yejunhao committed May 31, 2024
1 parent dd12fd4 commit 0cf46ab
Showing 1 changed file with 54 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -134,7 +133,7 @@ public List<String> primaryKeys() {

@Override
public InnerTableScan newScan() {
return new FilesScan(storeTable);
return new FilesScan();
}

@Override
Expand All @@ -150,16 +149,10 @@ public Table copy(Map<String, String> dynamicOptions) {

private static class FilesScan extends ReadOnceTableScan {

private final FileStoreTable storeTable;

@Nullable private LeafPredicate partitionPredicate;
@Nullable private LeafPredicate bucketPredicate;
@Nullable private LeafPredicate levelPredicate;

private FilesScan(FileStoreTable storeTable) {
this.storeTable = storeTable;
}

@Override
public InnerTableScan withFilter(Predicate pushdown) {
if (pushdown == null) {
Expand All @@ -176,12 +169,57 @@ public InnerTableScan withFilter(Predicate pushdown) {

@Override
public Plan innerPlan() {
// plan here, just set the result of plan to split
TableScan.Plan plan = tablePlan();
return () -> Collections.singletonList(new FilesSplit(plan.splits()));
return () ->
Collections.singletonList(
new FilesSplit(partitionPredicate, bucketPredicate, levelPredicate));
}
}

private static class FilesSplit implements Split {

@Nullable private final LeafPredicate partitionPredicate;
@Nullable private final LeafPredicate bucketPredicate;
@Nullable private final LeafPredicate levelPredicate;

/**
 * Creates a split that only records the three optional predicates; each argument is stored
 * as-is and may be {@code null}.
 *
 * @param partitionPredicate value kept in {@code partitionPredicate}, possibly {@code null}
 * @param bucketPredicate value kept in {@code bucketPredicate}, possibly {@code null}
 * @param levelPredicate value kept in {@code levelPredicate}, possibly {@code null}
 */
private FilesSplit(
        @Nullable LeafPredicate partitionPredicate,
        @Nullable LeafPredicate bucketPredicate,
        @Nullable LeafPredicate levelPredicate) {
    this.levelPredicate = levelPredicate;
    this.bucketPredicate = bucketPredicate;
    this.partitionPredicate = partitionPredicate;
}

/**
 * Reports a fixed row count for this split.
 *
 * @return always {@code 1}; the value is a placeholder provided for statistics only
 */
@Override
public long rowCount() {
    // A constant is reported here purely so that statistics have a value to use.
    final long placeholderRowCount = 1L;
    return placeholderRowCount;
}

/**
 * Two splits are equal when they are of exactly the same class and all three predicates are
 * pairwise equal (with {@code null} equal only to {@code null}).
 *
 * @param o the object to compare against, may be {@code null}
 * @return {@code true} if {@code o} is an equal {@code FilesSplit}
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null) {
        return false;
    }
    if (o.getClass() != getClass()) {
        return false;
    }
    FilesSplit other = (FilesSplit) o;
    // Compared in the order partition, bucket, level; stops at the first mismatch.
    if (!Objects.equals(partitionPredicate, other.partitionPredicate)) {
        return false;
    }
    if (!Objects.equals(bucketPredicate, other.bucketPredicate)) {
        return false;
    }
    return Objects.equals(levelPredicate, other.levelPredicate);
}

/**
 * Hash over the three predicates, consistent with {@code equals}.
 *
 * @return the same value {@code Objects.hash(partitionPredicate, bucketPredicate,
 *     levelPredicate)} produces
 */
@Override
public int hashCode() {
    // Same 31-multiplier fold that Objects.hash / Arrays.hashCode perform; null hashes to 0.
    int hash = 1;
    hash = 31 * hash + Objects.hashCode(partitionPredicate);
    hash = 31 * hash + Objects.hashCode(bucketPredicate);
    hash = 31 * hash + Objects.hashCode(levelPredicate);
    return hash;
}

/**
 * Builds the plan for the given store table via {@code tablePlan} and returns its splits.
 *
 * @param storeTable the table handed to {@code tablePlan}
 * @return the splits of the resulting plan
 */
public List<Split> splits(FileStoreTable storeTable) {
    TableScan.Plan plan = tablePlan(storeTable);
    return plan.splits();
}

private TableScan.Plan tablePlan() {
private TableScan.Plan tablePlan(FileStoreTable storeTable) {
InnerTableScan scan = storeTable.newScan();
if (partitionPredicate != null) {
if (partitionPredicate.function() instanceof Equal) {
Expand Down Expand Up @@ -221,46 +259,6 @@ private TableScan.Plan tablePlan() {
}
}

/**
 * A {@link Split} that wraps a list of already planned splits. The row count it reports is the
 * total number of data files across the wrapped splits, each of which is cast to
 * {@code DataSplit}.
 */
private static class FilesSplit implements Split {

    private static final long serialVersionUID = 1L;

    private final List<Split> splits;

    /**
     * @param splits the planned splits to wrap; the list is stored as-is, not copied
     */
    private FilesSplit(List<Split> splits) {
        this.splits = splits;
    }

    /**
     * @return the sum of {@code dataFiles().size()} over every wrapped split
     */
    @Override
    public long rowCount() {
        long fileCount = 0L;
        for (Split split : splits) {
            // Every wrapped split is cast to DataSplit; any other type fails the cast.
            fileCount += ((DataSplit) split).dataFiles().size();
        }
        return fileCount;
    }

    /**
     * @return the wrapped list of splits (the same instance passed to the constructor)
     */
    public List<Split> splits() {
        return splits;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        FilesSplit other = (FilesSplit) o;
        return Objects.equals(splits, other.splits);
    }

    @Override
    public int hashCode() {
        // Equivalent to Objects.hash(splits): a one-element fold with multiplier 31.
        return 31 + Objects.hashCode(splits);
    }
}

private static class FilesRead implements InnerTableRead {

private final SchemaManager schemaManager;
Expand Down Expand Up @@ -292,12 +290,13 @@ public TableRead withIOManager(IOManager ioManager) {
}

@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof FilesSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
FilesSplit filesSplit = (FilesSplit) split;
if (filesSplit.splits().isEmpty()) {
List<Split> splits = filesSplit.splits(storeTable);
if (splits.isEmpty()) {
return new IteratorRecordReader<>(Collections.emptyIterator());
}

Expand Down Expand Up @@ -332,7 +331,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) {
});
}
};
for (Split dataSplit : filesSplit.splits()) {
for (Split dataSplit : splits) {
iteratorList.add(
Iterators.transform(
((DataSplit) dataSplit).dataFiles().iterator(),
Expand Down

0 comments on commit 0cf46ab

Please sign in to comment.