Skip to content

Commit

Permalink
[spark] Support specifying time-pattern in ExpirePartition procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
baokainan committed Aug 5, 2024
1 parent eb8b113 commit c577f1a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 2 deletions.
4 changes: 3 additions & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ This section introduce all available spark procedures about paimon.
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A partition will be expired if its lifetime exceeds this value. Partition time is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter to format timestamp from string.</li>
<li>timestamp_pattern: the pattern used to extract the timestamp from a partition value.</li>
<li>expire_strategy: specifies the expiration strategy for partition expiration. Possible values: 'values-time' or 'update-time'; 'values-time' is the default.</li>
</td>
<td>CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')</td>
<td>CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter =>
'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time')</td>
</tr>
<tr>
<td>create_tag</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("expiration_time", StringType),
ProcedureParameter.optional("timestamp_formatter", StringType),
ProcedureParameter.optional("timestamp_pattern", StringType),
ProcedureParameter.optional("expire_strategy", StringType)
};

Expand Down Expand Up @@ -78,7 +79,8 @@ public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String expirationTime = args.getString(1);
String timestampFormatter = args.isNullAt(2) ? null : args.getString(2);
String expireStrategy = args.isNullAt(3) ? null : args.getString(3);
String timestampPattern = args.isNullAt(3) ? null : args.getString(3);
String expireStrategy = args.isNullAt(4) ? null : args.getString(4);
return modifyPaimonTable(
tableIdent,
table -> {
Expand All @@ -87,6 +89,7 @@ public InternalRow[] call(InternalRow args) {
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);

PartitionExpire partitionExpire =
new PartitionExpire(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,74 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
}
}
}

// Verifies that sys.expire_partitions honors the new `timestamp_pattern` argument:
// the partition timestamp is assembled from the named partition fields (e.g. '$pt'
// or '$pt $hm') instead of relying on the default partition-value ordering.
test("Paimon procedure : expire partitions with specified time-pattern partitions.") {
failAfter(streamingTimeout) {
withTempDir {
checkpointDir =>
// Table is partitioned by (hm, pt) — deliberately NOT in timestamp order —
// so expiration must use timestamp_pattern to reassemble '$pt $hm'.
spark.sql(s"""
|CREATE TABLE T (k STRING, pt STRING, hm STRING)
|TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1')
| PARTITIONED BY (hm, pt)
|""".stripMargin)
val location = loadTable("T").location().toString

// Feed rows through a memory-backed stream so each addData/processAllAvailable
// pair produces one committed snapshot in the Paimon table.
val inputData = MemoryStream[(String, String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt", "hm")
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreachBatch {
(batch: Dataset[Row], _: Long) =>
batch.write.format("paimon").mode("append").save(location)
}
.start()

// Re-evaluated on each call to observe the table's current contents.
val query = () => spark.sql("SELECT * FROM T")

try {
// Show results : There are no expired partitions.
// (Empty table: the procedure should report that nothing expired.)
checkAnswer(
spark.sql(
"CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
", timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$pt')"),
Row("No expired partitions.") :: Nil
)

// snapshot-1
inputData.addData(("a", "2024-06-01", "01:00"))
stream.processAllAvailable()
// snapshot-2
inputData.addData(("b", "2024-06-02", "02:00"))
stream.processAllAvailable()
// snapshot-3, never expires.
// Far-future partition values keep this row outside any expiration window.
inputData.addData(("Never-expire", "9999-09-09", "99:99"))
stream.processAllAvailable()

// Sanity check: all three rows are present before expiration runs.
checkAnswer(
query(),
Row("a", "2024-06-01", "01:00") :: Row("b", "2024-06-02", "02:00") :: Row(
"Never-expire",
"9999-09-09",
"99:99") :: Nil)

// Show a list of expired partitions.
// Pattern '$pt $hm' combines both partition fields into one timestamp
// parsed by 'yyyy-MM-dd HH:mm'; the two old partitions should expire.
checkAnswer(
spark.sql(
"CALL paimon.sys.expire_partitions(table => 'test.T'" +
", expiration_time => '1 d'" +
", timestamp_formatter => 'yyyy-MM-dd HH:mm'" +
", timestamp_pattern => '$pt $hm')"),
Row("hm=01:00, pt=2024-06-01") :: Row("hm=02:00, pt=2024-06-02") :: Nil
)

// Only the far-future partition survives.
checkAnswer(query(), Row("Never-expire", "9999-09-09", "99:99") :: Nil)

} finally {
// Always stop the streaming query, even if an assertion above failed.
stream.stop()
}
}
}
}
}

0 comments on commit c577f1a

Please sign in to comment.