Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
yejunhao committed Mar 13, 2024
1 parent 3e0101f commit de25f97
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@

import org.apache.paimon.table.source.snapshot.StartingScanner;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Scanning plan containing snapshot ID and input splits. */
public class DataFilePlan implements TableScan.Plan, Serializable {
public class DataFilePlan implements TableScan.Plan {

private final List<DataSplit> splits;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@

import org.jetbrains.annotations.Nullable;

import java.io.Serializable;
import java.util.List;

/** An implementation of {@link SnapshotReader.Plan}. */
public class PlanImpl implements SnapshotReader.Plan, Serializable {
public class PlanImpl implements SnapshotReader.Plan {

private final Long watermark;
private final Long snapshotId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,27 +121,31 @@ public InnerTableScan withFilter(Predicate predicate) {

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new AllTableSplit(fileIO, allTablePaths));
return () ->
Collections.singletonList(
new AllTableSplit(
options(fileIO, allTablePaths).values().stream()
.flatMap(t -> t.values().stream())
.reduce(0, (a, b) -> a + b.size(), Integer::sum),
allTablePaths));
}
}

private static class AllTableSplit implements Split {

private static final long serialVersionUID = 1L;

private final FileIO fileIO;
private final long rowCount;
private final Map<String, Map<String, Path>> allTablePaths;

private AllTableSplit(FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
this.fileIO = fileIO;
private AllTableSplit(long rowCount, Map<String, Map<String, Path>> allTablePaths) {
this.rowCount = rowCount;
this.allTablePaths = allTablePaths;
}

@Override
public long rowCount() {
return options(fileIO, allTablePaths).values().stream()
.flatMap(t -> t.values().stream())
.reduce(0, (a, b) -> a + b.size(), Integer::sum);
return rowCount;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public InnerTableScan withFilter(Predicate pushdown) {
public Plan innerPlan() {
// plan here, just set the result of plan to split
TableScan.Plan plan = tablePlan();
return () -> Collections.singletonList(new FilesSplit(plan));
return () -> Collections.singletonList(new FilesSplit(plan.splits()));
}

private TableScan.Plan tablePlan() {
Expand Down Expand Up @@ -235,22 +235,22 @@ private static class FilesSplit implements Split {

private static final long serialVersionUID = 1L;

private final TableScan.Plan plan;
private final List<Split> splits;

private FilesSplit(TableScan.Plan plan) {
this.plan = plan;
private FilesSplit(List<Split> splits) {
this.splits = splits;
}

@Override
public long rowCount() {
return plan.splits().stream()
return splits.stream()
.map(s -> (DataSplit) s)
.mapToLong(s -> s.dataFiles().size())
.sum();
}

public TableScan.Plan plan() {
return plan;
public List<Split> splits() {
return splits;
}

@Override
Expand All @@ -262,12 +262,12 @@ public boolean equals(Object o) {
return false;
}
FilesSplit that = (FilesSplit) o;
return Objects.equals(plan, that.plan);
return Objects.equals(splits, that.splits);
}

@Override
public int hashCode() {
return Objects.hash(plan);
return Objects.hash(splits);
}
}

Expand Down Expand Up @@ -307,8 +307,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
FilesSplit filesSplit = (FilesSplit) split;
TableScan.Plan plan = filesSplit.plan();
if (plan.splits().isEmpty()) {
if (filesSplit.splits().isEmpty()) {
return new IteratorRecordReader<>(Collections.emptyIterator());
}

Expand Down Expand Up @@ -343,7 +342,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) {
});
}
};
for (Split dataSplit : plan.splits()) {
for (Split dataSplit : filesSplit.splits()) {
iteratorList.add(
Iterators.transform(
((DataSplit) dataSplit).dataFiles().iterator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
Expand Down Expand Up @@ -135,30 +134,31 @@ public InnerTableScan withFilter(Predicate predicate) {
@Override
public Plan innerPlan() {
return () ->
Collections.singletonList(new PartitionsSplit(storeTable.newScan().plan()));
Collections.singletonList(
new PartitionsSplit(storeTable.newScan().plan().splits()));
}
}

private static class PartitionsSplit implements Split {

private static final long serialVersionUID = 1L;

private final TableScan.Plan plan;
private final List<Split> splits;

private PartitionsSplit(TableScan.Plan plan) {
this.plan = plan;
private PartitionsSplit(List<Split> splits) {
this.splits = splits;
}

@Override
public long rowCount() {
return plan.splits().stream()
return splits.stream()
.map(s -> ((DataSplit) s).partition())
.collect(Collectors.toSet())
.size();
}

private TableScan.Plan plan() {
return plan;
private List<Split> splits() {
return splits;
}

@Override
Expand All @@ -170,12 +170,12 @@ public boolean equals(Object o) {
return false;
}
PartitionsSplit that = (PartitionsSplit) o;
return Objects.equals(plan, that.plan);
return Objects.equals(splits, that.splits);
}

@Override
public int hashCode() {
return Objects.hash(plan);
return Objects.hash(splits);
}
}

Expand Down Expand Up @@ -212,16 +212,15 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
PartitionsSplit filesSplit = (PartitionsSplit) split;
TableScan.Plan plan = filesSplit.plan();
if (plan.splits().isEmpty()) {
if (filesSplit.splits().isEmpty()) {
return new IteratorRecordReader<>(Collections.emptyIterator());
}
List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
RowDataToObjectArrayConverter partitionConverter =
new RowDataToObjectArrayConverter(
fileStoreTable.schema().logicalPartitionType());

for (Split dataSplit : plan.splits()) {
for (Split dataSplit : filesSplit.splits()) {
iteratorList.add(
Iterators.transform(
((DataSplit) dataSplit).dataFiles().iterator(),
Expand Down

0 comments on commit de25f97

Please sign in to comment.