Skip to content

Commit

Permalink
ExpireSnapshotsProcedureTest/ExpireSnapshotsProcedureITCase
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Jul 17, 2024
1 parent 27a18a5 commit 1e9cd21
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public int expire() {
// protected by 'snapshot.expire.limit'
// (the maximum number of snapshots allowed to expire at a time)
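// askwang-todo: once maxDeletes is taken into account, min may end up greater than maxExclusive, in which case expireUntil(1, 5) is called directly
// A: it seems unlikely that retainMin, retainMax and maxDeletes would all be set this way at the same time.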
maxExclusive = Math.min(maxExclusive, earliest + maxDeletes);

// snapshot(i).timeMillis + snapshot.time-retained >= system.currentTime
Expand Down Expand Up @@ -171,7 +172,8 @@ public int expireUntil(long earliestId, long endExclusiveId) {
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();

// 删除文件:删除一个 snapshot 下的文件,前提条件是下一个 snapshot 没有使用该文件
// askwang-todo: 为什么要这么遍历?
// askwang-done: 为什么要这么遍历?
// A:下一个 snapshot 的 deltaManifestList 记录决定是否有删除当前 snapshot 文件的操作。
// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
// id should be (beginInclusiveId, endExclusiveId]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ public static List<Snapshot> findOverlappedSnapshots(
List<Snapshot> snapshots = new ArrayList<>();
int right = findPreviousTag(taggedSnapshots, endExclusive);
if (right >= 0) {
// askwang-todo: left 会多一个无效的值,参考 findNextOrEqualTagAskwang
// askwang-done: left 会多一个无效的值,参考 findNextOrEqualTagAskwang
// https://github.com/apache/paimon/pull/3755
int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0);
for (int i = left; i <= right; i++) {
snapshots.add(taggedSnapshots.get(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
/** A procedure to expire snapshots. */
public class ExpireSnapshotsProcedure extends BaseProcedure {

// askwang-todo: older_than 类型改为 StringType
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.paimon.spark.procedure

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.paimon.utils.SnapshotManager
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest

import java.sql.Timestamp

class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {

import testImplicits._
Expand All @@ -33,10 +36,11 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
withTempDir {
checkpointDir =>
// define a change-log table and test `forEachBatch` api
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
spark.sql(
s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
val location = loadTable("T").location().toString

val inputData = MemoryStream[(Int, String)]
Expand Down Expand Up @@ -89,10 +93,11 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
withTempDir {
checkpointDir =>
// define a change-log table and test `forEachBatch` api
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
spark.sql(
s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
val location = loadTable("T").location().toString

val inputData = MemoryStream[(Int, String)]
Expand Down Expand Up @@ -139,14 +144,15 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {

// min may be larger than maxExclusive
test("Paimon Procedure: max delete") {
// failAfter(streamingTimeout) {
failAfter(streamingTimeout) {
withTempDir {
checkpointDir =>
// define a change-log table and test `forEachBatch` api
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'write-only'='true')
|""".stripMargin)
spark.sql(
s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'write-only'='true')
|""".stripMargin)
val location = loadTable("T").location().toString

val inputData = MemoryStream[(Int, String)]
Expand Down Expand Up @@ -208,11 +214,64 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
stream.processAllAvailable()

spark.sql(
"CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 5, retain_min => 2, max_deletes => 4)")
"CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 5, retain_min => 2, max_deletes => 4)")
} finally {
stream.stop()
}
}
// }
}
}

// Copied from ExpireSnapshotsProcedureITCase.
// NOTE: per the original author's remark, this test was not passing when it was added.
test("copy ExpireSnapshotsProcedureITCase") {
  withTable("word_count") {
    withSQLConf() {
      // spark.sql("SET TIME ZONE 'Asia/Shanghai';")
      val createTableDdl =
        "CREATE TABLE word_count ( word STRING, cnt INT) using paimon " +
          "TBLPROPERTIES ('primary-key' = 'word', 'file.format'='parquet', 'num-sorted-run.compaction-trigger' = '9999')"
      spark.sql(createTableDdl)

      val snapshots = loadTable("word_count").snapshotManager()

      // Issue six single-row inserts; expected snapshots afterwards: (1, 2, 3, 4, 5, 6).
      (0 until 6).foreach(i => sql(s"INSERT INTO word_count VALUES ('$i', $i)"))

      sql("select * from word_count").show()

      checkSnapshots(snapshots, 1, 6)

      // retain_max => 5, expected snapshots (2, 3, 4, 5, 6)
      sql("CALL sys.expire_snapshots(`table` => 'word_count', retain_max => 5)")
      checkSnapshots(snapshots, 2, 6)

      // askwang-todo: passing ts6 as the procedure argument keeps failing
      // (how should a TimestampType value be passed?)
      val ts6 = new Timestamp(snapshots.latestSnapshot.timeMillis)
      println(s"current time: ${System.currentTimeMillis()}")
      println(s"ts6 : $ts6")

      // The `older_than` argument is the same in every remaining call.
      val olderThan = s"older_than => '$ts6'"

      // older_than => timestamp of snapshot 6, max_deletes => 1, expected snapshots (3, 4, 5, 6)
      // earliest=2, min=min(max, 2)=2, max=min(6-1+1, 2+1)=3, so snapshot-2 is deleted
      sql(s"CALL sys.expire_snapshots(`table` => 'word_count', $olderThan, max_deletes => 1)")
      checkSnapshots(snapshots, 3, 6)

      // older_than => timestamp of snapshot 6, retain_min => 3, expected snapshots (4, 5, 6)
      // earliest=3, min=min(max, 3)=3, max=min(6-3+1, max)=4, so snapshot-3 is deleted
      sql(s"CALL sys.expire_snapshots(`table` => 'word_count', $olderThan, retain_min => 3)")
      checkSnapshots(snapshots, 4, 6)

      // older_than => timestamp of snapshot 6, expected snapshots (6)
      // min=min(max, 4)=4, max=min(6-1+1, max)=6, so snapshot-4 and snapshot-5 are deleted
      sql(s"CALL sys.expire_snapshots(`table` => 'word_count', $olderThan)")
      checkSnapshots(snapshots, 6, 6)
    }
  }
}

/**
 * Asserts that `sm` reports exactly `lastest - earliest + 1` snapshots, that its earliest
 * snapshot id equals `earliest`, and that its latest snapshot id equals `lastest`.
 *
 * @param sm       snapshot manager to inspect
 * @param earliest expected earliest snapshot id
 * @param lastest  expected latest snapshot id (parameter name kept as-is for callers)
 */
def checkSnapshots(sm: SnapshotManager, earliest: Int, lastest: Int): Unit = {
  // Both bounds are inclusive, hence the +1.
  val expectedCount = lastest - earliest + 1
  assert(sm.snapshotCount() == expectedCount)
  assert(sm.earliestSnapshotId() == earliest)
  assert(sm.latestSnapshotId() == lastest)
}
}

0 comments on commit 1e9cd21

Please sign in to comment.