diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 3f9b3de852c0..c3b6292c71d6 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -73,9 +73,11 @@ This section introduces all available Spark procedures for Paimon.
table: the target table identifier. Cannot be empty.
expiration_time: the expiration interval of a partition. A partition will be expired if its lifetime exceeds this value. Partition time is extracted from the partition value.
timestamp_formatter: the formatter used to parse a timestamp from a string.
+    timestamp_pattern: the pattern used to extract a timestamp from the partition values.
expire_strategy: specifies the expiration strategy for partition expiration, possible values: 'values-time' or 'update-time', 'values-time' by default.
- CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time') |
+ CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time') |
create_tag |
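
Note for readers of this change: `timestamp_pattern` controls how a timestamp string is assembled from partition columns before `timestamp_formatter` parses it. A minimal sketch of invoking the procedure with the new parameter, assuming a Spark session `spark` and the table and column names from the doc example above:

```scala
// Sketch only: calling the procedure with the new timestamp_pattern parameter.
// 'default.T' and the 'dt' partition column follow the doc example above.
spark.sql(
  """CALL sys.expire_partitions(
    |  table               => 'default.T',
    |  expiration_time     => '1 d',
    |  timestamp_formatter => 'yyyy-MM-dd',
    |  timestamp_pattern   => '$dt',
    |  expire_strategy     => 'values-time')
    |""".stripMargin)
```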
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index 7501a9926431..81012abc9e0b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -50,6 +50,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("expiration_time", StringType),
ProcedureParameter.optional("timestamp_formatter", StringType),
+ ProcedureParameter.optional("timestamp_pattern", StringType),
ProcedureParameter.optional("expire_strategy", StringType)
};
@@ -78,7 +79,8 @@ public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String expirationTime = args.getString(1);
String timestampFormatter = args.isNullAt(2) ? null : args.getString(2);
- String expireStrategy = args.isNullAt(3) ? null : args.getString(3);
+ String timestampPattern = args.isNullAt(3) ? null : args.getString(3);
+ String expireStrategy = args.isNullAt(4) ? null : args.getString(4);
return modifyPaimonTable(
tableIdent,
table -> {
@@ -87,6 +89,7 @@ public InternalRow[] call(InternalRow args) {
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
+ map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
PartitionExpire partitionExpire =
new PartitionExpire(
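
How the two options interact (a sketch of the semantics, not Paimon's internal code): `timestamp_pattern` references partition columns as `$column`; each reference is replaced with the partition's actual value, and the resulting string is parsed with `timestamp_formatter` to obtain the partition time that is compared against `expiration_time`. Illustrated with the two-column layout used in the test below:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Illustration of the substitution semantics, not Paimon's implementation:
// pattern '$pt $hm' with partition {pt=2024-06-01, hm=01:00} resolves to
// "2024-06-01 01:00", which 'yyyy-MM-dd HH:mm' parses into a timestamp.
val partition = Map("pt" -> "2024-06-01", "hm" -> "01:00")
val pattern   = "$pt $hm"
val resolved  = partition.foldLeft(pattern) {
  case (acc, (col, value)) => acc.replace("$" + col, value)
}
val partitionTime =
  LocalDateTime.parse(resolved, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"))
// A partition expires once partitionTime is older than now minus expiration_time.
```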
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 1872abf6c6c5..45d321dedbb8 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -404,4 +404,74 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
}
}
}
+
+ test("Paimon procedure : expire partitions with specified time-pattern partitions.") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(s"""
+ |CREATE TABLE T (k STRING, pt STRING, hm STRING)
+ |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1')
+ | PARTITIONED BY (hm, pt)
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(String, String, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("k", "pt", "hm")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T")
+
+ try {
+          // Show results: there are no expired partitions yet.
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
+ ", timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$pt')"),
+ Row("No expired partitions.") :: Nil
+ )
+
+ // snapshot-1
+ inputData.addData(("a", "2024-06-01", "01:00"))
+ stream.processAllAvailable()
+ // snapshot-2
+ inputData.addData(("b", "2024-06-02", "02:00"))
+ stream.processAllAvailable()
+ // snapshot-3, never expires.
+ inputData.addData(("Never-expire", "9999-09-09", "99:99"))
+ stream.processAllAvailable()
+
+ checkAnswer(
+ query(),
+ Row("a", "2024-06-01", "01:00") :: Row("b", "2024-06-02", "02:00") :: Row(
+ "Never-expire",
+ "9999-09-09",
+ "99:99") :: Nil)
+
+ // Show a list of expired partitions.
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_partitions(table => 'test.T'" +
+ ", expiration_time => '1 d'" +
+ ", timestamp_formatter => 'yyyy-MM-dd HH:mm'" +
+ ", timestamp_pattern => '$pt $hm')"),
+ Row("hm=01:00, pt=2024-06-01") :: Row("hm=02:00, pt=2024-06-02") :: Nil
+ )
+
+ checkAnswer(query(), Row("Never-expire", "9999-09-09", "99:99") :: Nil)
+
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}