diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 8957b9996916..0b4ebba19cb4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -196,7 +196,8 @@ public SnapshotDeletion newSnapshotDeletion() { pathFactory(), manifestFileFactory().create(), manifestListFactory().create(), - newIndexFileHandler()); + newIndexFileHandler(), + newStatsFileHandler()); } @Override @@ -211,7 +212,8 @@ public TagDeletion newTagDeletion() { pathFactory(), manifestFileFactory().create(), manifestListFactory().create(), - newIndexFileHandler()); + newIndexFileHandler(), + newStatsFileHandler()); } public abstract Comparator newKeyComparator(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index e2c094ea0fca..2f7817f03203 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -29,6 +29,7 @@ import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FileUtils; @@ -64,6 +65,8 @@ public abstract class FileDeletionBase { protected final ManifestFile manifestFile; protected final ManifestList manifestList; protected final IndexFileHandler indexFileHandler; + protected final StatsFileHandler statsFileHandler; + protected final Map> deletionBuckets; protected final Executor ioExecutor; @@ -72,13 +75,14 @@ public FileDeletionBase( FileStorePathFactory pathFactory, ManifestFile manifestFile, ManifestList manifestList, - IndexFileHandler indexFileHandler) { + IndexFileHandler indexFileHandler, + StatsFileHandler statsFileHandler) { this.fileIO = fileIO; this.pathFactory = pathFactory; this.manifestFile = manifestFile; this.manifestList = manifestList; this.indexFileHandler = indexFileHandler; - + this.statsFileHandler = statsFileHandler; this.deletionBuckets = new HashMap<>(); this.ioExecutor = FileUtils.COMMON_IO_FORK_JOIN_POOL; } @@ -194,6 +198,11 @@ protected void cleanUnusedManifests( indexFileHandler.deleteManifest(indexManifest); } } + + // clean statistics + if (snapshot.statistics() != null && !skippingSet.contains(snapshot.statistics())) { + statsFileHandler.deleteStats(snapshot.statistics()); + } } /** @@ -290,6 +299,11 @@ public Set manifestSkippingSet(List skippingSnapshots) { .map(IndexFileMeta::fileName) .forEach(skippingSet::add); } + + // statistics + if (skippingSnapshot.statistics() != null) { + skippingSet.add(skippingSnapshot.statistics()); + } } return skippingSet; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 19d6affa4aaf..4ddf96091a0b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -229,6 +229,11 @@ private List getUsedFilesForSnapshot(Snapshot snapshot) { .map(IndexFileMeta::fileName) .forEach(files::add); } + + // try to read statistic + if (snapshot.statistics() != null) { + files.add(snapshot.statistics()); + } } catch (IOException e) { throw new RuntimeException(e); } @@ -315,6 +320,7 @@ private List listPaimonFileDirs() { paimonFileDirs.add(new Path(location, "manifest")); paimonFileDirs.add(new Path(location, "index")); + paimonFileDirs.add(new Path(location, "statistics")); paimonFileDirs.addAll(listAndCleanDataDirs(location, partitionKeysNum)); return paimonFileDirs; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java index 084845a50dff..23224f1c1378 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java @@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.TagManager; @@ -58,8 +59,9 @@ public SnapshotDeletion( FileStorePathFactory pathFactory, ManifestFile manifestFile, ManifestList manifestList, - IndexFileHandler indexFileHandler) { - super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler); + IndexFileHandler indexFileHandler, + StatsFileHandler statsFileHandler) { + super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index 96b0a3ee61c8..7ef258046d82 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; import org.slf4j.Logger; @@ -51,8 +52,9 @@ public TagDeletion( FileStorePathFactory pathFactory, ManifestFile manifestFile, ManifestList manifestList, - IndexFileHandler indexFileHandler) { - super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler); + IndexFileHandler indexFileHandler, + StatsFileHandler statsFileHandler) { + super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java index b240c8e146db..0ca23fad5d96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java @@ -87,4 +87,8 @@ public void deleteStats(long snapshotId) { statsFile.delete(snapshot.statistics()); } } + + public void deleteStats(String statistic) { + statsFile.delete(statistic); + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala index d0f4093e45ef..d8f121f6e133 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.data.Decimal +import org.apache.paimon.fs.{FileIO, Path} import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.stats.ColStats import org.apache.paimon.utils.DateTimeUtils @@ -26,6 +27,8 @@ import org.apache.spark.sql.Row import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.Assertions +import java.util.concurrent.TimeUnit + abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { test("Paimon analyze: analyze table only") { @@ -244,4 +247,43 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { spark.sql(s"SELECT * from T ORDER BY id"), Row("1", "a", 1, 1) :: Row("2", "aaa", 1, 2) :: Nil) } + + test("Paimon analyze: statistics expire and clean") { + spark.sql(s""" + |CREATE TABLE T (id STRING, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id') + |""".stripMargin) + + val table = loadTable("T") + val tableLocation = table.location() + val fileIO = table.fileIO() + + spark.sql(s"INSERT INTO T VALUES ('1', 'a')") + spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") + + spark.sql(s"INSERT INTO T VALUES ('2', 'b')") + spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") + Assertions.assertEquals(2, statsFileCount(tableLocation, fileIO)) + + // test expire statistic + spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1)") + Assertions.assertEquals(1, statsFileCount(tableLocation, fileIO)) + + val orphanStats = new Path(tableLocation, "statistics/stats-orphan-0") + fileIO.writeFileUtf8(orphanStats, "x") + Assertions.assertEquals(2, statsFileCount(tableLocation, fileIO)) + + // test clean orhan statistic + Thread.sleep(1001) + val older_than = DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), + 3) + spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')") + Assertions.assertEquals(1, statsFileCount(tableLocation, fileIO)) + } + + private def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = { + fileIO.listStatus(new Path(tableLocation, "statistics")).length + } }