Skip to content

Commit

Permalink
[core] Retain number in Snapshot expire need guard check (#3203)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Apr 12, 2024
1 parent ac62732 commit 8bc4fae
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.Changelog;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -106,6 +107,9 @@ public int expire() {
return 0;
}

Preconditions.checkArgument(
retainMax >= retainMin, "retainMax must greater than retainMin.");

// the min snapshot to retain from 'changelog.num-retained.max'
// (the maximum number of snapshots to retain)
long min = Math.max(latestSnapshotId - retainMax + 1, earliestChangelogId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -106,6 +107,9 @@ public int expire() {
return 0;
}

Preconditions.checkArgument(
retainMax >= retainMin, "retainMax must greater than retainMin.");

// the min snapshot to retain from 'snapshot.num-retained.max'
// (the maximum number of snapshots to retain)
long min = Math.max(latestSnapshotId - retainMax + 1, earliest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,57 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
}
}
}

test("Paimon Procedure: expire snapshots retainMax retainMin value check") {
failAfter(streamingTimeout) {
withTempDir {
checkpointDir =>
// define a change-log table and test `forEachBatch` api
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
val location = loadTable("T").location().toString

val inputData = MemoryStream[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreachBatch {
(batch: Dataset[Row], _: Long) =>
batch.write.format("paimon").mode("append").save(location)
}
.start()

val query = () => spark.sql("SELECT * FROM T ORDER BY a")

try {
// snapshot-1
inputData.addData((1, "a"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Nil)

// snapshot-2
inputData.addData((2, "b"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

// snapshot-3
inputData.addData((2, "b2"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)

// expire assert throw exception
assertThrows[IllegalArgumentException] {
spark.sql(
"CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 2, retain_min => 3)")
}
} finally {
stream.stop()
}
}
}
}
}

0 comments on commit 8bc4fae

Please sign in to comment.