[core] Expire partition procedure: show a list of expired partitions (a…
LinMingQiang authored Jun 26, 2024
1 parent af4294f commit cc077b7
Showing 5 changed files with 173 additions and 20 deletions.
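
For context: before this commit, the expire_partitions procedure reported only a success flag; after it, each expired partition comes back as its own result row, with a placeholder row when nothing expired. A minimal sketch of how the procedures in the diffs below render one partition spec as a row value (values illustrative, class name hypothetical):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Both the Flink and Spark procedures in this commit format a partition by
    // calling Map#toString() and stripping the surrounding braces, so the spec
    // {dt=2024-06-01, hm=01:00} becomes the row value "dt=2024-06-01, hm=01:00".
    public class PartitionRowSketch {
        public static void main(String[] args) {
            Map<String, String> partition = new LinkedHashMap<>();
            partition.put("dt", "2024-06-01");
            partition.put("hm", "01:00");
            String r = partition.toString(); // "{dt=2024-06-01, hm=01:00}"
            System.out.println(r.substring(1, r.length() - 1)); // dt=2024-06-01, hm=01:00
        }
    }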
@@ -82,8 +82,8 @@ public PartitionExpire withLock(Lock lock) {
         return this;
     }
 
-    public void expire(long commitIdentifier) {
-        expire(LocalDateTime.now(), commitIdentifier);
+    public List<Map<String, String>> expire(long commitIdentifier) {
+        return expire(LocalDateTime.now(), commitIdentifier);
     }
 
     @VisibleForTesting
@@ -92,27 +92,32 @@ void setLastCheck(LocalDateTime time) {
     }
 
     @VisibleForTesting
-    void expire(LocalDateTime now, long commitIdentifier) {
+    List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
         if (checkInterval.isZero() || now.isAfter(lastCheck.plus(checkInterval))) {
-            doExpire(now.minus(expirationTime), commitIdentifier);
+            List<Map<String, String>> expired =
+                    doExpire(now.minus(expirationTime), commitIdentifier);
             lastCheck = now;
+            return expired;
         }
+        return null;
     }
 
-    private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) {
+    private List<Map<String, String>> doExpire(
+            LocalDateTime expireDateTime, long commitIdentifier) {
         List<Map<String, String>> expired = new ArrayList<>();
         for (BinaryRow partition : readPartitions(expireDateTime)) {
             Object[] array = toObjectArrayConverter.convert(partition);
             Map<String, String> partString = toPartitionString(array);
             expired.add(partString);
-            LOG.info("Expire Partition: " + partition);
+            LOG.info("Expire Partition: {}", partition);
         }
         if (expired.size() > 0) {
             if (metastoreClient != null) {
                 deleteMetastorePartitions(expired);
             }
             commit.dropPartitions(expired, commitIdentifier);
         }
+        return expired;
     }
 
     private void deleteMetastorePartitions(List<Map<String, String>> partitions) {
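Caller-side, the new PartitionExpire#expire contract distinguishes two empty outcomes: null when the check interval has not elapsed (the scan was skipped entirely), and an empty list when the scan ran but no partition was old enough. A hedged consumption sketch (the import path is assumed; the helper name is hypothetical):

    import java.util.List;
    import java.util.Map;

    import org.apache.paimon.operation.PartitionExpire; // package assumed

    public class ExpireCallerSketch {
        // Mirrors the null-or-empty handling the procedures below perform.
        static void reportExpired(PartitionExpire partitionExpire) {
            List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
            if (expired == null || expired.isEmpty()) {
                System.out.println("No expired partitions.");
            } else {
                expired.forEach(System.out::println); // e.g. {dt=2024-06-01, hm=01:00}
            }
        }
    }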
@@ -29,8 +29,11 @@
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
 
 import java.time.Duration;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /** A procedure to expire partitions. */
@@ -46,7 +49,7 @@ public String identifier() {
                 @ArgumentHint(name = "expiration_time", type = @DataTypeHint(value = "STRING")),
                 @ArgumentHint(name = "timestamp_formatter", type = @DataTypeHint("STRING"))
             })
-    public String[] call(
+    public @DataTypeHint("ROW< expired_partitions STRING>") Row[] call(
             ProcedureContext procedureContext,
             String tableId,
             String expirationTime,
@@ -69,7 +72,15 @@ public String[] call(
                                 .metastoreClientFactory())
                         .map(MetastoreClient.Factory::create)
                         .orElse(null));
-        partitionExpire.expire(Long.MAX_VALUE);
-        return new String[] {};
+        List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
+        return expired == null || expired.isEmpty()
+                ? new Row[] {Row.of("No expired partitions.")}
+                : expired.stream()
+                        .map(
+                                x -> {
+                                    String r = x.toString();
+                                    return Row.of(r.substring(1, r.length() - 1));
+                                })
+                        .toArray(Row[]::new);
     }
 }
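
From Flink SQL, the procedure now returns ROW<expired_partitions STRING>, one row per dropped partition. A hedged usage sketch, assuming a TableEnvironment with a Paimon catalog already registered and current (catalog setup elided; the table name 'default.T' is illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public class FlinkExpireCallSketch {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Assumes a Paimon catalog is the current catalog; 'default.T' is illustrative.
            try (CloseableIterator<Row> rows =
                    tEnv.executeSql(
                                    "CALL sys.expire_partitions(`table` => 'default.T', "
                                            + "expiration_time => '1 d', "
                                            + "timestamp_formatter => 'yyyy-MM-dd')")
                            .collect()) {
                // Prints e.g. "dt=2024-06-01, hm=01:00", or "No expired partitions.".
                rows.forEachRemaining(row -> System.out.println(row.getField(0)));
            }
        }
    }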
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.CatalogITCaseBase;
 import org.apache.paimon.table.FileStoreTable;
 
@@ -26,6 +27,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -44,18 +47,70 @@ public void testExpirePartitionsProcedure() throws Exception {
                         + ")");
         FileStoreTable table = paimonTable("T");
         sql("INSERT INTO T VALUES ('1', '2024-06-01')");
+        // Never expire.
         sql("INSERT INTO T VALUES ('2', '9024-06-01')");
-        assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01");
+        Function<InternalRow, String> consumerReadResult =
+                (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01");
         sql(
                 "CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')");
-        assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
+        assertThat(read(table, consumerReadResult)).containsExactlyInAnyOrder("2:9024-06-01");
     }
 
-    private List<String> read(FileStoreTable table) throws IOException {
+    @Test
+    public void testShowExpirePartitionsProcedureResults() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " hm STRING,"
+                        + " PRIMARY KEY (k, dt, hm) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt, hm) WITH ("
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+        // Test there are no expired partitions.
+        List<String> result =
+                sql(
+                                "CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")
+                        .stream()
+                        .map(row -> row.getField(0).toString())
+                        .collect(Collectors.toList());
+        assertThat(result).containsExactlyInAnyOrder("No expired partitions.");
+
+        sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')");
+        sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')");
+        // Never expire.
+        sql("INSERT INTO T VALUES ('3', '9024-06-01', '03:00')");
+
+        Function<InternalRow, String> consumerReadResult =
+                (InternalRow row) ->
+                        row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder(
+                        "1:2024-06-01:01:00", "2:2024-06-02:02:00", "3:9024-06-01:03:00");
+
+        result =
+                sql(
+                                "CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")
+                        .stream()
+                        .map(row -> row.getField(0).toString())
+                        .collect(Collectors.toList());
+        // Show a list of expired partitions.
+        assertThat(result)
+                .containsExactlyInAnyOrder("dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=02:00");
+
+        assertThat(read(table, consumerReadResult)).containsExactlyInAnyOrder("3:9024-06-01:03:00");
+    }
+
+    private List<String> read(
+            FileStoreTable table, Function<InternalRow, String> consumerReadResult)
+            throws IOException {
         List<String> ret = new ArrayList<>();
         table.newRead()
                 .createReader(table.newScan().plan().splits())
-                .forEachRemaining(row -> ret.add(row.getString(0) + ":" + row.getString(1)));
+                .forEachRemaining(row -> ret.add(consumerReadResult.apply(row)));
         return ret;
     }
 }
@@ -27,12 +27,14 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.time.Duration;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -50,7 +52,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                        new StructField("expired_partitions", StringType, true, Metadata.empty())
                    });
 
     protected ExpirePartitionsProcedure(TableCatalog tableCatalog) {
@@ -92,9 +94,20 @@ public InternalRow[] call(InternalRow args) {
                                             .metastoreClientFactory())
                                     .map(MetastoreClient.Factory::create)
                                     .orElse(null));
-                    partitionExpire.expire(Long.MAX_VALUE);
-                    InternalRow outputRow = newInternalRow(true);
-                    return new InternalRow[] {outputRow};
+                    List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
+                    return expired == null || expired.isEmpty()
+                            ? new InternalRow[] {
+                                newInternalRow(UTF8String.fromString("No expired partitions."))
+                            }
+                            : expired.stream()
+                                    .map(
+                                            x -> {
+                                                String r = x.toString();
+                                                return newInternalRow(
+                                                        UTF8String.fromString(
+                                                                r.substring(1, r.length() - 1)));
+                                            })
+                                    .toArray(InternalRow[]::new);
                 });
     }
 
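The Spark procedure's output schema changes from a boolean result column to expired_partitions STRING, carrying the same rows as the Flink side. A hedged usage sketch (assumes a SparkSession with the Paimon catalog registered as paimon, mirroring the test below; table name illustrative):

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SparkExpireCallSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().getOrCreate();
            // One row per dropped partition; "No expired partitions." when nothing matched.
            for (Row row :
                    spark.sql(
                                    "CALL paimon.sys.expire_partitions(table => 'test.T', "
                                            + "expiration_time => '1 d', "
                                            + "timestamp_formatter => 'yyyy-MM-dd')")
                            .collectAsList()) {
                System.out.println(row.getString(0)); // e.g. "pt=2024-06-01, hm=01:00"
            }
        }
    }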
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.StreamTest
 
-/** IT Case for expire partitions procedure. */
+/** IT Case for [[ExpirePartitionsProcedure]]. */
 class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest {
 
   import testImplicits._
@@ -69,7 +69,8 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
           spark.sql(
             "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
               ", timestamp_formatter => 'yyyy-MM-dd')"),
-          Row(true) :: Nil)
+          Row("pt=2024-06-01") :: Nil
+        )
 
         checkAnswer(query(), Row("b", "9024-06-01") :: Nil)
 
@@ -79,4 +80,72 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
       }
     }
   }
+
+  test("Paimon procedure : expire partitions show a list of expired partitions.") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (k STRING, pt STRING, hm STRING)
+                       |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1')
+                       | PARTITIONED BY (pt,hm)
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(String, String, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("k", "pt", "hm")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            // Show results : There are no expired partitions.
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
+                  ", timestamp_formatter => 'yyyy-MM-dd')"),
+              Row("No expired partitions.") :: Nil
+            )
+
+            // snapshot-1
+            inputData.addData(("a", "2024-06-01", "01:00"))
+            stream.processAllAvailable()
+            // snapshot-2
+            inputData.addData(("b", "2024-06-02", "02:00"))
+            stream.processAllAvailable()
+            // snapshot-3, never expires.
+            inputData.addData(("c", "9024-06-03", "03:00"))
+            stream.processAllAvailable()
+
+            checkAnswer(
+              query(),
+              Row("a", "2024-06-01", "01:00") :: Row("b", "2024-06-02", "02:00") :: Row(
+                "c",
+                "9024-06-03",
+                "03:00") :: Nil)
+
+            // expire
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
+                  ", timestamp_formatter => 'yyyy-MM-dd')"),
+              Row("pt=2024-06-01, hm=01:00") :: Row("pt=2024-06-02, hm=02:00") :: Nil
+            )
+
+            checkAnswer(query(), Row("c", "9024-06-03", "03:00") :: Nil)
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }
