Skip to content

Commit

Permalink
load table props first
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Dec 7, 2024
1 parent 8484bb4 commit 1172cc2
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ public PartitionExpire withLock(Lock lock) {
return this;
}

/**
 * Overrides the {@code maxExpireNum} field of this instance and returns the same instance so
 * that calls can be chained.
 *
 * @param maxExpireNum new value for the field; presumably the upper bound on partitions
 *     expired by a single run (confirm against the expire implementation)
 * @return this {@link PartitionExpire}
 */
public PartitionExpire withMaxExpireNum(int maxExpireNum) {
this.maxExpireNum = maxExpireNum;
return this;
}

/**
 * Convenience overload that delegates to {@code expire(LocalDateTime, long)} using {@link
 * LocalDateTime#now()} as the date-time argument.
 *
 * @param commitIdentifier passed through unchanged to the two-argument overload
 * @return whatever the two-argument overload returns; presumably one partition spec map per
 *     expired partition (confirm against that overload)
 */
public List<Map<String, String>> expire(long commitIdentifier) {
return expire(LocalDateTime.now(), commitIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.TimeUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.procedure.ProcedureContext;

Expand Down Expand Up @@ -61,6 +64,7 @@ public String[] call(
timestampFormatter,
timestampPattern,
expireStrategy,
null,
null);
}

Expand All @@ -71,21 +75,45 @@ public String[] call(
String timestampFormatter,
String timestampPattern,
String expireStrategy,
Integer maxExpires)
Integer maxExpires,
String options)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
Map<String, String> dynamicOptions = new HashMap<>();
if (!StringUtils.isNullOrWhitespaceOnly(expireStrategy)) {
dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
}
if (!StringUtils.isNullOrWhitespaceOnly(timestampFormatter)) {
dynamicOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
}
if (!StringUtils.isNullOrWhitespaceOnly(timestampPattern)) {
dynamicOptions.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
}
if (!StringUtils.isNullOrWhitespaceOnly(expirationTime)) {
dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), expirationTime);
}
if (maxExpires != null) {
dynamicOptions.put(
CoreOptions.PARTITION_EXPIRATION_MAX_NUM.key(), String.valueOf(maxExpires));
}
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}

Table table = table(tableId).copy(dynamicOptions);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);

// check expiration time not null
Preconditions.checkNotNull(
fileStore.options().partitionExpireTime(),
"The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. ");

PartitionExpire partitionExpire =
new PartitionExpire(
TimeUtils.parseDuration(expirationTime),
fileStore.options().partitionExpireTime(),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
CoreOptions.fromMap(map), fileStore.partitionType()),
fileStore.options(), fileStore.partitionType()),
fileStore.newScan(),
fileStore.newCommit(""),
Optional.ofNullable(
Expand All @@ -95,9 +123,7 @@ public String[] call(
.map(MetastoreClient.Factory::create)
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpireNum(maxExpires);
}

List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
? new String[] {"No expired partitions."}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.TimeUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
Expand All @@ -50,7 +53,10 @@ public String identifier() {
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "expiration_time", type = @DataTypeHint(value = "STRING")),
@ArgumentHint(
name = "expiration_time",
type = @DataTypeHint(value = "STRING"),
isOptional = true),
@ArgumentHint(
name = "timestamp_formatter",
type = @DataTypeHint("STRING"),
Expand All @@ -66,7 +72,8 @@ public String identifier() {
@ArgumentHint(
name = "max_expires",
type = @DataTypeHint("INTEGER"),
isOptional = true)
isOptional = true),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
})
public @DataTypeHint("ROW< expired_partitions STRING>") Row[] call(
ProcedureContext procedureContext,
Expand All @@ -75,21 +82,45 @@ public String identifier() {
String timestampFormatter,
String timestampPattern,
String expireStrategy,
Integer maxExpires)
Integer maxExpires,
String options)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
Map<String, String> dynamicOptions = new HashMap<>();
if (!StringUtils.isNullOrWhitespaceOnly(expireStrategy)) {
dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
}
if (!StringUtils.isNullOrWhitespaceOnly(timestampFormatter)) {
dynamicOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
}
if (!StringUtils.isNullOrWhitespaceOnly(timestampPattern)) {
dynamicOptions.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
}
if (!StringUtils.isNullOrWhitespaceOnly(expirationTime)) {
dynamicOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), expirationTime);
}
if (maxExpires != null) {
dynamicOptions.put(
CoreOptions.PARTITION_EXPIRATION_MAX_NUM.key(), String.valueOf(maxExpires));
}
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}

Table table = table(tableId).copy(dynamicOptions);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);

// check expiration time not null
Preconditions.checkNotNull(
fileStore.options().partitionExpireTime(),
"The partition expiration time is must been required, you can set it by configuring the property 'partition.expiration-time' or adding the 'expiration_time' parameter in procedure. ");

PartitionExpire partitionExpire =
new PartitionExpire(
TimeUtils.parseDuration(expirationTime),
fileStore.options().partitionExpireTime(),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
CoreOptions.fromMap(map), fileStore.partitionType()),
fileStore.options(), fileStore.partitionType()),
fileStore.newScan(),
fileStore.newCommit(""),
Optional.ofNullable(
Expand All @@ -99,9 +130,7 @@ public String identifier() {
.map(MetastoreClient.Factory::create)
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpireNum(maxExpires);
}

List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
? new Row[] {Row.of("No expired partitions.")}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** IT Case for {@link ExpirePartitionsProcedure}. */
public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
Expand Down Expand Up @@ -452,6 +453,90 @@ public void testExpirePartitionsWithDefaultNum() throws Exception {
.containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}

/**
 * Checks that {@code sys.expire_partitions} combines settings stored as table properties with
 * settings passed as procedure arguments.
 *
 * <p>The table declares 'partition.timestamp-formatter' and 'partition.expiration-max-num' but
 * no 'partition.expiration-time'. A call without {@code expiration_time} is therefore expected
 * to fail, while a call that supplies it is expected to expire partitions.
 */
@Test
public void testExpirePartitionsLoadTablePropsFirst() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1', "
+ " 'write-only' = 'true', "
+ " 'partition.timestamp-formatter' = 'yyyy-MM-dd', "
+ " 'partition.expiration-max-num'='2'"
+ ")");
FileStoreTable table = paimonTable("T");

// One row per partition: three dated partitions plus a far-future one.
sql("INSERT INTO T VALUES ('a', '2024-06-01')");
sql("INSERT INTO T VALUES ('b', '2024-06-02')");
sql("INSERT INTO T VALUES ('c', '2024-06-03')");
// This partition never expires.
sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
// Renders each row as "k:dt" so the expected contents can be written as plain strings.
Function<InternalRow, String> consumerReadResult =
(InternalRow row) -> row.getString(0) + ":" + row.getString(1);

// Sanity check: all four rows are readable before any expiration runs.
assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder(
"a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");

// Neither the table properties nor the procedure arguments provide
// 'partition.expiration-time', so the call must fail with the not-null check's message.
assertThatThrownBy(() -> sql("CALL sys.expire_partitions(`table` => 'default.T')"))
.rootCause()
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("The partition expiration time is must been required");

// 'partition.timestamp-formatter' comes from the table property.
// 'partition.expiration-time' comes from the procedure parameter.
// Only two partitions are reported as expired; presumably capped by the table's
// 'partition.expiration-max-num' = '2' (confirm against PartitionExpire).
assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", expiration_time => '1 d')"))
.containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");

// The rows of the two expired partitions are gone; the others remain.
assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}

/**
 * Checks that {@code sys.expire_partitions} accepts its expiration settings through the
 * {@code options} argument, given as a comma-separated list of {@code key = value} pairs.
 *
 * <p>The table itself only sets 'bucket', so the expiration time, max number and timestamp
 * formatter used by the call all come from the {@code options} string.
 */
@Test
public void testExpirePartitionsUseOptionsParam() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1'"
+ ")");
FileStoreTable table = paimonTable("T");

// One row per partition: three dated partitions plus a far-future one.
sql("INSERT INTO T VALUES ('a', '2024-06-01')");
sql("INSERT INTO T VALUES ('b', '2024-06-02')");
sql("INSERT INTO T VALUES ('c', '2024-06-03')");
// This partition never expires.
sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
// Renders each row as "k:dt" so the expected contents can be written as plain strings.
Function<InternalRow, String> consumerReadResult =
(InternalRow row) -> row.getString(0) + ":" + row.getString(1);

// Sanity check: all four rows are readable before any expiration runs.
assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder(
"a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");

// Pass every expiration setting through the `options` argument.
// Only two partitions are reported as expired; presumably capped by
// 'partition.expiration-max-num = 2' (confirm against PartitionExpire).
assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", options => 'partition.expiration-time = 1d,"
+ " partition.expiration-max-num = 2, "
+ " partition.timestamp-formatter = yyyy-MM-dd')"))
.containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");

// The rows of the two expired partitions are gone; the others remain.
assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}

/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()
Expand Down
Loading

0 comments on commit 1172cc2

Please sign in to comment.