From b74f7754da3d6d490e691cb81f81b24b758ec689 Mon Sep 17 00:00:00 2001
From: herefree <841043203@qq.com>
Date: Tue, 6 Aug 2024 16:41:30 +0800
Subject: [PATCH] [flink] Support specifying time-pattern in ExpirePartitions
(#3909)
---
docs/content/flink/procedures.md | 6 +-
.../ProcedurePositionalArgumentsITCase.java | 2 +-
.../paimon/flink/action/ActionFactory.java | 1 +
.../flink/action/ExpirePartitionsAction.java | 2 +
.../action/ExpirePartitionsActionFactory.java | 5 +-
.../procedure/ExpirePartitionsProcedure.java | 6 ++
.../action/ExpirePartitionsActionITCase.java | 64 ++++++++++++++++---
.../ExpirePartitionsProcedureITCase.java | 50 +++++++++++++++
8 files changed, 122 insertions(+), 14 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 2ef8f800febb4..3fae9b40512fa 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -258,13 +258,15 @@ All available procedures are listed below.
table: the target table identifier. Cannot be empty.
expiration_time: the expiration interval of a partition. A partition will be expired if its lifetime is over this value. Partition time is extracted from the partition value.
timestamp_formatter: the formatter to format timestamp from string.
+ timestamp_pattern: the pattern to get a timestamp from partitions.
expire_strategy: specifies the expiration strategy for partition expiration, possible values: 'values-time' or 'update-time'; the default is 'values-time'.
-- for Flink 1.18
- CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 'values-time')
+ CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')
-- for Flink 1.19 and later
- CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')
+ CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')
+ CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt $hm', expire_strategy => 'values-time')
|
diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 5e1acca3ada69..1db04aa1d392c 100644
--- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -74,7 +74,7 @@ public void testExpirePartitionsProcedure() throws Exception {
sql("INSERT INTO T VALUES ('1', '2024-06-01')");
sql("INSERT INTO T VALUES ('2', '9024-06-01')");
assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01");
- sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 'values-time')");
+ sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')");
assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index bcc53aa92f00a..aeacc8ce68ee9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -56,6 +56,7 @@ public interface ActionFactory extends Factory {
String EXPIRATIONTIME = "expiration_time";
String TIMESTAMPFORMATTER = "timestamp_formatter";
String EXPIRE_STRATEGY = "expire_strategy";
+ String TIMESTAMP_PATTERN = "timestamp_pattern";
Optional create(MultipleParameterToolAdapter params);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 5a41ba0f9c75b..9528bc137d6f2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -43,6 +43,7 @@ public ExpirePartitionsAction(
Map catalogConfig,
String expirationTime,
String timestampFormatter,
+ String timestampPattern,
String expireStrategy) {
super(warehouse, databaseName, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
@@ -54,6 +55,7 @@ public ExpirePartitionsAction(
Map map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
+ map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore> fileStore = fileStoreTable.store();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
index c343d3b7ff4b9..3d0dfc265983b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
@@ -42,6 +42,7 @@ public Optional create(MultipleParameterToolAdapter params) {
String expirationTime = params.get(EXPIRATIONTIME);
String timestampFormatter = params.get(TIMESTAMPFORMATTER);
String expireStrategy = params.get(EXPIRE_STRATEGY);
+ String timestampPattern = params.get(TIMESTAMP_PATTERN);
Map catalogConfig = optionalConfigMap(params, CATALOG_CONF);
@@ -53,6 +54,7 @@ public Optional create(MultipleParameterToolAdapter params) {
catalogConfig,
expirationTime,
timestampFormatter,
+ timestampPattern,
expireStrategy));
}
@@ -64,7 +66,8 @@ public void printHelp() {
System.out.println("Syntax:");
System.out.println(
" expire_partitions --warehouse --database "
- + "--table --tag_name --expiration_time --timestamp_formatter ");
+ + "--table --tag_name --expiration_time --timestamp_formatter "
+ + "[--timestamp_pattern ] [--expire_strategy ]");
System.out.println();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index ae9eb139ef65f..abbf4f48698b7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -55,6 +55,10 @@ public String identifier() {
name = "timestamp_formatter",
type = @DataTypeHint("STRING"),
isOptional = true),
+ @ArgumentHint(
+ name = "timestamp_pattern",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
@ArgumentHint(
name = "expire_strategy",
type = @DataTypeHint("STRING"),
@@ -65,6 +69,7 @@ public String identifier() {
String tableId,
String expirationTime,
String timestampFormatter,
+ String timestampPattern,
String expireStrategy)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
@@ -72,6 +77,7 @@ public String identifier() {
Map map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
+ map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
PartitionExpire partitionExpire =
new PartitionExpire(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
index 55a36637efbe4..9077204c0d849 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
@@ -43,9 +43,9 @@
public class ExpirePartitionsActionITCase extends ActionITCaseBase {
private static final DataType[] FIELD_TYPES =
- new DataType[] {DataTypes.STRING(), DataTypes.STRING()};
+ new DataType[] {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()};
- private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"k", "v"});
+ private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"k", "dt", "hm"});
@BeforeEach
public void setUp() {
@@ -58,7 +58,7 @@ public void testExpirePartitionsAction() throws Exception {
TableScan.Plan plan = table.newReadBuilder().newScan().plan();
List actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
List expected;
- expected = Arrays.asList("+I[1, 2024-01-01]", "+I[2, 2024-12-31]");
+ expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 9999-09-20, 02:00]");
assertThat(actual).isEqualTo(expected);
@@ -84,7 +84,42 @@ public void testExpirePartitionsAction() throws Exception {
plan = table.newReadBuilder().newScan().plan();
actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
- expected = Arrays.asList("+I[2, 2024-12-31]");
+ expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");
+
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testExpirePartitionsActionWithTimePartition() throws Exception {
+ FileStoreTable table = prepareTable();
+ TableScan.Plan plan = table.newReadBuilder().newScan().plan();
+ List actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
+ List expected;
+ expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 9999-09-20, 02:00]");
+
+ assertThat(actual).isEqualTo(expected);
+
+ createAction(
+ ExpirePartitionsAction.class,
+ "expire_partitions",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--expiration_time",
+ "1 d",
+ "--timestamp_formatter",
+ "yyyy-MM-dd HH:mm",
+ "--timestamp_pattern",
+ "$dt $hm")
+ .run();
+
+ plan = table.newReadBuilder().newScan().plan();
+ actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
+
+ expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");
assertThat(actual).isEqualTo(expected);
}
@@ -94,13 +129,14 @@ private FileStoreTable prepareTable() throws Exception {
RowType rowType =
RowType.of(
- new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
- new String[] {"k", "v"});
- String[] pk = {"k", "v"};
+ new DataType[] {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
+ new String[] {"k", "dt", "hm"});
+ String[] pk = {"k", "dt", "hm"};
+ String[] partitions = {"dt", "hm"};
FileStoreTable table =
createFileStoreTable(
rowType,
- Collections.singletonList("v"),
+ new ArrayList<>(Arrays.asList(partitions)),
new ArrayList<>(Arrays.asList(pk)),
Collections.singletonList("k"),
Collections.emptyMap());
@@ -110,8 +146,16 @@ private FileStoreTable prepareTable() throws Exception {
commit = writeBuilder.newCommit();
// 3 snapshots
- writeData(rowData(BinaryString.fromString("1"), BinaryString.fromString("2024-01-01")));
- writeData(rowData(BinaryString.fromString("2"), BinaryString.fromString("2024-12-31")));
+ writeData(
+ rowData(
+ BinaryString.fromString("1"),
+ BinaryString.fromString("2024-01-01"),
+ BinaryString.fromString("01:00")));
+ writeData(
+ rowData(
+ BinaryString.fromString("2"),
+ BinaryString.fromString("9999-09-20"),
+ BinaryString.fromString("02:00")));
return table;
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index d476096f00658..71a6dc466ee09 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -297,6 +297,56 @@ public void testPartitionExpireWithNonDateFormatPartition() throws Exception {
assertThat(read(table, consumerReadResult)).isEmpty();
}
+ @Test
+ public void testPartitionExpireWithTimePartition() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " hm STRING,"
+ + " PRIMARY KEY (k, dt, hm) NOT ENFORCED"
+ + ") PARTITIONED BY (dt, hm) WITH ("
+ + " 'bucket' = '1'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+ // Test there are no expired partitions.
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", expiration_time => '1 d'"
+ + ", timestamp_pattern => '$dt $hm'"
+ + ", timestamp_formatter => 'yyyy-MM-dd HH:mm')"))
+ .containsExactlyInAnyOrder("No expired partitions.");
+
+ sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')");
+ sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')");
+ // This partition never expires.
+ sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')");
+
+ Function consumerReadResult =
+ (InternalRow row) ->
+ row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder(
+ "1:2024-06-01:01:00",
+ "2:2024-06-02:02:00",
+ "Never-expire:9999-09-09:99:99");
+
+ // Show a list of expired partitions.
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", expiration_time => '1 d'"
+ + ", timestamp_pattern => '$dt $hm'"
+ + ", timestamp_formatter => 'yyyy-MM-dd HH:mm')"))
+ .containsExactlyInAnyOrder("dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=02:00");
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder("Never-expire:9999-09-09:99:99");
+ }
+
/** Return a list of expired partitions. */
public List callExpirePartitions(String callSql) {
return sql(callSql).stream()