From 17832169b415452d0c8e4792ade69489e2a1ec30 Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Tue, 12 Nov 2024 23:28:34 +0800 Subject: [PATCH] [core] support delete stats in result of scan plan --- .../org/apache/paimon/io/DataFileMeta.java | 21 ++++++++++++++++++ .../apache/paimon/manifest/ManifestEntry.java | 4 ++++ .../operation/AbstractFileStoreScan.java | 13 +++++++++++ .../paimon/operation/FileStoreScan.java | 2 ++ .../table/source/AbstractDataTableScan.java | 6 +++++ .../paimon/table/source/InnerTableScan.java | 5 +++++ .../paimon/table/source/ReadBuilder.java | 3 +++ .../paimon/table/source/ReadBuilderImpl.java | 11 ++++++++++ .../table/source/snapshot/SnapshotReader.java | 2 ++ .../source/snapshot/SnapshotReaderImpl.java | 6 +++++ .../paimon/table/system/AuditLogTable.java | 6 +++++ .../paimon/table/system/FilesTable.java | 1 + .../operation/KeyValueFileStoreScanTest.java | 22 +++++++++++++++++++ .../source/ContinuousFileStoreSource.java | 2 +- .../paimon/flink/source/FlinkTableSource.java | 8 ++++++- .../flink/source/StaticFileStoreSource.java | 2 +- .../source/operator/MonitorFunction.java | 2 +- 17 files changed, 112 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index b6cac5ae5130..bb9e45ff002d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -409,6 +409,27 @@ public DataFileMeta rename(String newFileName) { valueStatsCols); } + public DataFileMeta copyWithoutStats() { + return new DataFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + EMPTY_STATS, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource, + Collections.emptyList()); + } + public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(fileName)); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index f7c5c4639a6f..ee5dc2c34421 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -121,6 +121,10 @@ public Identifier identifier() { file.embeddedIndex()); } + public ManifestEntry copyWithoutStats() { + return new ManifestEntry(kind, partition, bucket, totalBuckets, file.copyWithoutStats()); + } + @Override public boolean equals(Object o) { if (!(o instanceof ManifestEntry)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 683e6ffda481..0e1f9357e312 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -90,6 +90,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private ManifestCacheFilter manifestCacheFilter = null; private ScanMetrics scanMetrics = null; + private boolean dropStats; public AbstractFileStoreScan( ManifestsReader manifestsReader, @@ -105,6 +106,7 @@ public AbstractFileStoreScan( this.manifestFileFactory = manifestFileFactory; this.tableSchemas = new ConcurrentHashMap<>(); this.parallelism = parallelism; + this.dropStats = false; } @Override @@ -215,6 +217,12 @@ public FileStoreScan withMetrics(ScanMetrics metrics) { return this; } + @Override + public FileStoreScan dropStats() { + this.dropStats = true; + return this; + } + @Nullable @Override public Integer parallelism() { @@ -291,6 +299,11 @@ public Snapshot snapshot() { @Override public List files() { + if (dropStats) { + return files.stream() + .map(ManifestEntry::copyWithoutStats) + .collect(Collectors.toList()); + } return files; } }; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index bc0d7ff27301..e643bf1617b4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -81,6 +81,8 @@ public interface FileStoreScan { FileStoreScan withMetrics(ScanMetrics metrics); + FileStoreScan dropStats(); + @Nullable Integer parallelism(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 6a8aa9265e5c..24c6943f546f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -102,6 +102,12 @@ public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry) return this; } + @Override + public AbstractDataTableScan dropStats() { + snapshotReader.dropStats(); + return this; + } + public CoreOptions options() { return options; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 00a4fc0cde18..c2425ff16f97 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -55,4 +55,9 @@ default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) { // do nothing, should implement this if need return this; } + + default InnerTableScan dropStats() { + // do nothing, should implement this if need + return this; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java index 91d5f1004e91..0c1386ce441d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java @@ -150,6 +150,9 @@ default ReadBuilder withProjection(int[][] projection) { */ ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtasks); + /** Delete stats in scan plan result. */ + ReadBuilder dropStats(); + /** Create a {@link TableScan} to perform batch planning. */ TableScan newScan(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index 577b0a20a99b..95bfe6f24bc7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -51,6 +51,8 @@ public class ReadBuilderImpl implements ReadBuilder { private @Nullable RowType readType; + private boolean dropStats = false; + public ReadBuilderImpl(InnerTable table) { this.table = table; } @@ -124,6 +126,12 @@ public ReadBuilder withBucketFilter(Filter bucketFilter) { return this; } + @Override + public ReadBuilder dropStats() { + this.dropStats = true; + return this; + } + @Override public TableScan newScan() { InnerTableScan tableScan = configureScan(table.newScan()); @@ -156,6 +164,9 @@ private InnerTableScan configureScan(InnerTableScan scan) { if (bucketFilter != null) { scan.withBucketFilter(bucketFilter); } + if (dropStats) { + scan.dropStats(); + } return scan; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 2dd02be04f7c..b59cf98bbb4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -85,6 +85,8 @@ public interface SnapshotReader { SnapshotReader withDataFileNameFilter(Filter fileNameFilter); + SnapshotReader dropStats(); + SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks); SnapshotReader withMetricRegistry(MetricRegistry registry); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index f4591734b68e..7ce537ee52ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -264,6 +264,12 @@ public SnapshotReader withDataFileNameFilter(Filter fileNameFilter) { return this; } + @Override + public SnapshotReader dropStats() { + scan.dropStats(); + return this; + } + @Override public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { if (splitGenerator.alwaysRawConvertible()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index e0acd9fb38ea..ae7e11c3d96d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -342,6 +342,12 @@ public SnapshotReader withDataFileNameFilter(Filter fileNameFilter) { return this; } + @Override + public SnapshotReader dropStats() { + wrapped.dropStats(); + return this; + } + @Override public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { wrapped.withShard(indexOfThisSubtask, numberOfParallelSubtasks); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 53d207812673..ae1567052b7c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -267,6 +267,7 @@ private TableScan.Plan tablePlan(FileStoreTable storeTable) { GenericRow.of(null, null, null, null, null, level)); }); } + return scan.plan(); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index ce17450538b1..2fd8c10cd944 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -50,6 +50,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link KeyValueFileStoreScan}. */ @@ -274,6 +275,27 @@ public void testWithManifestList() throws Exception { runTestExactMatch(scan, null, expected); } + @Test + public void testDropStatsInPlan() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(100, 0, (long) Math.abs(random.nextInt(1000))); + writeData(data, 0); + data = generateData(100, 1, (long) Math.abs(random.nextInt(1000)) + 1000); + writeData(data, 0); + data = generateData(100, 2, (long) Math.abs(random.nextInt(1000)) + 2000); + writeData(data, 0); + data = generateData(100, 3, (long) Math.abs(random.nextInt(1000)) + 3000); + Snapshot snapshot = writeData(data, 0); + + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()).dropStats(); + List files = scan.plan().files(); + + for (ManifestEntry manifestEntry : files) { + assertThat(manifestEntry.file().valueStats()).isEqualTo(EMPTY_STATS); + } + } + private void runTestExactMatch( FileStoreScan scan, Long expectedSnapshotId, Map expected) throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java index 559976921e2e..b7eb1d625ce3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java @@ -77,7 +77,7 @@ public SplitEnumerator restoreEnu nextSnapshotId = checkpoint.currentSnapshotId(); splits = checkpoint.splits(); } - StreamTableScan scan = readBuilder.newStreamScan(); + StreamTableScan scan = readBuilder.dropStats().newStreamScan(); if (metricGroup(context) != null) { ((StreamDataTableScan) scan) .withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup())); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 2be0248f3ce8..9bfd36fdfaa8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -175,6 +175,7 @@ protected void scanSplitsForInference() { List partitionEntries = table.newReadBuilder() .withFilter(predicate) + .dropStats() .newScan() .listPartitionEntries(); long totalSize = 0; @@ -188,7 +189,12 @@ protected void scanSplitsForInference() { new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount); } else { List splits = - table.newReadBuilder().withFilter(predicate).newScan().plan().splits(); + table.newReadBuilder() + .withFilter(predicate) + .dropStats() + .newScan() + .plan() + .splits(); splitStatistics = new SplitStatistics( splits.size(), splits.stream().mapToLong(Split::rowCount).sum()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java index af425aab5e46..c388a6dccbbc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java @@ -87,7 +87,7 @@ public SplitEnumerator restoreEnu private List getSplits(SplitEnumeratorContext context) { FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator(); - TableScan scan = readBuilder.newScan(); + TableScan scan = readBuilder.dropStats().newScan(); // register scan metrics if (context.metricGroup() != null) { ((InnerTableScan) scan) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 3805f6f8c536..f21922670471 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -106,7 +106,7 @@ public MonitorFunction( @Override public void initializeState(FunctionInitializationContext context) throws Exception { - this.scan = readBuilder.newStreamScan(); + this.scan = readBuilder.dropStats().newStreamScan(); this.checkpointState = context.getOperatorStateStore()