Skip to content

Commit

Permalink
remove time_retained
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Sep 13, 2024
1 parent 6400f12 commit 72d2628
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TimeUtils;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
Expand All @@ -46,8 +45,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("retain_max", IntegerType),
ProcedureParameter.optional("retain_min", IntegerType),
ProcedureParameter.optional("older_than", StringType),
ProcedureParameter.optional("max_deletes", IntegerType),
ProcedureParameter.optional("time_retained", StringType)
ProcedureParameter.optional("max_deletes", IntegerType)
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -78,8 +76,6 @@ public InternalRow[] call(InternalRow args) {
Integer retainMin = args.isNullAt(2) ? null : args.getInt(2);
String olderThanStr = args.isNullAt(3) ? null : args.getString(3);
Integer maxDeletes = args.isNullAt(4) ? null : args.getInt(4);
Duration timeRetained =
args.isNullAt(5) ? null : TimeUtils.parseDuration(args.getString(5));

return modifyPaimonTable(
tableIdent,
Expand All @@ -92,11 +88,7 @@ public InternalRow[] call(InternalRow args) {
if (retainMin != null) {
builder.snapshotRetainMin(retainMin);
}
if (!StringUtils.isBlank(olderThanStr) && timeRetained != null) {
throw new IllegalArgumentException(
"older_than and time_retained cannot be used together.");
}
if (!StringUtils.isBlank(olderThanStr)) {
if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
long olderThanMills;
// forward compatibility for timestamp type
if (StringUtils.isNumeric(olderThanStr)) {
Expand All @@ -115,9 +107,6 @@ public InternalRow[] call(InternalRow args) {
if (maxDeletes != null) {
builder.snapshotMaxDeletes(maxDeletes);
}
if (timeRetained != null) {
builder.snapshotTimeRetain(timeRetained);
}
int deleted = expireSnapshots.config(builder.build()).expire();
return new InternalRow[] {newInternalRow(deleted)};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,36 +167,6 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
checkSnapshots(snapshotManager, 5, 5)
}

test("test new parameter time_retained") {
sql(
"CREATE TABLE T (a INT, b STRING) " +
"TBLPROPERTIES ( 'num-sorted-run.compaction-trigger' = '999' )")
val table = loadTable("T")
val snapshotManager = table.snapshotManager

// generate 5 snapshot
for (i <- 1 to 5) {
sql(s"INSERT INTO T VALUES ($i, '$i')")
}
checkSnapshots(snapshotManager, 1, 5)

// no snapshots expired
spark.sql(s"CALL paimon.sys.expire_snapshots(table => 'test.T', time_retained => '1h')")
checkSnapshots(snapshotManager, 1, 5)

// expire assert throw exception
val timestamp = snapshotManager.latestSnapshot().timeMillis
assertThrows[IllegalArgumentException] {
spark.sql(
s"CALL paimon.sys.expire_snapshots(table => 'test.T', older_than => '${timestamp.toString}', time_retained => '1h')")
}

// all snapshot are expired, keep latest snapshot
Thread.sleep(1000)
spark.sql(s"CALL paimon.sys.expire_snapshots(table => 'test.T', time_retained => '1s')")
checkSnapshots(snapshotManager, 5, 5)
}

def checkSnapshots(sm: SnapshotManager, earliest: Int, latest: Int): Unit = {
assertThat(sm.snapshotCount).isEqualTo(latest - earliest + 1)
assertThat(sm.earliestSnapshotId).isEqualTo(earliest)
Expand Down

0 comments on commit 72d2628

Please sign in to comment.