diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java index 8c2b47c848a1..d1355f1bbec8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java @@ -20,13 +20,12 @@ import org.apache.paimon.table.source.snapshot.StartingScanner; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** Scanning plan containing snapshot ID and input splits. */ -public class DataFilePlan implements TableScan.Plan, Serializable { +public class DataFilePlan implements TableScan.Plan { private final List splits; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java index b5e861e02665..84c9ece9e181 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java @@ -22,11 +22,10 @@ import org.jetbrains.annotations.Nullable; -import java.io.Serializable; import java.util.List; /** An implementation of {@link SnapshotReader.Plan}. */ -public class PlanImpl implements SnapshotReader.Plan, Serializable { +public class PlanImpl implements SnapshotReader.Plan { private final Long watermark; private final Long snapshotId; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java index 5bb47e6e0d18..a7b1236bfd48 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java @@ -121,7 +121,13 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new AllTableSplit(fileIO, allTablePaths)); + return () -> + Collections.singletonList( + new AllTableSplit( + options(fileIO, allTablePaths).values().stream() + .flatMap(t -> t.values().stream()) + .reduce(0, (a, b) -> a + b.size(), Integer::sum), + allTablePaths)); } } @@ -129,19 +135,17 @@ private static class AllTableSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Map> allTablePaths; - private AllTableSplit(FileIO fileIO, Map> allTablePaths) { - this.fileIO = fileIO; + private AllTableSplit(long rowCount, Map> allTablePaths) { + this.rowCount = rowCount; this.allTablePaths = allTablePaths; } @Override public long rowCount() { - return options(fileIO, allTablePaths).values().stream() - .flatMap(t -> t.values().stream()) - .reduce(0, (a, b) -> a + b.size(), Integer::sum); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index d9f5d1ff26c4..0501dab08527 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -188,7 +188,7 @@ public InnerTableScan withFilter(Predicate pushdown) { public Plan innerPlan() { // plan here, just set the result of plan to split TableScan.Plan plan = tablePlan(); - return () -> Collections.singletonList(new FilesSplit(plan)); + return () -> Collections.singletonList(new FilesSplit(plan.splits())); } private TableScan.Plan tablePlan() { @@ -235,22 +235,22 @@ private static class FilesSplit implements Split { private static final long serialVersionUID = 1L; - private final TableScan.Plan plan; + private final List splits; - private FilesSplit(TableScan.Plan plan) { - this.plan = plan; + private FilesSplit(List splits) { + this.splits = splits; } @Override public long rowCount() { - return plan.splits().stream() + return splits.stream() .map(s -> (DataSplit) s) .mapToLong(s -> s.dataFiles().size()) .sum(); } - public TableScan.Plan plan() { - return plan; + public List splits() { + return splits; } @Override @@ -262,12 +262,12 @@ public boolean equals(Object o) { return false; } FilesSplit that = (FilesSplit) o; - return Objects.equals(plan, that.plan); + return Objects.equals(splits, that.splits); } @Override public int hashCode() { - return Objects.hash(plan); + return Objects.hash(splits); } } @@ -307,8 +307,7 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } FilesSplit filesSplit = (FilesSplit) split; - TableScan.Plan plan = filesSplit.plan(); - if (plan.splits().isEmpty()) { + if (filesSplit.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } @@ -343,7 +342,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { }); } }; - for (Split dataSplit : plan.splits()) { + for (Split dataSplit : filesSplit.splits()) { iteratorList.add( Iterators.transform( ((DataSplit) dataSplit).dataFiles().iterator(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index e8f3b45c831f..4f1899394b04 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -36,7 +36,6 @@ import org.apache.paimon.table.source.ReadOnceTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; -import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -135,7 +134,8 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { return () -> - Collections.singletonList(new PartitionsSplit(storeTable.newScan().plan())); + Collections.singletonList( + new PartitionsSplit(storeTable.newScan().plan().splits())); } } @@ -143,22 +143,22 @@ private static class PartitionsSplit implements Split { private static final long serialVersionUID = 1L; - private final TableScan.Plan plan; + private final List splits; - private PartitionsSplit(TableScan.Plan plan) { - this.plan = plan; + private PartitionsSplit(List splits) { + this.splits = splits; } @Override public long rowCount() { - return plan.splits().stream() + return splits.stream() .map(s -> ((DataSplit) s).partition()) .collect(Collectors.toSet()) .size(); } - private TableScan.Plan plan() { - return plan; + private List splits() { + return splits; } @Override @@ -170,12 +170,12 @@ public boolean equals(Object o) { return false; } PartitionsSplit that = (PartitionsSplit) o; - return Objects.equals(plan, that.plan); + return Objects.equals(splits, that.splits); } @Override public int hashCode() { - return Objects.hash(plan); + return Objects.hash(splits); } } @@ -212,8 +212,7 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } PartitionsSplit filesSplit = (PartitionsSplit) split; - TableScan.Plan plan = filesSplit.plan(); - if (plan.splits().isEmpty()) { + if (filesSplit.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } List> iteratorList = new ArrayList<>(); @@ -221,7 +220,7 @@ public RecordReader createReader(Split split) throws IOException { new RowDataToObjectArrayConverter( fileStoreTable.schema().logicalPartitionType()); - for (Split dataSplit : plan.splits()) { + for (Split dataSplit : filesSplit.splits()) { iteratorList.add( Iterators.transform( ((DataSplit) dataSplit).dataFiles().iterator(),