diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index 64074a4c158c..e05cfde13191 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -37,6 +37,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** Expire partitions. */ public class PartitionExpire { @@ -51,6 +52,7 @@ public class PartitionExpire { private LocalDateTime lastCheck; private final PartitionExpireStrategy strategy; private final boolean endInputCheckPartitionExpire; + private int maxExpires; public PartitionExpire( Duration expirationTime, @@ -68,6 +70,7 @@ public PartitionExpire( this.metastoreClient = metastoreClient; this.lastCheck = LocalDateTime.now(); this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; + this.maxExpires = Integer.MAX_VALUE; } public PartitionExpire( @@ -85,6 +88,11 @@ public PartitionExpire withLock(Lock lock) { return this; } + public PartitionExpire withMaxExpires(int maxExpires) { + this.maxExpires = maxExpires; + return this; + } + public List> expire(long commitIdentifier) { return expire(LocalDateTime.now(), commitIdentifier); } @@ -125,14 +133,16 @@ List> expire(LocalDateTime now, long commitIdentifier) { private List> doExpire( LocalDateTime expireDateTime, long commitIdentifier) { - List> expired = new ArrayList<>(); + List> partValues = new ArrayList<>(); for (PartitionEntry partition : strategy.selectExpiredPartitions(scan, expireDateTime)) { Object[] array = strategy.convertPartition(partition.partition()); - Map partString = strategy.toPartitionString(array); - expired.add(partString); - LOG.info("Expire Partition: {}", partString); + partValues.add(strategy.toPartitionValue(array)); } - if (!expired.isEmpty()) { + + List> expired = new ArrayList<>(); + if (!partValues.isEmpty()) { + expired = convertToPartitionString(partValues); + LOG.info("Expire Partition: {}", expired); if (metastoreClient != null) { deleteMetastorePartitions(expired); } @@ -153,4 +163,14 @@ private void deleteMetastorePartitions(List> partitions) { }); } } + + private List> convertToPartitionString(List> partValues) { + return partValues.stream() + .map(values -> String.join(",", values)) + .sorted() + .map(s -> s.split(",")) + .map(strategy::toPartitionString) + .limit(Math.min(partValues.size(), maxExpires)) + .collect(Collectors.toList()); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java index 529d8151046c..1912898e2a20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java @@ -26,6 +26,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -49,6 +50,14 @@ public Map toPartitionString(Object[] array) { return map; } + public List toPartitionValue(Object[] array) { + List list = new ArrayList<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + list.add(array[i].toString()); + } + return list; + } + public Object[] convertPartition(BinaryRow partition) { return toObjectArrayConverter.convert(partition); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index abbf4f48698b..ee6075a927d3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -63,6 +63,10 @@ public String identifier() { name = "expire_strategy", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "max_expires", + type = @DataTypeHint("INTEGER"), + isOptional = true) }) public @DataTypeHint("ROW< expired_partitions STRING>") Row[] call( ProcedureContext procedureContext, @@ -70,7 +74,8 @@ public String identifier() { String expirationTime, String timestampFormatter, String timestampPattern, - String expireStrategy) + String expireStrategy, + Integer maxExpires) throws Catalog.TableNotExistException { FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); FileStore fileStore = fileStoreTable.store(); @@ -93,6 +98,9 @@ public String identifier() { .metastoreClientFactory()) .map(MetastoreClient.Factory::create) .orElse(null)); + if (maxExpires != null) { + partitionExpire.withMaxExpires(maxExpires); + } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() ? new Row[] {Row.of("No expired partitions.")} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java index 71a6dc466ee0..243c71c5821a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java @@ -347,6 +347,61 @@ public void testPartitionExpireWithTimePartition() throws Exception { .containsExactlyInAnyOrder("Never-expire:9999-09-09:99:99"); } + @Test + public void testSortAndLimitExpirePartition() throws Exception { + sql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " hm STRING," + + " PRIMARY KEY (k, dt, hm) NOT ENFORCED" + + ") PARTITIONED BY (dt, hm) WITH (" + + " 'bucket' = '1'" + + ")"); + FileStoreTable table = paimonTable("T"); + // Test there are no expired partitions. + assertThat( + callExpirePartitions( + "CALL sys.expire_partitions(" + + "`table` => 'default.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')")) + .containsExactlyInAnyOrder("No expired partitions."); + + sql("INSERT INTO T VALUES ('3', '2024-06-02', '02:00')"); + sql("INSERT INTO T VALUES ('2', '2024-06-02', '01:00')"); + sql("INSERT INTO T VALUES ('4', '2024-06-03', '01:00')"); + sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')"); + // This partition never expires. + sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')"); + + Function consumerReadResult = + (InternalRow row) -> + row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2); + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder( + "1:2024-06-01:01:00", + "2:2024-06-02:01:00", + "3:2024-06-02:02:00", + "4:2024-06-03:01:00", + "Never-expire:9999-09-09:99:99"); + + // Show a list of expired partitions. + assertThat( + callExpirePartitions( + "CALL sys.expire_partitions(" + + "`table` => 'default.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd', max_expires => 3)")) + .containsExactly( + "dt=2024-06-01, hm=01:00", + "dt=2024-06-02, hm=01:00", + "dt=2024-06-02, hm=02:00"); + + assertThat(read(table, consumerReadResult)) + .containsExactly("4:2024-06-03:01:00", "Never-expire:9999-09-09:99:99"); + } + /** Return a list of expired partitions. */ public List callExpirePartitions(String callSql) { return sql(callSql).stream() diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 81012abc9e0b..7b388227e5a4 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -40,6 +40,7 @@ import java.util.Optional; import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; +import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.StringType; /** A procedure to expire partitions. */ @@ -51,7 +52,8 @@ public class ExpirePartitionsProcedure extends BaseProcedure { ProcedureParameter.required("expiration_time", StringType), ProcedureParameter.optional("timestamp_formatter", StringType), ProcedureParameter.optional("timestamp_pattern", StringType), - ProcedureParameter.optional("expire_strategy", StringType) + ProcedureParameter.optional("expire_strategy", StringType), + ProcedureParameter.optional("max_expires", IntegerType) }; private static final StructType OUTPUT_TYPE = @@ -81,6 +83,7 @@ public InternalRow[] call(InternalRow args) { String timestampFormatter = args.isNullAt(2) ? null : args.getString(2); String timestampPattern = args.isNullAt(3) ? null : args.getString(3); String expireStrategy = args.isNullAt(4) ? null : args.getString(4); + Integer maxExpires = args.isNullAt(5) ? null : args.getInt(5); return modifyPaimonTable( tableIdent, table -> { @@ -105,6 +108,9 @@ public InternalRow[] call(InternalRow args) { .metastoreClientFactory()) .map(MetastoreClient.Factory::create) .orElse(null)); + if (maxExpires != null) { + partitionExpire.withMaxExpires(maxExpires); + } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() ? new InternalRow[] { diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index db4696047f9d..4561e532f538 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -474,4 +474,81 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest } } } + + test("Paimon procedure : sorted the expired partitions with max_expires.") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (k STRING, pt STRING, hm STRING) + |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1') + | PARTITIONED BY (pt,hm) + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(String, String, String)] + val stream = inputData + .toDS() + .toDF("k", "pt", "hm") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + // Show results : There are no expired partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')"), + Row("No expired partitions.") :: Nil + ) + + inputData.addData(("a", "2024-06-02", "02:00")) + stream.processAllAvailable() + inputData.addData(("b", "2024-06-02", "01:00")) + stream.processAllAvailable() + inputData.addData(("d", "2024-06-03", "01:00")) + stream.processAllAvailable() + inputData.addData(("c", "2024-06-01", "01:00")) + stream.processAllAvailable() + // this snapshot never expires. + inputData.addData(("Never-expire", "9999-09-09", "99:99")) + stream.processAllAvailable() + + checkAnswer( + query(), + Row("a", "2024-06-02", "02:00") :: Row("b", "2024-06-02", "01:00") :: Row( + "d", + "2024-06-03", + "01:00") :: Row("c", "2024-06-01", "01:00") :: Row( + "Never-expire", + "9999-09-09", + "99:99") :: Nil + ) + + // sorted result of limited expired partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd', max_expires => 3)"), + Row("pt=2024-06-01, hm=01:00") :: Row("pt=2024-06-02, hm=01:00") :: Row( + "pt=2024-06-02, hm=02:00") :: Nil + ) + + checkAnswer( + query(), + Row("d", "2024-06-03", "01:00") :: Row("Never-expire", "9999-09-09", "99:99") :: Nil) + } finally { + stream.stop() + } + } + } + } }