Skip to content

Commit

Permalink
[core] Add statistic expire and clean (#2777)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Jan 24, 2024
1 parent 7531f18 commit cedccd5
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public SnapshotDeletion newSnapshotDeletion() {
pathFactory(),
manifestFileFactory().create(),
manifestListFactory().create(),
newIndexFileHandler());
newIndexFileHandler(),
newStatsFileHandler());
}

@Override
Expand All @@ -211,7 +212,8 @@ public TagDeletion newTagDeletion() {
pathFactory(),
manifestFileFactory().create(),
manifestListFactory().create(),
newIndexFileHandler());
newIndexFileHandler(),
newStatsFileHandler());
}

public abstract Comparator<InternalRow> newKeyComparator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;

Expand Down Expand Up @@ -64,6 +65,8 @@ public abstract class FileDeletionBase {
protected final ManifestFile manifestFile;
protected final ManifestList manifestList;
protected final IndexFileHandler indexFileHandler;
protected final StatsFileHandler statsFileHandler;

protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
protected final Executor ioExecutor;

Expand All @@ -72,13 +75,14 @@ public FileDeletionBase(
FileStorePathFactory pathFactory,
ManifestFile manifestFile,
ManifestList manifestList,
IndexFileHandler indexFileHandler) {
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler) {
this.fileIO = fileIO;
this.pathFactory = pathFactory;
this.manifestFile = manifestFile;
this.manifestList = manifestList;
this.indexFileHandler = indexFileHandler;

this.statsFileHandler = statsFileHandler;
this.deletionBuckets = new HashMap<>();
this.ioExecutor = FileUtils.COMMON_IO_FORK_JOIN_POOL;
}
Expand Down Expand Up @@ -194,6 +198,11 @@ protected void cleanUnusedManifests(
indexFileHandler.deleteManifest(indexManifest);
}
}

// clean statistics
if (snapshot.statistics() != null && !skippingSet.contains(snapshot.statistics())) {
statsFileHandler.deleteStats(snapshot.statistics());
}
}

/**
Expand Down Expand Up @@ -290,6 +299,11 @@ public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
.map(IndexFileMeta::fileName)
.forEach(skippingSet::add);
}

// statistics
if (skippingSnapshot.statistics() != null) {
skippingSet.add(skippingSnapshot.statistics());
}
}

return skippingSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ private List<String> getUsedFilesForSnapshot(Snapshot snapshot) {
.map(IndexFileMeta::fileName)
.forEach(files::add);
}

// try to read statistic
if (snapshot.statistics() != null) {
files.add(snapshot.statistics());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -315,6 +320,7 @@ private List<Path> listPaimonFileDirs() {

paimonFileDirs.add(new Path(location, "manifest"));
paimonFileDirs.add(new Path(location, "index"));
paimonFileDirs.add(new Path(location, "statistics"));
paimonFileDirs.addAll(listAndCleanDataDirs(location, partitionKeysNum));

return paimonFileDirs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -58,8 +59,9 @@ public SnapshotDeletion(
FileStorePathFactory pathFactory,
ManifestFile manifestFile,
ManifestList manifestList,
IndexFileHandler indexFileHandler) {
super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler);
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler) {
super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.FileStorePathFactory;

import org.slf4j.Logger;
Expand All @@ -51,8 +52,9 @@ public TagDeletion(
FileStorePathFactory pathFactory,
ManifestFile manifestFile,
ManifestList manifestList,
IndexFileHandler indexFileHandler) {
super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler);
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler) {
super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ public void deleteStats(long snapshotId) {
statsFile.delete(snapshot.statistics());
}
}

/**
 * Deletes a single statistics file by handing {@code statistic} straight to
 * {@code statsFile.delete}.
 *
 * <p>No snapshot lookup or null check happens here; the caller is responsible for passing a
 * usable value.
 *
 * @param statistic identifier of the statistics file to delete (presumably the value stored in
 *     {@code Snapshot#statistics()}; confirm against callers)
 */
public void deleteStats(String statistic) {
statsFile.delete(statistic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.paimon.spark.sql

import org.apache.paimon.data.Decimal
import org.apache.paimon.fs.{FileIO, Path}
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.stats.ColStats
import org.apache.paimon.utils.DateTimeUtils
Expand All @@ -26,6 +27,8 @@ import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Assertions

import java.util.concurrent.TimeUnit

abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {

test("Paimon analyze: analyze table only") {
Expand Down Expand Up @@ -244,4 +247,43 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
spark.sql(s"SELECT * from T ORDER BY id"),
Row("1", "a", 1, 1) :: Row("2", "aaa", 1, 2) :: Nil)
}

test("Paimon analyze: statistics expire and clean") {
  val createTableDdl =
    """
      |CREATE TABLE T (id STRING, name STRING)
      |USING PAIMON
      |TBLPROPERTIES ('primary-key'='id')
      |""".stripMargin
  spark.sql(createTableDdl)

  val paimonTable = loadTable("T")
  val location = paimonTable.location()
  val io = paimonTable.fileIO()

  // Two insert + analyze rounds; the assertion expects two files under `statistics` afterwards.
  spark.sql("INSERT INTO T VALUES ('1', 'a')")
  spark.sql("ANALYZE TABLE T COMPUTE STATISTICS")

  spark.sql("INSERT INTO T VALUES ('2', 'b')")
  spark.sql("ANALYZE TABLE T COMPUTE STATISTICS")
  Assertions.assertEquals(2, statsFileCount(location, io))

  // Expiring snapshots down to a single retained one must leave exactly one statistics file.
  spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1)")
  Assertions.assertEquals(1, statsFileCount(location, io))

  // Plant a file in the statistics directory that this test wrote by hand.
  val orphanPath = new Path(location, "statistics/stats-orphan-0")
  io.writeFileUtf8(orphanPath, "x")
  Assertions.assertEquals(2, statsFileCount(location, io))

  // Clean orphan statistics. The pause presumably guarantees that the planted file is older than
  // the cut-off timestamp computed right after it -- TODO confirm against remove_orphan_files.
  Thread.sleep(1001)
  val cutOff = DateTimeUtils.toLocalDateTime(System.currentTimeMillis())
  val olderThan = DateTimeUtils.formatLocalDateTime(cutOff, 3)
  spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$olderThan')")
  Assertions.assertEquals(1, statsFileCount(location, io))
}

/**
 * Counts the entries that `fileIO.listStatus` reports for the `statistics` directory directly
 * under `tableLocation`.
 *
 * @param tableLocation root path of the table
 * @param fileIO file-system handle used to list the directory
 * @return number of listed entries
 */
private def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = {
  val statsDir = new Path(tableLocation, "statistics")
  val entries = fileIO.listStatus(statsDir)
  entries.length
}
}

0 comments on commit cedccd5

Please sign in to comment.