diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index 64074a4c158c..62a9b796476a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -37,12 +37,15 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** Expire partitions. */ public class PartitionExpire { private static final Logger LOG = LoggerFactory.getLogger(PartitionExpire.class); + private static final String DELIMITER = ","; + private final Duration expirationTime; private final Duration checkInterval; private final FileStoreScan scan; @@ -51,6 +54,7 @@ public class PartitionExpire { private LocalDateTime lastCheck; private final PartitionExpireStrategy strategy; private final boolean endInputCheckPartitionExpire; + private int maxExpires; public PartitionExpire( Duration expirationTime, @@ -68,6 +72,7 @@ public PartitionExpire( this.metastoreClient = metastoreClient; this.lastCheck = LocalDateTime.now(); this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; + this.maxExpires = Integer.MAX_VALUE; } public PartitionExpire( @@ -85,6 +90,11 @@ public PartitionExpire withLock(Lock lock) { return this; } + public PartitionExpire withMaxExpires(int maxExpires) { + this.maxExpires = maxExpires; + return this; + } + public List> expire(long commitIdentifier) { return expire(LocalDateTime.now(), commitIdentifier); } @@ -125,14 +135,18 @@ List> expire(LocalDateTime now, long commitIdentifier) { private List> doExpire( LocalDateTime expireDateTime, long commitIdentifier) { - List> expired = new ArrayList<>(); - for (PartitionEntry partition : strategy.selectExpiredPartitions(scan, expireDateTime)) { + List partitionEntries = + strategy.selectExpiredPartitions(scan, expireDateTime); + List> expiredPartValues = new ArrayList<>(partitionEntries.size()); + for (PartitionEntry partition : partitionEntries) { Object[] array = strategy.convertPartition(partition.partition()); - Map partString = strategy.toPartitionString(array); - expired.add(partString); - LOG.info("Expire Partition: {}", partString); + expiredPartValues.add(strategy.toPartitionValue(array)); } - if (!expired.isEmpty()) { + + List> expired = new ArrayList<>(); + if (!expiredPartValues.isEmpty()) { + expired = convertToPartitionString(expiredPartValues); + LOG.info("Expire Partitions: {}", expired); if (metastoreClient != null) { deleteMetastorePartitions(expired); } @@ -153,4 +167,15 @@ private void deleteMetastorePartitions(List> partitions) { }); } } + + private List> convertToPartitionString( + List> expiredPartValues) { + return expiredPartValues.stream() + .map(values -> String.join(DELIMITER, values)) + .sorted() + .map(s -> s.split(DELIMITER)) + .map(strategy::toPartitionString) + .limit(Math.min(expiredPartValues.size(), maxExpires)) + .collect(Collectors.toList()); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java index 529d8151046c..2def5b43a959 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java @@ -26,6 +26,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -42,13 +43,21 @@ public PartitionExpireStrategy(RowType partitionType) { } public Map toPartitionString(Object[] array) { - Map map = new LinkedHashMap<>(); + Map map = new LinkedHashMap<>(partitionKeys.size()); for (int i = 0; i < partitionKeys.size(); i++) { map.put(partitionKeys.get(i), array[i].toString()); } return map; } + public List toPartitionValue(Object[] array) { + List list = new ArrayList<>(partitionKeys.size()); + for (int i = 0; i < partitionKeys.size(); i++) { + list.add(array[i].toString()); + } + return list; + } + public Object[] convertPartition(BinaryRow partition) { return toObjectArrayConverter.convert(partition); } diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java new file mode 100644 index 000000000000..c0e5a65c49ef --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.operation.PartitionExpire; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.TimeUtils; + +import org.apache.flink.table.procedure.ProcedureContext; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; + +/** A procedure to expire partitions. */ +public class ExpirePartitionsProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "expire_partitions"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call( + ProcedureContext procedureContext, + String tableId, + String expirationTime, + String timestampFormatter, + String timestampPattern, + String expireStrategy) + throws Catalog.TableNotExistException { + return call( + procedureContext, + tableId, + expirationTime, + timestampFormatter, + timestampPattern, + expireStrategy, + null); + } + + public String[] call( + ProcedureContext procedureContext, + String tableId, + String expirationTime, + String timestampFormatter, + String timestampPattern, + String expireStrategy, + Integer maxExpires) + throws Catalog.TableNotExistException { + FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); + FileStore fileStore = fileStoreTable.store(); + Map map = new HashMap<>(); + map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy); + map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); + map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); + + PartitionExpire partitionExpire = + new PartitionExpire( + TimeUtils.parseDuration(expirationTime), + Duration.ofMillis(0L), + createPartitionExpireStrategy( + CoreOptions.fromMap(map), fileStore.partitionType()), + fileStore.newScan(), + fileStore.newCommit(""), + Optional.ofNullable( + fileStoreTable + .catalogEnvironment() + .metastoreClientFactory()) + .map(MetastoreClient.Factory::create) + .orElse(null)); + if (maxExpires != null) { + partitionExpire.withMaxExpires(maxExpires); + } + List> expired = partitionExpire.expire(Long.MAX_VALUE); + return expired == null || expired.isEmpty() + ? new String[] {"No expired partitions."} + : expired.stream() + .map( + x -> { + String r = x.toString(); + return r.substring(1, r.length() - 1); + }) + .toArray(String[]::new); + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java index da2eaef0bd49..a48de667bf3d 100644 --- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java @@ -244,7 +244,10 @@ public void testExpirePartitionsProcedure() throws Exception { sql("INSERT INTO T VALUES ('1', '2024-06-01')"); sql("INSERT INTO T VALUES ('2', '9024-06-01')"); assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01"); - sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')"); + assertThat( + sql( + "CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')")) + .containsExactly(Row.of("dt=2024-06-01")); assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01"); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index abbf4f48698b..ee6075a927d3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -63,6 +63,10 @@ public String identifier() { name = "expire_strategy", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "max_expires", + type = @DataTypeHint("INTEGER"), + isOptional = true) }) public @DataTypeHint("ROW< expired_partitions STRING>") Row[] call( ProcedureContext procedureContext, @@ -70,7 +74,8 @@ public String identifier() { String expirationTime, String timestampFormatter, String timestampPattern, - String expireStrategy) + String expireStrategy, + Integer maxExpires) throws Catalog.TableNotExistException { FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); FileStore fileStore = fileStoreTable.store(); @@ -93,6 +98,9 @@ public String identifier() { .metastoreClientFactory()) .map(MetastoreClient.Factory::create) .orElse(null)); + if (maxExpires != null) { + partitionExpire.withMaxExpires(maxExpires); + } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() ? new Row[] {Row.of("No expired partitions.")} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java index 71a6dc466ee0..bc2e84902f35 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java @@ -347,6 +347,61 @@ public void testPartitionExpireWithTimePartition() throws Exception { .containsExactlyInAnyOrder("Never-expire:9999-09-09:99:99"); } + @Test + public void testSortAndLimitExpirePartition() throws Exception { + sql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " hm STRING," + + " PRIMARY KEY (k, dt, hm) NOT ENFORCED" + + ") PARTITIONED BY (dt, hm) WITH (" + + " 'bucket' = '1'" + + ")"); + FileStoreTable table = paimonTable("T"); + // Test there are no expired partitions. + assertThat( + callExpirePartitions( + "CALL sys.expire_partitions(" + + "`table` => 'default.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')")) + .containsExactlyInAnyOrder("No expired partitions."); + + sql("INSERT INTO T VALUES ('3', '2024-06-02', '02:00')"); + sql("INSERT INTO T VALUES ('2', '2024-06-02', '01:00')"); + sql("INSERT INTO T VALUES ('4', '2024-06-03', '01:00')"); + sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')"); + // This partition never expires. + sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')"); + + Function consumerReadResult = + (InternalRow row) -> + row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2); + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder( + "1:2024-06-01:01:00", + "2:2024-06-02:01:00", + "3:2024-06-02:02:00", + "4:2024-06-03:01:00", + "Never-expire:9999-09-09:99:99"); + + // Show a list of expired partitions. + assertThat( + callExpirePartitions( + "CALL sys.expire_partitions(" + + "`table` => 'default.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd', max_expires => 3)")) + .containsExactly( + "dt=2024-06-01, hm=01:00", + "dt=2024-06-02, hm=01:00", + "dt=2024-06-02, hm=02:00"); + + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder("4:2024-06-03:01:00", "Never-expire:9999-09-09:99:99"); + } + /** Return a list of expired partitions. */ public List callExpirePartitions(String callSql) { return sql(callSql).stream() diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 81012abc9e0b..7b388227e5a4 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -40,6 +40,7 @@ import java.util.Optional; import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; +import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.StringType; /** A procedure to expire partitions. */ @@ -51,7 +52,8 @@ public class ExpirePartitionsProcedure extends BaseProcedure { ProcedureParameter.required("expiration_time", StringType), ProcedureParameter.optional("timestamp_formatter", StringType), ProcedureParameter.optional("timestamp_pattern", StringType), - ProcedureParameter.optional("expire_strategy", StringType) + ProcedureParameter.optional("expire_strategy", StringType), + ProcedureParameter.optional("max_expires", IntegerType) }; private static final StructType OUTPUT_TYPE = @@ -81,6 +83,7 @@ public InternalRow[] call(InternalRow args) { String timestampFormatter = args.isNullAt(2) ? null : args.getString(2); String timestampPattern = args.isNullAt(3) ? null : args.getString(3); String expireStrategy = args.isNullAt(4) ? null : args.getString(4); + Integer maxExpires = args.isNullAt(5) ? null : args.getInt(5); return modifyPaimonTable( tableIdent, table -> { @@ -105,6 +108,9 @@ public InternalRow[] call(InternalRow args) { .metastoreClientFactory()) .map(MetastoreClient.Factory::create) .orElse(null)); + if (maxExpires != null) { + partitionExpire.withMaxExpires(maxExpires); + } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() ? new InternalRow[] { diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index db4696047f9d..4561e532f538 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -474,4 +474,81 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest } } } + + test("Paimon procedure : sorted the expired partitions with max_expires.") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (k STRING, pt STRING, hm STRING) + |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1') + | PARTITIONED BY (pt,hm) + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(String, String, String)] + val stream = inputData + .toDS() + .toDF("k", "pt", "hm") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + // Show results : There are no expired partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')"), + Row("No expired partitions.") :: Nil + ) + + inputData.addData(("a", "2024-06-02", "02:00")) + stream.processAllAvailable() + inputData.addData(("b", "2024-06-02", "01:00")) + stream.processAllAvailable() + inputData.addData(("d", "2024-06-03", "01:00")) + stream.processAllAvailable() + inputData.addData(("c", "2024-06-01", "01:00")) + stream.processAllAvailable() + // this snapshot never expires. + inputData.addData(("Never-expire", "9999-09-09", "99:99")) + stream.processAllAvailable() + + checkAnswer( + query(), + Row("a", "2024-06-02", "02:00") :: Row("b", "2024-06-02", "01:00") :: Row( + "d", + "2024-06-03", + "01:00") :: Row("c", "2024-06-01", "01:00") :: Row( + "Never-expire", + "9999-09-09", + "99:99") :: Nil + ) + + // sorted result of limited expired partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd', max_expires => 3)"), + Row("pt=2024-06-01, hm=01:00") :: Row("pt=2024-06-02, hm=01:00") :: Row( + "pt=2024-06-02, hm=02:00") :: Nil + ) + + checkAnswer( + query(), + Row("d", "2024-06-03", "01:00") :: Row("Never-expire", "9999-09-09", "99:99") :: Nil) + } finally { + stream.stop() + } + } + } + } }