From cc077b7a37ec296f907a59054f0245f52d949f8f Mon Sep 17 00:00:00 2001 From: HunterXHunter <1356469429@qq.com> Date: Wed, 26 Jun 2024 15:58:17 +0800 Subject: [PATCH] [core] Expire partition procedure: show a list of expired partitions (#3595) --- .../paimon/operation/PartitionExpire.java | 17 +++-- .../procedure/ExpirePartitionsProcedure.java | 17 ++++- .../ExpirePartitionsProcedureITCase.java | 63 +++++++++++++++- .../procedure/ExpirePartitionsProcedure.java | 23 ++++-- .../ExpirePartitionsProcedureTest.scala | 73 ++++++++++++++++++- 5 files changed, 173 insertions(+), 20 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index 223ac4d054b9..2491a51d20b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -82,8 +82,8 @@ public PartitionExpire withLock(Lock lock) { return this; } - public void expire(long commitIdentifier) { - expire(LocalDateTime.now(), commitIdentifier); + public List> expire(long commitIdentifier) { + return expire(LocalDateTime.now(), commitIdentifier); } @VisibleForTesting @@ -92,20 +92,24 @@ void setLastCheck(LocalDateTime time) { } @VisibleForTesting - void expire(LocalDateTime now, long commitIdentifier) { + List> expire(LocalDateTime now, long commitIdentifier) { if (checkInterval.isZero() || now.isAfter(lastCheck.plus(checkInterval))) { - doExpire(now.minus(expirationTime), commitIdentifier); + List> expired = + doExpire(now.minus(expirationTime), commitIdentifier); lastCheck = now; + return expired; } + return null; } - private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) { + private List> doExpire( + LocalDateTime expireDateTime, long commitIdentifier) { List> expired = new ArrayList<>(); for (BinaryRow partition : readPartitions(expireDateTime)) { Object[] array = toObjectArrayConverter.convert(partition); Map partString = toPartitionString(array); expired.add(partString); - LOG.info("Expire Partition: " + partition); + LOG.info("Expire Partition: {}", partition); } if (expired.size() > 0) { if (metastoreClient != null) { @@ -113,6 +117,7 @@ private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) { } commit.dropPartitions(expired, commitIdentifier); } + return expired; } private void deleteMetastorePartitions(List> partitions) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index a114d12a1406..f38c9ec529d1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -29,8 +29,11 @@ import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.types.Row; import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.Optional; /** A procedure to expire partitions. */ @@ -46,7 +49,7 @@ public String identifier() { @ArgumentHint(name = "expiration_time", type = @DataTypeHint(value = "STRING")), @ArgumentHint(name = "timestamp_formatter", type = @DataTypeHint("STRING")) }) - public String[] call( + public @DataTypeHint("ROW< expired_partitions STRING>") Row[] call( ProcedureContext procedureContext, String tableId, String expirationTime, @@ -69,7 +72,15 @@ public String[] call( .metastoreClientFactory()) .map(MetastoreClient.Factory::create) .orElse(null)); - partitionExpire.expire(Long.MAX_VALUE); - return new String[] {}; + List> expired = partitionExpire.expire(Long.MAX_VALUE); + return expired == null || expired.isEmpty() + ? new Row[] {Row.of("No expired partitions.")} + : expired.stream() + .map( + x -> { + String r = x.toString(); + return Row.of(r.substring(1, r.length() - 1)); + }) + .toArray(Row[]::new); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java index 348ec36a551d..38199a536a46 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.CatalogITCaseBase; import org.apache.paimon.table.FileStoreTable; @@ -26,6 +27,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -44,18 +47,70 @@ public void testExpirePartitionsProcedure() throws Exception { + ")"); FileStoreTable table = paimonTable("T"); sql("INSERT INTO T VALUES ('1', '2024-06-01')"); + // Never expire. sql("INSERT INTO T VALUES ('2', '9024-06-01')"); - assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01"); + Function consumerReadResult = + (InternalRow row) -> row.getString(0) + ":" + row.getString(1); + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01"); sql( "CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')"); - assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01"); + assertThat(read(table, consumerReadResult)).containsExactlyInAnyOrder("2:9024-06-01"); } - private List read(FileStoreTable table) throws IOException { + @Test + public void testShowExpirePartitionsProcedureResults() throws Exception { + sql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " hm STRING," + + " PRIMARY KEY (k, dt, hm) NOT ENFORCED" + + ") PARTITIONED BY (dt, hm) WITH (" + + " 'bucket' = '1'" + + ")"); + FileStoreTable table = paimonTable("T"); + // Test there are no expired partitions. + List result = + sql( + "CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + .stream() + .map(row -> row.getField(0).toString()) + .collect(Collectors.toList()); + assertThat(result).containsExactlyInAnyOrder("No expired partitions."); + + sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')"); + sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')"); + // Never expire. + sql("INSERT INTO T VALUES ('3', '9024-06-01', '03:00')"); + + Function consumerReadResult = + (InternalRow row) -> + row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2); + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder( + "1:2024-06-01:01:00", "2:2024-06-02:02:00", "3:9024-06-01:03:00"); + + result = + sql( + "CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + .stream() + .map(row -> row.getField(0).toString()) + .collect(Collectors.toList()); + // Show a list of expired partitions. + assertThat(result) + .containsExactlyInAnyOrder("dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=02:00"); + + assertThat(read(table, consumerReadResult)).containsExactlyInAnyOrder("3:9024-06-01:03:00"); + } + + private List read( + FileStoreTable table, Function consumerReadResult) + throws IOException { List ret = new ArrayList<>(); table.newRead() .createReader(table.newScan().plan().splits()) - .forEachRemaining(row -> ret.add(row.getString(0) + ":" + row.getString(1))); + .forEachRemaining(row -> ret.add(consumerReadResult.apply(row))); return ret; } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 16089937fc7e..fda6e74216cd 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -27,12 +27,14 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -50,7 +52,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure { private static final StructType OUTPUT_TYPE = new StructType( new StructField[] { - new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + new StructField("expired_partitions", StringType, true, Metadata.empty()) }); protected ExpirePartitionsProcedure(TableCatalog tableCatalog) { @@ -92,9 +94,20 @@ public InternalRow[] call(InternalRow args) { .metastoreClientFactory()) .map(MetastoreClient.Factory::create) .orElse(null)); - partitionExpire.expire(Long.MAX_VALUE); - InternalRow outputRow = newInternalRow(true); - return new InternalRow[] {outputRow}; + List> expired = partitionExpire.expire(Long.MAX_VALUE); + return expired == null || expired.isEmpty() + ? new InternalRow[] { + newInternalRow(UTF8String.fromString("No expired partitions.")) + } + : expired.stream() + .map( + x -> { + String r = x.toString(); + return newInternalRow( + UTF8String.fromString( + r.substring(1, r.length() - 1))); + }) + .toArray(InternalRow[]::new); }); } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index 547e77a6e51f..331142b5d0c1 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest -/** IT Case for expire partitions procedure. */ +/** IT Case for [[ExpirePartitionsProcedure]]. */ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest { import testImplicits._ @@ -69,7 +69,8 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest spark.sql( "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" + ", timestamp_formatter => 'yyyy-MM-dd')"), - Row(true) :: Nil) + Row("pt=2024-06-01") :: Nil + ) checkAnswer(query(), Row("b", "9024-06-01") :: Nil) @@ -79,4 +80,72 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest } } } + + test("Paimon procedure : expire partitions show a list of expired partitions.") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (k STRING, pt STRING, hm STRING) + |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1') + | PARTITIONED BY (pt,hm) + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(String, String, String)] + val stream = inputData + .toDS() + .toDF("k", "pt", "hm") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + // Show results : There are no expired partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')"), + Row("No expired partitions.") :: Nil + ) + + // snapshot-1 + inputData.addData(("a", "2024-06-01", "01:00")) + stream.processAllAvailable() + // snapshot-2 + inputData.addData(("b", "2024-06-02", "02:00")) + stream.processAllAvailable() + // snapshot-3, never expires. + inputData.addData(("c", "9024-06-03", "03:00")) + stream.processAllAvailable() + + checkAnswer( + query(), + Row("a", "2024-06-01", "01:00") :: Row("b", "2024-06-02", "02:00") :: Row( + "c", + "9024-06-03", + "03:00") :: Nil) + + // expire + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')"), + Row("pt=2024-06-01, hm=01:00") :: Row("pt=2024-06-02, hm=02:00") :: Nil + ) + + checkAnswer(query(), Row("c", "9024-06-03", "03:00") :: Nil) + + } finally { + stream.stop() + } + } + } + } }