Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Expire partiitons add default delete num #4652

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,12 @@
<td>Duration</td>
<td>The check interval of partition expiration.</td>
</tr>
<tr>
<td><h5>partition.expiration-max-num</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The default deleted num of partition expiration.</td>
</tr>
<tr>
<td><h5>partition.expiration-strategy</h5></td>
<td style="word-wrap: break-word;">values-time</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The check interval of partition expiration.");

public static final ConfigOption<Integer> PARTITION_EXPIRATION_MAX_NUM =
key("partition.expiration-max-num")
.intType()
.defaultValue(100)
.withDescription("The default deleted num of partition expiration.");

public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
key("partition.timestamp-formatter")
.stringType()
Expand Down Expand Up @@ -2126,6 +2132,10 @@ public Duration partitionExpireCheckInterval() {
return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL);
}

public int partitionExpireMaxNum() {
return options.get(PARTITION_EXPIRATION_MAX_NUM);
}

public PartitionExpireStrategy partitionExpireStrategy() {
return options.get(PARTITION_EXPIRATION_STRATEGY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {
newScan(),
newCommit(commitUser),
metastoreClient,
options.endInputCheckPartitionExpire());
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class PartitionExpire {
private LocalDateTime lastCheck;
private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;
private int maxExpires;
private int maxExpireNum;

public PartitionExpire(
Duration expirationTime,
Expand All @@ -63,7 +63,8 @@ public PartitionExpire(
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient,
boolean endInputCheckPartitionExpire) {
boolean endInputCheckPartitionExpire,
int maxExpireNum) {
this.expirationTime = expirationTime;
this.checkInterval = checkInterval;
this.strategy = strategy;
Expand All @@ -72,7 +73,7 @@ public PartitionExpire(
this.metastoreClient = metastoreClient;
this.lastCheck = LocalDateTime.now();
this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
this.maxExpires = Integer.MAX_VALUE;
this.maxExpireNum = maxExpireNum;
}

public PartitionExpire(
Expand All @@ -81,17 +82,26 @@ public PartitionExpire(
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient) {
this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false);
@Nullable MetastoreClient metastoreClient,
int maxExpireNum) {
this(
expirationTime,
checkInterval,
strategy,
scan,
commit,
metastoreClient,
false,
maxExpireNum);
}

public PartitionExpire withLock(Lock lock) {
this.commit.withLock(lock);
return this;
}

public PartitionExpire withMaxExpires(int maxExpires) {
this.maxExpires = maxExpires;
public PartitionExpire withMaxExpireNum(int maxExpireNum) {
this.maxExpireNum = maxExpireNum;
return this;
}

Expand Down Expand Up @@ -145,6 +155,7 @@ private List<Map<String, String>> doExpire(

List<Map<String, String>> expired = new ArrayList<>();
if (!expiredPartValues.isEmpty()) {
// convert partition value to partition string, and limit the partition num
expired = convertToPartitionString(expiredPartValues);
LOG.info("Expire Partitions: {}", expired);
if (metastoreClient != null) {
Expand Down Expand Up @@ -175,7 +186,7 @@ private List<Map<String, String>> convertToPartitionString(
.sorted()
.map(s -> s.split(DELIMITER))
.map(strategy::toPartitionString)
.limit(Math.min(expiredPartValues.size(), maxExpires))
.limit(Math.min(expiredPartValues.size(), maxExpireNum))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ public String[] call(
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public ExpirePartitionsAction(
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ public String identifier() {
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,43 @@ public void testNullPartitionExpire() {
.containsExactly("No expired partitions.");
}

@Test
public void testExpirePartitionsWithDefaultNum() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1',"
+ " 'partition.expiration-max-num'='2'"
+ ")");
FileStoreTable table = paimonTable("T");

sql("INSERT INTO T VALUES ('a', '2024-06-01')");
sql("INSERT INTO T VALUES ('b', '2024-06-02')");
sql("INSERT INTO T VALUES ('c', '2024-06-03')");
// This partition never expires.
sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
Function<InternalRow, String> consumerReadResult =
(InternalRow row) -> row.getString(0) + ":" + row.getString(1);

assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder(
"a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09");

assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", expiration_time => '1 d'"
+ ", timestamp_formatter => 'yyyy-MM-dd')"))
.containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");

assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}

/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ public InternalRow[] call(InternalRow args) {
.catalogEnvironment()
.metastoreClientFactory())
.map(MetastoreClient.Factory::create)
.orElse(null));
.orElse(null),
fileStore.options().partitionExpireMaxNum());
if (maxExpires != null) {
partitionExpire.withMaxExpires(maxExpires);
partitionExpire.withMaxExpireNum(maxExpires);
}
List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,4 +551,69 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
}
}
}

test("Paimon Procedure: expire partitions with default num") {
failAfter(streamingTimeout) {
withTempDir {
checkpointDir =>
spark.sql(
s"""
|CREATE TABLE T (k STRING, pt STRING)
|TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1', 'partition.expiration-max-num'='2')
|PARTITIONED BY (pt)
|""".stripMargin)
val location = loadTable("T").location().toString

val inputData = MemoryStream[(String, String)]
val stream = inputData
.toDS()
.toDF("k", "pt")
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreachBatch {
(batch: Dataset[Row], _: Long) =>
batch.write.format("paimon").mode("append").save(location)
}
.start()

val query = () => spark.sql("SELECT * FROM T")

try {
// snapshot-1
inputData.addData(("a", "2024-06-01"))
stream.processAllAvailable()

// snapshot-2
inputData.addData(("b", "2024-06-02"))
stream.processAllAvailable()

// snapshot-3
inputData.addData(("c", "2024-06-03"))
stream.processAllAvailable()

// This partition never expires.
inputData.addData(("Never-expire", "9999-09-09"))
stream.processAllAvailable()

checkAnswer(
query(),
Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c", "2024-06-03") :: Row(
"Never-expire",
"9999-09-09") :: Nil)
// call expire_partitions.
checkAnswer(
spark.sql(
"CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
", timestamp_formatter => 'yyyy-MM-dd')"),
Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
)

checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire", "9999-09-09") :: Nil)

} finally {
stream.stop()
}
}
}
}
}
Loading