From 1c5dbffadc0f8f038ae33edb1774b1b42ad313b1 Mon Sep 17 00:00:00 2001
From: "hongli.wwj"
Date: Tue, 12 Nov 2024 23:28:34 +0800
Subject: [PATCH] [core] support delete stats in result of scan plan

---
 .../org/apache/paimon/io/DataFileMeta.java | 21 +++++++++++++++++
 .../apache/paimon/manifest/ManifestEntry.java | 4 ++++
 .../operation/AbstractFileStoreScan.java | 13 +++++++++++
 .../paimon/operation/FileStoreScan.java | 2 ++
 .../table/source/AbstractDataTableScan.java | 6 +++++
 .../paimon/table/source/InnerTableScan.java | 5 ++++
 .../table/source/snapshot/SnapshotReader.java | 2 ++
 .../source/snapshot/SnapshotReaderImpl.java | 6 +++++
 .../paimon/table/system/AuditLogTable.java | 6 +++++
 .../paimon/table/system/FilesTable.java | 2 ++
 .../operation/KeyValueFileStoreScanTest.java | 23 +++++++++++++++++++
 .../paimon/table/system/FilesTableTest.java | 1 +
 12 files changed, 91 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index b6cac5ae51304..5291db88bf8ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -409,6 +409,27 @@ public DataFileMeta rename(String newFileName) {
                 valueStatsCols);
     }
 
+    public DataFileMeta withoutStats() {
+        return new DataFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                EMPTY_STATS,
+                EMPTY_STATS,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                extraFiles,
+                creationTime,
+                deleteRowCount,
+                embeddedIndex,
+                fileSource,
+                valueStatsCols);
+    }
+
     public List collectFiles(DataFilePathFactory pathFactory) {
         List paths = new ArrayList<>();
         paths.add(pathFactory.toPath(fileName));
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index f7c5c4639a6f2..30330f67dbb22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -121,6 +121,10 @@ public Identifier identifier() {
                 file.embeddedIndex());
     }
 
+    public ManifestEntry withoutStats() {
+        return new ManifestEntry(kind, partition, bucket, totalBuckets, file.withoutStats());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ManifestEntry)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 683e6ffda481e..5f259e8283a4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -90,6 +90,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     private ManifestCacheFilter manifestCacheFilter = null;
     private ScanMetrics scanMetrics = null;
+    private boolean withoutStatsInPlan;
 
     public AbstractFileStoreScan(
             ManifestsReader manifestsReader,
@@ -105,6 +106,7 @@ public AbstractFileStoreScan(
         this.manifestFileFactory = manifestFileFactory;
         this.tableSchemas = new ConcurrentHashMap<>();
         this.parallelism = parallelism;
+        this.withoutStatsInPlan = true;
     }
 
     @Override
@@ -215,6 +217,12 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
         return this;
     }
 
+    @Override
+    public FileStoreScan withoutStatsInPlan(boolean withoutStatsInPlan) {
+        this.withoutStatsInPlan = withoutStatsInPlan;
+        return this;
+    }
+
     @Nullable
     @Override
     public Integer parallelism() {
@@ -291,6 +299,11 @@ public Snapshot snapshot() {
 
             @Override
             public List files() {
+                if (withoutStatsInPlan) {
+                    return files.stream()
+                            .map(ManifestEntry::withoutStats)
+                            .collect(Collectors.toList());
+                }
                 return files;
             }
         };
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index bc0d7ff27301a..1d6e44d5204b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -81,6 +81,8 @@ public interface FileStoreScan {
 
     FileStoreScan withMetrics(ScanMetrics metrics);
 
+    FileStoreScan withoutStatsInPlan(boolean withoutStatsInPlan);
+
     @Nullable
     Integer parallelism();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 6a8aa9265e5c6..dca3dbb8844e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -102,6 +102,12 @@ public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry)
         return this;
     }
 
+    @Override
+    public AbstractDataTableScan withoutStatsInPlan(boolean withoutStatsInPlan) {
+        snapshotReader.withoutStatsInPlan(withoutStatsInPlan);
+        return this;
+    }
+
     public CoreOptions options() {
         return options;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 00a4fc0cde18b..83f482a644f6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -55,4 +55,9 @@ default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
         // do nothing, should implement this if need
         return this;
     }
+
+    default InnerTableScan withoutStatsInPlan(boolean withoutStatsInPlan) {
+        // do nothing, should implement this if need
+        return this;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 2dd02be04f7c5..4eac2ec58a2c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -85,6 +85,8 @@ public interface SnapshotReader {
 
     SnapshotReader withDataFileNameFilter(Filter fileNameFilter);
 
+    SnapshotReader withoutStatsInPlan(boolean withoutStatsInPlan);
+
     SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);
 
     SnapshotReader withMetricRegistry(MetricRegistry registry);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index f4591734b68e3..786f3af46fa1b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -264,6 +264,12 @@ public SnapshotReader withDataFileNameFilter(Filter fileNameFilter) {
         return this;
     }
 
+    @Override
+    public SnapshotReader withoutStatsInPlan(boolean withoutStatsInPlan) {
+        scan.withoutStatsInPlan(withoutStatsInPlan);
+        return this;
+    }
+
     @Override
     public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
         if (splitGenerator.alwaysRawConvertible()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e0acd9fb38ea6..2b3aafc0ff893 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -342,6 +342,12 @@ public SnapshotReader withDataFileNameFilter(Filter fileNameFilter) {
             return this;
         }
 
+        @Override
+        public SnapshotReader withoutStatsInPlan(boolean withoutStatsInPlan) {
+            wrapped.withoutStatsInPlan(withoutStatsInPlan);
+            return this;
+        }
+
         @Override
         public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
             wrapped.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 53d2078126737..be327dd3635a6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -267,6 +267,8 @@ private TableScan.Plan tablePlan(FileStoreTable storeTable) {
                                     GenericRow.of(null, null, null, null, null, level));
                         });
             }
+
+            scan.withoutStatsInPlan(false);
             return scan.plan();
         }
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index ce17450538b1b..36eed12ade937 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -50,6 +50,7 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KeyValueFileStoreScan}. */
@@ -274,6 +275,28 @@ public void testWithManifestList() throws Exception {
         runTestExactMatch(scan, null, expected);
     }
 
+    @Test
+    public void testWithoutStatsInPlan() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List data = generateData(100, 0, (long) Math.abs(random.nextInt(1000)));
+        writeData(data, 0);
+        data = generateData(100, 1, (long) Math.abs(random.nextInt(1000)) + 1000);
+        writeData(data, 0);
+        data = generateData(100, 2, (long) Math.abs(random.nextInt(1000)) + 2000);
+        writeData(data, 0);
+        data = generateData(100, 3, (long) Math.abs(random.nextInt(1000)) + 3000);
+        Snapshot snapshot = writeData(data, 0);
+
+        KeyValueFileStoreScan scan = store.newScan();
+        scan.withSnapshot(snapshot.id());
+        List files = scan.plan().files();
+
+        for (ManifestEntry manifestEntry : files) {
+            assertThat(manifestEntry.file().keyStats()).isEqualTo(EMPTY_STATS);
+            assertThat(manifestEntry.file().valueStats()).isEqualTo(EMPTY_STATS);
+        }
+    }
+
     private void runTestExactMatch(
             FileStoreScan scan, Long expectedSnapshotId, Map expected)
             throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 1a692270ac5be..abde5c6bccb82 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -87,6 +87,7 @@ public void before() throws Exception {
         SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
         table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
         scan = table.store().newScan();
+        scan.withoutStatsInPlan(false);
 
         Identifier filesTableId =
                 identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER + FilesTable.FILES);