Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink]Support specifying time-pattern in ExpairePartition #3909

Merged
merged 2 commits into from
Aug 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
@@ -258,13 +258,15 @@ All available procedures are listed below.
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A partition will be expired if it‘s lifetime is over this value. Partition time is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter to format timestamp from string.</li>
<li>timestamp_pattern: the pattern to get a timestamp from partitions.</li>
<li>expire_strategy: specifies the expiration strategy for partition expiration, possible values: 'values-time' or 'update-time' , 'values-time' as default.</li>
</td>
<td>
-- for Flink 1.18<br/><br/>
CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 'values-time')<br/><br/>
CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')<br/><br/>
-- for Flink 1.19 and later<br/><br/>
CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')<br/><br/>
CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')<br/>
CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt $hm', expire_strategy => 'values-time')<br/><br/>
</td>
</tr>
<tr>
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ public void testExpirePartitionsProcedure() throws Exception {
sql("INSERT INTO T VALUES ('1', '2024-06-01')");
sql("INSERT INTO T VALUES ('2', '9024-06-01')");
assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01");
sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 'values-time')");
sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')");
assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
}

Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ public interface ActionFactory extends Factory {
String EXPIRATIONTIME = "expiration_time";
String TIMESTAMPFORMATTER = "timestamp_formatter";
String EXPIRE_STRATEGY = "expire_strategy";
String TIMESTAMP_PATTERN = "timestamp_pattern";

Optional<Action> create(MultipleParameterToolAdapter params);

Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ public ExpirePartitionsAction(
Map<String, String> catalogConfig,
String expirationTime,
String timestampFormatter,
String timestampPattern,
String expireStrategy) {
super(warehouse, databaseName, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
@@ -54,6 +55,7 @@ public ExpirePartitionsAction(
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);

FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore<?> fileStore = fileStoreTable.store();
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
String expirationTime = params.get(EXPIRATIONTIME);
String timestampFormatter = params.get(TIMESTAMPFORMATTER);
String expireStrategy = params.get(EXPIRE_STRATEGY);
String timestampPattern = params.get(TIMESTAMP_PATTERN);

Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

@@ -53,6 +54,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
catalogConfig,
expirationTime,
timestampFormatter,
timestampPattern,
expireStrategy));
}

@@ -64,7 +66,8 @@ public void printHelp() {
System.out.println("Syntax:");
System.out.println(
" expire_partitions --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --tag_name <tag_name> --expiration_time <expiration_time> --timestamp_formatter <timestamp_formatter>");
+ "--table <table_name> --tag_name <tag_name> --expiration_time <expiration_time> --timestamp_formatter <timestamp_formatter>"
+ "[--timestamp_pattern <timestamp_pattern>] [--expire_strategy <expire_strategy>]");
System.out.println();
}
}
Original file line number Diff line number Diff line change
@@ -55,6 +55,10 @@ public String identifier() {
name = "timestamp_formatter",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "timestamp_pattern",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "expire_strategy",
type = @DataTypeHint("STRING"),
@@ -65,13 +69,15 @@ public String identifier() {
String tableId,
String expirationTime,
String timestampFormatter,
String timestampPattern,
String expireStrategy)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
FileStore fileStore = fileStoreTable.store();
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);

PartitionExpire partitionExpire =
new PartitionExpire(
Original file line number Diff line number Diff line change
@@ -43,9 +43,9 @@
public class ExpirePartitionsActionITCase extends ActionITCaseBase {

private static final DataType[] FIELD_TYPES =
new DataType[] {DataTypes.STRING(), DataTypes.STRING()};
new DataType[] {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()};

private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"k", "v"});
private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"k", "dt", "hm"});

@BeforeEach
public void setUp() {
@@ -58,7 +58,7 @@ public void testExpirePartitionsAction() throws Exception {
TableScan.Plan plan = table.newReadBuilder().newScan().plan();
List<String> actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
List<String> expected;
expected = Arrays.asList("+I[1, 2024-01-01]", "+I[2, 2024-12-31]");
expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 9999-09-20, 02:00]");

assertThat(actual).isEqualTo(expected);

@@ -84,7 +84,42 @@ public void testExpirePartitionsAction() throws Exception {
plan = table.newReadBuilder().newScan().plan();
actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);

expected = Arrays.asList("+I[2, 2024-12-31]");
expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");

assertThat(actual).isEqualTo(expected);
}

@Test
public void testExpirePartitionsActionWithTimePartition() throws Exception {
FileStoreTable table = prepareTable();
TableScan.Plan plan = table.newReadBuilder().newScan().plan();
List<String> actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
List<String> expected;
expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 9999-09-20, 02:00]");

assertThat(actual).isEqualTo(expected);

createAction(
ExpirePartitionsAction.class,
"expire_partitions",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--expiration_time",
"1 d",
"--timestamp_formatter",
"yyyy-MM-dd HH:mm",
"--timestamp_pattern",
"$dt $hm")
.run();

plan = table.newReadBuilder().newScan().plan();
actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);

expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");

assertThat(actual).isEqualTo(expected);
}
@@ -94,13 +129,14 @@ private FileStoreTable prepareTable() throws Exception {

RowType rowType =
RowType.of(
new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
new String[] {"k", "v"});
String[] pk = {"k", "v"};
new DataType[] {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
new String[] {"k", "dt", "hm"});
String[] pk = {"k", "dt", "hm"};
String[] partitions = {"dt", "hm"};
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.singletonList("v"),
new ArrayList<>(Arrays.asList(partitions)),
new ArrayList<>(Arrays.asList(pk)),
Collections.singletonList("k"),
Collections.emptyMap());
@@ -110,8 +146,16 @@ private FileStoreTable prepareTable() throws Exception {
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(BinaryString.fromString("1"), BinaryString.fromString("2024-01-01")));
writeData(rowData(BinaryString.fromString("2"), BinaryString.fromString("2024-12-31")));
writeData(
rowData(
BinaryString.fromString("1"),
BinaryString.fromString("2024-01-01"),
BinaryString.fromString("01:00")));
writeData(
rowData(
BinaryString.fromString("2"),
BinaryString.fromString("9999-09-20"),
BinaryString.fromString("02:00")));

return table;
}
Original file line number Diff line number Diff line change
@@ -297,6 +297,56 @@ public void testPartitionExpireWithNonDateFormatPartition() throws Exception {
assertThat(read(table, consumerReadResult)).isEmpty();
}

@Test
public void testPartitionExpireWithTimePartition() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " hm STRING,"
+ " PRIMARY KEY (k, dt, hm) NOT ENFORCED"
+ ") PARTITIONED BY (dt, hm) WITH ("
+ " 'bucket' = '1'"
+ ")");
FileStoreTable table = paimonTable("T");
// Test there are no expired partitions.
assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", expiration_time => '1 d'"
+ ", timestamp_pattern => '$dt $hm'"
+ ", timestamp_formatter => 'yyyy-MM-dd HH:mm')"))
.containsExactlyInAnyOrder("No expired partitions.");

sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')");
sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')");
// This partition never expires.
sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')");

Function<InternalRow, String> consumerReadResult =
(InternalRow row) ->
row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder(
"1:2024-06-01:01:00",
"2:2024-06-02:02:00",
"Never-expire:9999-09-09:99:99");

// Show a list of expired partitions.
assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", expiration_time => '1 d'"
+ ", timestamp_pattern => '$dt $hm'"
+ ", timestamp_formatter => 'yyyy-MM-dd HH:mm')"))
.containsExactlyInAnyOrder("dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=02:00");

assertThat(read(table, consumerReadResult))
.containsExactlyInAnyOrder("Never-expire:9999-09-09:99:99");
}

/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()