From ee1d5417e2af218af4a04448a306fa99e68ac258 Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Fri, 31 May 2024 19:39:19 +0800 Subject: [PATCH] [core] Fix FilesTable splits too big to distribute (#3454) --- .../paimon/table/source/SingletonSplit.java | 28 +++++ .../table/system/AggregationFieldsTable.java | 18 +-- .../table/system/AllTableOptionsTable.java | 23 +--- .../paimon/table/system/BranchesTable.java | 16 +-- .../table/system/CatalogOptionsTable.java | 11 +- .../paimon/table/system/ConsumersTable.java | 18 +-- .../paimon/table/system/FilesTable.java | 104 +++++++++--------- .../paimon/table/system/ManifestsTable.java | 34 ++---- .../paimon/table/system/OptionsTable.java | 16 +-- .../paimon/table/system/PartitionsTable.java | 8 +- .../paimon/table/system/SchemasTable.java | 22 +--- .../paimon/table/system/SnapshotsTable.java | 9 +- .../paimon/table/system/StatisticTable.java | 23 +--- .../apache/paimon/table/system/TagsTable.java | 16 +-- 14 files changed, 125 insertions(+), 221 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java new file mode 100644 index 000000000000..e5a649a72eba --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +/** Singleton split use for system table, in which, scan always just produce one split. */ +public abstract class SingletonSplit implements Split { + + @Override + public long rowCount() { + return 1; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java index 29bec8502d46..0aa17b1b6f52 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataField; @@ -120,32 +121,21 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new AggregationSplit( - new SchemaManager(fileIO, location).listAllIds().size(), - location)); + return () -> Collections.singletonList(new AggregationSplit(location)); } } /** {@link Split} implementation for {@link AggregationFieldsTable}. */ - private static class AggregationSplit implements Split { + private static class AggregationSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private AggregationSplit(long rowCount, Path location) { - this.rowCount = rowCount; + private AggregationSplit(Path location) { this.location = location; } - @Override - public long rowCount() { - return rowCount; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java index a7b1236bfd48..eb50a38a5bc1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataField; @@ -42,7 +43,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -121,33 +121,20 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new AllTableSplit( - options(fileIO, allTablePaths).values().stream() - .flatMap(t -> t.values().stream()) - .reduce(0, (a, b) -> a + b.size(), Integer::sum), - allTablePaths)); + return () -> Collections.singletonList(new AllTableSplit(allTablePaths)); } } - private static class AllTableSplit implements Split { + private static class AllTableSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Map> allTablePaths; - private AllTableSplit(long rowCount, Map> allTablePaths) { - this.rowCount = rowCount; + private AllTableSplit(Map> allTablePaths) { this.allTablePaths = allTablePaths; } - @Override - public long rowCount() { - return rowCount; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -192,7 +179,7 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof AllTableSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index 7ae31095cf99..f373706419c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -35,6 +35,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; @@ -122,28 +123,19 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - FileStoreTable table = FileStoreTableFactory.create(fileIO, location); - long rowCount = table.branchManager().branchCount(); - return () -> Collections.singletonList(new BranchesSplit(rowCount, location)); + return () -> Collections.singletonList(new BranchesSplit(location)); } } - private static class BranchesSplit implements Split { + private static class BranchesSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private BranchesSplit(long rowCount, Path location) { - this.rowCount = rowCount; + private BranchesSplit(Path location) { this.location = location; } - @Override - public long rowCount() { - return rowCount; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java index 2e94edf1eb4b..699d0324f1f1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java @@ -30,6 +30,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataField; @@ -39,7 +40,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -113,7 +113,7 @@ public Plan innerPlan() { } } - private static class CatalogOptionsSplit implements Split { + private static class CatalogOptionsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; @@ -123,11 +123,6 @@ private CatalogOptionsSplit(Options catalogOptions) { this.catalogOptions = catalogOptions.toMap(); } - @Override - public long rowCount() { - return catalogOptions.size(); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -168,7 +163,7 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof CatalogOptionsTable.CatalogOptionsSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java index a3ec3017eb5b..05b3382d20ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; @@ -114,32 +115,21 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new ConsumersTable.ConsumersSplit( - new ConsumerManager(fileIO, location).listAllIds().size(), - location)); + return () -> Collections.singletonList(new ConsumersTable.ConsumersSplit(location)); } } /** {@link Split} implementation for {@link ConsumersTable}. */ - private static class ConsumersSplit implements Split { + private static class ConsumersSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private ConsumersSplit(long rowCount, Path location) { - this.rowCount = rowCount; + private ConsumersSplit(Path location) { this.location = location; } - @Override - public long rowCount() { - return rowCount; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 5f47e574b019..bb29a0bd7dda 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -43,6 +43,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.source.TableScan; @@ -62,7 +63,6 @@ import javax.annotation.Nullable; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -134,7 +134,7 @@ public List primaryKeys() { @Override public InnerTableScan newScan() { - return new FilesScan(storeTable); + return new FilesScan(); } @Override @@ -150,16 +150,10 @@ public Table copy(Map dynamicOptions) { private static class FilesScan extends ReadOnceTableScan { - private final FileStoreTable storeTable; - @Nullable private LeafPredicate partitionPredicate; @Nullable private LeafPredicate bucketPredicate; @Nullable private LeafPredicate levelPredicate; - private FilesScan(FileStoreTable storeTable) { - this.storeTable = storeTable; - } - @Override public InnerTableScan withFilter(Predicate pushdown) { if (pushdown == null) { @@ -176,12 +170,51 @@ public InnerTableScan withFilter(Predicate pushdown) { @Override public Plan innerPlan() { - // plan here, just set the result of plan to split - TableScan.Plan plan = tablePlan(); - return () -> Collections.singletonList(new FilesSplit(plan.splits())); + return () -> + Collections.singletonList( + new FilesSplit(partitionPredicate, bucketPredicate, levelPredicate)); + } + } + + private static class FilesSplit extends SingletonSplit { + + @Nullable private final LeafPredicate partitionPredicate; + @Nullable private final LeafPredicate bucketPredicate; + @Nullable private final LeafPredicate levelPredicate; + + private FilesSplit( + @Nullable LeafPredicate partitionPredicate, + @Nullable LeafPredicate bucketPredicate, + @Nullable LeafPredicate levelPredicate) { + this.partitionPredicate = partitionPredicate; + this.bucketPredicate = bucketPredicate; + this.levelPredicate = levelPredicate; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FilesSplit that = (FilesSplit) o; + return Objects.equals(partitionPredicate, that.partitionPredicate) + && Objects.equals(bucketPredicate, that.bucketPredicate) + && Objects.equals(this.levelPredicate, that.levelPredicate); + } + + @Override + public int hashCode() { + return Objects.hash(partitionPredicate, bucketPredicate, levelPredicate); + } + + public List splits(FileStoreTable storeTable) { + return tablePlan(storeTable).splits(); } - private TableScan.Plan tablePlan() { + private TableScan.Plan tablePlan(FileStoreTable storeTable) { InnerTableScan scan = storeTable.newScan(); if (partitionPredicate != null) { if (partitionPredicate.function() instanceof Equal) { @@ -221,46 +254,6 @@ private TableScan.Plan tablePlan() { } } - private static class FilesSplit implements Split { - - private static final long serialVersionUID = 1L; - - private final List splits; - - private FilesSplit(List splits) { - this.splits = splits; - } - - @Override - public long rowCount() { - return splits.stream() - .map(s -> (DataSplit) s) - .mapToLong(s -> s.dataFiles().size()) - .sum(); - } - - public List splits() { - return splits; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FilesSplit that = (FilesSplit) o; - return Objects.equals(splits, that.splits); - } - - @Override - public int hashCode() { - return Objects.hash(splits); - } - } - private static class FilesRead implements InnerTableRead { private final SchemaManager schemaManager; @@ -292,12 +285,13 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof FilesSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } FilesSplit filesSplit = (FilesSplit) split; - if (filesSplit.splits().isEmpty()) { + List splits = filesSplit.splits(storeTable); + if (splits.isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } @@ -332,7 +326,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { }); } }; - for (Split dataSplit : filesSplit.splits()) { + for (Split dataSplit : splits) { iteratorList.add( Iterators.transform( ((DataSplit) dataSplit).dataFiles().iterator(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java index 7d7a9570e55f..e07ff602c68f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java @@ -35,6 +35,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; @@ -48,13 +49,11 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -109,7 +108,7 @@ public Table copy(Map dynamicOptions) { return new ManifestsTable(dataTable.copy(dynamicOptions)); } - private class ManifestsScan extends ReadOnceTableScan { + private static class ManifestsScan extends ReadOnceTableScan { @Override public InnerTableScan withFilter(Predicate predicate) { @@ -119,41 +118,22 @@ public InnerTableScan withFilter(Predicate predicate) { @Override protected Plan innerPlan() { - return () -> - Collections.singletonList(new ManifestsSplit(allManifests(dataTable).size())); + return () -> Collections.singletonList(new ManifestsSplit()); } } - private static class ManifestsSplit implements Split { + private static class ManifestsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; - - private ManifestsSplit(long rowCount) { - this.rowCount = rowCount; - } - - @Override - public long rowCount() { - return rowCount; - } + private ManifestsSplit() {} @Override public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { - return false; - } - ManifestsSplit that = (ManifestsSplit) o; - return Objects.equals(rowCount, that.rowCount); - } - - @Override - public int hashCode() { - return Objects.hash(rowCount); + return o != null && getClass() == o.getClass(); } } @@ -185,7 +165,7 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof ManifestsSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java index b740ddec1e2b..0c8ac2f251b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataField; @@ -112,29 +113,20 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new OptionsSplit(options(fileIO, location).size(), location)); + return () -> Collections.singletonList(new OptionsSplit(location)); } } - private static class OptionsSplit implements Split { + private static class OptionsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private OptionsSplit(long rowCount, Path location) { - this.rowCount = rowCount; + private OptionsSplit(Path location) { this.location = location; } - @Override - public long rowCount() { - return rowCount; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index c08cdfaa3de4..76c0768eeeac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; @@ -124,15 +125,10 @@ public Plan innerPlan() { } } - private static class PartitionsSplit implements Split { + private static class PartitionsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - @Override - public long rowCount() { - return 1; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java index 127323b42b50..2dcbcd2d9d57 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; @@ -47,7 +48,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -127,33 +127,21 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new SchemasSplit( - new SchemaManager(fileIO, location).listAllIds().size(), - location)); + return () -> Collections.singletonList(new SchemasSplit(location)); } } /** {@link Split} implementation for {@link SchemasTable}. */ - private static class SchemasSplit implements Split { + private static class SchemasSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private SchemasSplit(long rowCount, Path location) { - this.rowCount = rowCount; + private SchemasSplit(Path location) { this.location = location; } - @Override - public long rowCount() { - return rowCount; - } - - @Override public boolean equals(Object o) { if (this == o) { return true; @@ -198,7 +186,7 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof SchemasSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index b8ff9cd8bda8..991129038fa7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -37,6 +37,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; @@ -155,7 +156,7 @@ public Plan innerPlan() { } } - private static class SnapshotsSplit implements Split { + private static class SnapshotsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; @@ -165,12 +166,6 @@ private SnapshotsSplit(Path location) { this.location = location; } - @Override - public long rowCount() { - // dummy 1, just 1 parallelism - return 1; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java index 682b5b775629..600c85a6bd66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.BigIntType; @@ -43,7 +44,6 @@ import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SerializationUtils; -import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; @@ -125,33 +125,18 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - long rowCount; - try { - rowCount = new SnapshotManager(fileIO, location).snapshotCount(); - } catch (IOException e) { - throw new RuntimeException(e); - } - return () -> - Collections.singletonList( - new StatisticTable.StatisticSplit(rowCount, location)); + return () -> Collections.singletonList(new StatisticTable.StatisticSplit(location)); } } - private static class StatisticSplit implements Split { + private static class StatisticSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private StatisticSplit(long rowCount, Path location) { + private StatisticSplit(Path location) { this.location = location; - this.rowCount = rowCount; - } - - @Override - public long rowCount() { - return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index 0c1c4aa29925..9e311ae15c0e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.tag.Tag; @@ -126,29 +127,20 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new TagsSplit(new TagManager(fileIO, location).tagCount(), location)); + return () -> Collections.singletonList(new TagsSplit(location)); } } - private static class TagsSplit implements Split { + private static class TagsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private TagsSplit(long rowCount, Path location) { - this.rowCount = rowCount; + private TagsSplit(Path location) { this.location = location; } - @Override - public long rowCount() { - return rowCount; - } - @Override public boolean equals(Object o) { if (this == o) {