Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Dec 18, 2024
1 parent 4460003 commit 75a3ae3
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public static Map<String, String> fillInPartitionOptions(
dynamicOptions.put(
CoreOptions.PARTITION_EXPIRATION_MAX_NUM.key(), String.valueOf(maxExpires));
}
// partition check interval is 0
dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "0");
return dynamicOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand All @@ -29,12 +28,8 @@

import org.apache.flink.table.procedure.ProcedureContext;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;

/** A procedure to expire partitions. */
public class ExpirePartitionsProcedure extends ProcedureBase {
Expand Down Expand Up @@ -88,26 +83,11 @@ public String[] call(
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();

// check expiration time not null
Preconditions.checkNotNull(
fileStore.options().partitionExpireTime(),
"The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. ");

PartitionExpire partitionExpire =
new PartitionExpire(
fileStore.options().partitionExpireTime(),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
fileStore.options(), fileStore.partitionType()),
fileStore.newScan(),
fileStore.newCommit(""),
Optional.ofNullable(
fileStoreTable
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null),
fileStore.options().partitionExpireMaxNum());
fileStore.newPartitionExpire(fileStore.options().createCommitUser());
Preconditions.checkNotNull(
partitionExpire,
"Both the partition expiration time and partition field can not be null.");

List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand All @@ -33,12 +32,8 @@
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;

/** A procedure to expire partitions. */
public class ExpirePartitionsProcedure extends ProcedureBase {
Expand Down Expand Up @@ -90,31 +85,15 @@ public String identifier() {
expirationTime,
maxExpires,
options);

Table table = table(tableId).copy(dynamicOptions);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();

// check expiration time not null
Preconditions.checkNotNull(
fileStore.options().partitionExpireTime(),
"The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. ");

PartitionExpire partitionExpire =
new PartitionExpire(
fileStore.options().partitionExpireTime(),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
fileStore.options(), fileStore.partitionType()),
fileStore.newScan(),
fileStore.newCommit(""),
Optional.ofNullable(
fileStoreTable
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null),
fileStore.options().partitionExpireMaxNum());
fileStore.newPartitionExpire(fileStore.options().createCommitUser());
Preconditions.checkNotNull(
partitionExpire,
"Both the partition expiration time and partition field can not be null.");

List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ public void testExpirePartitionsLoadTablePropsFirst() throws Exception {
assertThatThrownBy(() -> sql("CALL sys.expire_partitions(`table` => 'default.T')"))
.rootCause()
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("The partition expiration time is must been required");
.hasMessageContaining(
"Both the partition expiration time and partition field can not be null.");

// 'partition.timestamp-formatter' value using table property.
// 'partition.expiration-time' value using procedure parameter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.spark.procedure;

import org.apache.paimon.FileStore;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
Expand All @@ -33,12 +32,9 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;

Expand Down Expand Up @@ -97,31 +93,15 @@ public InternalRow[] call(InternalRow args) {
expirationTime,
maxExpires,
options);

table = table.copy(dynamicOptions);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();

// check expiration time not null
Preconditions.checkNotNull(
fileStore.options().partitionExpireTime(),
"The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. ");

PartitionExpire partitionExpire =
new PartitionExpire(
fileStore.options().partitionExpireTime(),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
fileStore.options(), fileStore.partitionType()),
fileStore.newScan(),
fileStore.newCommit(""),
Optional.ofNullable(
fileStoreTable
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null),
fileStore.options().partitionExpireMaxNum());
fileStore.newPartitionExpire(fileStore.options().createCommitUser());
Preconditions.checkNotNull(
partitionExpire,
"Both the partition expiration time and partition field can not be null.");

List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down

0 comments on commit 75a3ae3

Please sign in to comment.