Skip to content

Commit

Permalink
[core] Expire partiitons add default delete num (#4652)
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang authored Dec 6, 2024
1 parent 69bdfe0 commit 3186788
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 16 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,12 @@
<td>Duration</td>
<td>The check interval of partition expiration.</td>
</tr>
<tr>
<td><h5>partition.expiration-max-num</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The default deleted num of partition expiration.</td>
</tr>
<tr>
<td><h5>partition.expiration-strategy</h5></td>
<td style="word-wrap: break-word;">values-time</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The check interval of partition expiration.");

public static final ConfigOption<Integer> PARTITION_EXPIRATION_MAX_NUM =
key("partition.expiration-max-num")
.intType()
.defaultValue(100)
.withDescription("The default deleted num of partition expiration.");

public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
key("partition.timestamp-formatter")
.stringType()
Expand Down Expand Up @@ -2126,6 +2132,10 @@ public Duration partitionExpireCheckInterval() {
return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL);
}

public int partitionExpireMaxNum() {
return options.get(PARTITION_EXPIRATION_MAX_NUM);
}

public PartitionExpireStrategy partitionExpireStrategy() {
return options.get(PARTITION_EXPIRATION_STRATEGY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {
newScan(),
newCommit(commitUser),
metastoreClient,
options.endInputCheckPartitionExpire());
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class PartitionExpire {
private LocalDateTime lastCheck;
private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;
private int maxExpires;
private int maxExpireNum;

public PartitionExpire(
Duration expirationTime,
Expand All @@ -63,7 +63,8 @@ public PartitionExpire(
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient,
boolean endInputCheckPartitionExpire) {
boolean endInputCheckPartitionExpire,
int maxExpireNum) {
this.expirationTime = expirationTime;
this.checkInterval = checkInterval;
this.strategy = strategy;
Expand All @@ -72,7 +73,7 @@ public PartitionExpire(
this.metastoreClient = metastoreClient;
this.lastCheck = LocalDateTime.now();
this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
this.maxExpires = Integer.MAX_VALUE;
this.maxExpireNum = maxExpireNum;
}

public PartitionExpire(
Expand All @@ -81,17 +82,26 @@ public PartitionExpire(
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient) {
this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false);
@Nullable MetastoreClient metastoreClient,
int maxExpireNum) {
this(
expirationTime,
checkInterval,
strategy,
scan,
commit,
metastoreClient,
false,
maxExpireNum);
}

public PartitionExpire withLock(Lock lock) {
this.commit.withLock(lock);
return this;
}

public PartitionExpire withMaxExpires(int maxExpires) {
this.maxExpires = maxExpires;
public PartitionExpire withMaxExpireNum(int maxExpireNum) {
this.maxExpireNum = maxExpireNum;
return this;
}

Expand Down Expand Up @@ -145,6 +155,7 @@ private List<Map<String, String>> doExpire(

List<Map<String, String>> expired = new ArrayList<>();
if (!expiredPartValues.isEmpty()) {
// convert partition value to partition string, and limit the partition num
expired = convertToPartitionString(expiredPartValues);
LOG.info("Expire Partitions: {}", expired);
if (metastoreClient != null) {
Expand Down Expand Up @@ -175,7 +186,7 @@ private List<Map<String, String>> convertToPartitionString(
.sorted()
.map(s -> s.split(DELIMITER))
.map(strategy::toPartitionString)
.limit(Math.min(expiredPartValues.size(), maxExpires))
.limit(Math.min(expiredPartValues.size(), maxExpireNum))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ public String[] call(
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public ExpirePartitionsAction(
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ public String identifier() {
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,43 @@ public void testNullPartitionExpire() {
.containsExactly("No expired partitions.");
}

@Test
public void testExpirePartitionsWithDefaultNum() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1',"
+ " 'partition.expiration-max-num'='2'"
+ ")");
FileStoreTable table = paimonTable("T");

sql("INSERT INTO T VALUES ('a', '2024-06-01')");
sql("INSERT INTO T VALUES ('b', '2024-06-02')");
sql("INSERT INTO T VALUES ('c', '2024-06-03')");
// This partition never expires.
sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
Function<InternalRow, String> consumerReadResult =
(InternalRow row) -> row.getString(0) + ":" + row.getString(1);

assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder(
"a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");

assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", expiration_time => '1 d'"
+ ", timestamp_formatter => 'yyyy-MM-dd')"))
.containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");

assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}

/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ public InternalRow[] call(InternalRow args) {
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,4 +551,69 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
}
}
}

test("Paimon Procedure: expire partitions with default num") {
failAfter(streamingTimeout) {
withTempDir {
checkpointDir =>
spark.sql(
s"""
|CREATE TABLE T (k STRING, pt STRING)
|TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1', 'partition.expiration-max-num'='2')
|PARTITIONED BY (pt)
|""".stripMargin)
val location = loadTable("T").location().toString

val inputData = MemoryStream[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreachBatch {
(batch: Dataset[Row], _: Long) =>
batch.write.format("paimon").mode("append").save(location)
}
.start()

val query = () => spark.sql("SELECT * FROM T")

try {
// snapshot-1
inputData.addData(("a", "2024-06-01"))
stream.processAllAvailable()

// snapshot-2
inputData.addData(("b", "2024-06-02"))
stream.processAllAvailable()

// snapshot-3
inputData.addData(("c", "2024-06-03"))
stream.processAllAvailable()

// This partition never expires.
inputData.addData(("Never-expire", "9999-09-09"))
stream.processAllAvailable()

checkAnswer(
query(),
Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c", "2024-06-03") :: Row(
"Never-expire",
"9999-09-09") :: Nil)
// call expire_partitions.
checkAnswer(
spark.sql(
"CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
", timestamp_formatter => 'yyyy-MM-dd')"),
Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
)

checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire", "9999-09-09") :: Nil)

} finally {
stream.stop()
}
}
}
}
}

0 comments on commit 3186788

Please sign in to comment.