diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java index eab6cc288dec..913e07d8a9fd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java @@ -57,6 +57,8 @@ public static Map fillInPartitionOptions( dynamicOptions.put( CoreOptions.PARTITION_EXPIRATION_MAX_NUM.key(), String.valueOf(maxExpires)); } + // partition check interval is 0 + dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "0"); return dynamicOptions; } diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index 759e45f2b3a2..1742024eed3c 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -20,7 +20,6 @@ import org.apache.paimon.FileStore; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -29,12 +28,8 @@ import org.apache.flink.table.procedure.ProcedureContext; -import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; - -import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; /** A procedure to expire partitions. */ public class ExpirePartitionsProcedure extends ProcedureBase { @@ -88,26 +83,11 @@ public String[] call( FileStoreTable fileStoreTable = (FileStoreTable) table; FileStore fileStore = fileStoreTable.store(); - // check expiration time not null - Preconditions.checkNotNull( - fileStore.options().partitionExpireTime(), - "The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. "); - PartitionExpire partitionExpire = - new PartitionExpire( - fileStore.options().partitionExpireTime(), - Duration.ofMillis(0L), - createPartitionExpireStrategy( - fileStore.options(), fileStore.partitionType()), - fileStore.newScan(), - fileStore.newCommit(""), - Optional.ofNullable( - fileStoreTable - .catalogEnvironment() - .metastoreClientFactory()) - .map(MetastoreClient.Factory::create) - .orElse(null), - fileStore.options().partitionExpireMaxNum()); + fileStore.newPartitionExpire(fileStore.options().createCommitUser()); + Preconditions.checkNotNull( + partitionExpire, + "Both the partition expiration time and partition field can not be null."); List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index 97d526f2bb9e..662be40fb16f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -20,7 +20,6 @@ import org.apache.paimon.FileStore; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -33,12 +32,8 @@ import org.apache.flink.table.procedure.ProcedureContext; import org.apache.flink.types.Row; -import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; - -import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; /** A procedure to expire partitions. */ public class ExpirePartitionsProcedure extends ProcedureBase { @@ -90,31 +85,15 @@ public String identifier() { expirationTime, maxExpires, options); - Table table = table(tableId).copy(dynamicOptions); FileStoreTable fileStoreTable = (FileStoreTable) table; FileStore fileStore = fileStoreTable.store(); - // check expiration time not null - Preconditions.checkNotNull( - fileStore.options().partitionExpireTime(), - "The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. "); - PartitionExpire partitionExpire = - new PartitionExpire( - fileStore.options().partitionExpireTime(), - Duration.ofMillis(0L), - createPartitionExpireStrategy( - fileStore.options(), fileStore.partitionType()), - fileStore.newScan(), - fileStore.newCommit(""), - Optional.ofNullable( - fileStoreTable - .catalogEnvironment() - .metastoreClientFactory()) - .map(MetastoreClient.Factory::create) - .orElse(null), - fileStore.options().partitionExpireMaxNum()); + fileStore.newPartitionExpire(fileStore.options().createCommitUser()); + Preconditions.checkNotNull( + partitionExpire, + "Both the partition expiration time and partition field can not be null."); List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java index 78bc49f22f8f..0d7d35478320 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java @@ -484,7 +484,8 @@ public void testExpirePartitionsLoadTablePropsFirst() throws Exception { assertThatThrownBy(() -> sql("CALL sys.expire_partitions(`table` => 'default.T')")) .rootCause() .isInstanceOf(NullPointerException.class) - .hasMessageContaining("The partition expiration time is must been required"); + .hasMessageContaining( + "Both the partition expiration time and partition field can not be null."); // 'partition.timestamp-formatter' value using table property. // 'partition.expiration-time' value using procedure parameter. diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index ed689a77e9e0..701637a688c2 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -19,7 +19,6 @@ package org.apache.paimon.spark.procedure; import org.apache.paimon.FileStore; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; @@ -33,12 +32,9 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; -import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; -import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -97,31 +93,15 @@ public InternalRow[] call(InternalRow args) { expirationTime, maxExpires, options); - table = table.copy(dynamicOptions); FileStoreTable fileStoreTable = (FileStoreTable) table; FileStore fileStore = fileStoreTable.store(); - // check expiration time not null - Preconditions.checkNotNull( - fileStore.options().partitionExpireTime(), - "The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. "); - PartitionExpire partitionExpire = - new PartitionExpire( - fileStore.options().partitionExpireTime(), - Duration.ofMillis(0L), - createPartitionExpireStrategy( - fileStore.options(), fileStore.partitionType()), - fileStore.newScan(), - fileStore.newCommit(""), - Optional.ofNullable( - fileStoreTable - .catalogEnvironment() - .metastoreClientFactory()) - .map(MetastoreClient.Factory::create) - .orElse(null), - fileStore.options().partitionExpireMaxNum()); + fileStore.newPartitionExpire(fileStore.options().createCommitUser()); + Preconditions.checkNotNull( + partitionExpire, + "Both the partition expiration time and partition field can not be null."); List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty()