From c577f1a48e01bb3dea504e5f2ed822140858c8de Mon Sep 17 00:00:00 2001 From: baokainan Date: Mon, 5 Aug 2024 16:55:30 +0800 Subject: [PATCH] [spark] Support specifying time-pattern in ExpairePartition procedure --- docs/content/spark/procedures.md | 4 +- .../procedure/ExpirePartitionsProcedure.java | 5 +- .../ExpirePartitionsProcedureTest.scala | 70 +++++++++++++++++++ 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 3f9b3de852c0..c3b6292c71d6 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -73,9 +73,11 @@ This section introduce all available spark procedures about paimon.
  • table: the target table identifier. Cannot be empty.
  • expiration_time: the expiration interval of a partition. A partition will be expired if it‘s lifetime is over this value. Partition time is extracted from the partition value.
  • timestamp_formatter: the formatter to format timestamp from string.
  • +
  • timestamp_pattern: the pattern to get a timestamp from partitions.
  • expire_strategy: specifies the expiration strategy for partition expiration, possible values: 'values-time' or 'update-time' , 'values-time' as default.
  • - CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time') + CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => +'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time') create_tag diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 7501a9926431..81012abc9e0b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -50,6 +50,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure { ProcedureParameter.required("table", StringType), ProcedureParameter.required("expiration_time", StringType), ProcedureParameter.optional("timestamp_formatter", StringType), + ProcedureParameter.optional("timestamp_pattern", StringType), ProcedureParameter.optional("expire_strategy", StringType) }; @@ -78,7 +79,8 @@ public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String expirationTime = args.getString(1); String timestampFormatter = args.isNullAt(2) ? null : args.getString(2); - String expireStrategy = args.isNullAt(3) ? null : args.getString(3); + String timestampPattern = args.isNullAt(3) ? null : args.getString(3); + String expireStrategy = args.isNullAt(4) ? null : args.getString(4); return modifyPaimonTable( tableIdent, table -> { @@ -87,6 +89,7 @@ public InternalRow[] call(InternalRow args) { Map map = new HashMap<>(); map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy); map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); + map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); PartitionExpire partitionExpire = new PartitionExpire( diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index 1872abf6c6c5..45d321dedbb8 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -404,4 +404,74 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest } } } + + test("Paimon procedure : expire partitions with specified time-pattern partitions.") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (k STRING, pt STRING, hm STRING) + |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1') + | PARTITIONED BY (hm, pt) + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(String, String, String)] + val stream = inputData + .toDS() + .toDF("k", "pt", "hm") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + // Show results : There are no expired partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$pt')"), + Row("No expired partitions.") :: Nil + ) + + // snapshot-1 + inputData.addData(("a", "2024-06-01", "01:00")) + stream.processAllAvailable() + // snapshot-2 + inputData.addData(("b", "2024-06-02", "02:00")) + stream.processAllAvailable() + // snapshot-3, never expires. + inputData.addData(("Never-expire", "9999-09-09", "99:99")) + stream.processAllAvailable() + + checkAnswer( + query(), + Row("a", "2024-06-01", "01:00") :: Row("b", "2024-06-02", "02:00") :: Row( + "Never-expire", + "9999-09-09", + "99:99") :: Nil) + + // Show a list of expired partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd HH:mm'" + + ", timestamp_pattern => '$pt $hm')"), + Row("hm=01:00, pt=2024-06-01") :: Row("hm=02:00, pt=2024-06-02") :: Nil + ) + + checkAnswer(query(), Row("Never-expire", "9999-09-09", "99:99") :: Nil) + + } finally { + stream.stop() + } + } + } + } }