Skip to content

Commit

Permalink
[flink] Support specifying time-pattern in ExpairePartition (apache#3909
Browse files Browse the repository at this point in the history
)
  • Loading branch information
herefree authored and wxplovecc committed Aug 6, 2024
1 parent 2f7a83a commit b74f775
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 14 deletions.
6 changes: 4 additions & 2 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,15 @@ All available procedures are listed below.
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A partition will be expired if its lifetime exceeds this value. Partition time is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter to format timestamp from string.</li>
<li>timestamp_pattern: the pattern used to build a timestamp string from partition fields, e.g. '$dt $hm'.</li>
<li>expire_strategy: specifies the expiration strategy for partition expiration. Possible values: 'values-time' or 'update-time'; 'values-time' is the default.</li>
</td>
<td>
-- for Flink 1.18<br/><br/>
CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 'values-time')<br/><br/>
CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')<br/><br/>
-- for Flink 1.19 and later<br/><br/>
CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')<br/><br/>
CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')<br/>
CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt $hm', expire_strategy => 'values-time')<br/><br/>
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testExpirePartitionsProcedure() throws Exception {
sql("INSERT INTO T VALUES ('1', '2024-06-01')");
sql("INSERT INTO T VALUES ('2', '9024-06-01')");
assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", "2:9024-06-01");
sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 'values-time')");
sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 'values-time')");
assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public interface ActionFactory extends Factory {
String EXPIRATIONTIME = "expiration_time";
String TIMESTAMPFORMATTER = "timestamp_formatter";
String EXPIRE_STRATEGY = "expire_strategy";
String TIMESTAMP_PATTERN = "timestamp_pattern";

Optional<Action> create(MultipleParameterToolAdapter params);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public ExpirePartitionsAction(
Map<String, String> catalogConfig,
String expirationTime,
String timestampFormatter,
String timestampPattern,
String expireStrategy) {
super(warehouse, databaseName, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
Expand All @@ -54,6 +55,7 @@ public ExpirePartitionsAction(
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);

FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore<?> fileStore = fileStoreTable.store();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
String expirationTime = params.get(EXPIRATIONTIME);
String timestampFormatter = params.get(TIMESTAMPFORMATTER);
String expireStrategy = params.get(EXPIRE_STRATEGY);
String timestampPattern = params.get(TIMESTAMP_PATTERN);

Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

Expand All @@ -53,6 +54,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
catalogConfig,
expirationTime,
timestampFormatter,
timestampPattern,
expireStrategy));
}

Expand All @@ -64,7 +66,8 @@ public void printHelp() {
System.out.println("Syntax:");
System.out.println(
" expire_partitions --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --tag_name <tag_name> --expiration_time <expiration_time> --timestamp_formatter <timestamp_formatter>");
+ "--table <table_name> --tag_name <tag_name> --expiration_time <expiration_time> --timestamp_formatter <timestamp_formatter>"
+ "[--timestamp_pattern <timestamp_pattern>] [--expire_strategy <expire_strategy>]");
System.out.println();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public String identifier() {
name = "timestamp_formatter",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "timestamp_pattern",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "expire_strategy",
type = @DataTypeHint("STRING"),
Expand All @@ -65,13 +69,15 @@ public String identifier() {
String tableId,
String expirationTime,
String timestampFormatter,
String timestampPattern,
String expireStrategy)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
FileStore fileStore = fileStoreTable.store();
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);

PartitionExpire partitionExpire =
new PartitionExpire(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
public class ExpirePartitionsActionITCase extends ActionITCaseBase {

private static final DataType[] FIELD_TYPES =
new DataType[] {DataTypes.STRING(), DataTypes.STRING()};
new DataType[] {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()};

private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"k", "v"});
private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"k", "dt", "hm"});

@BeforeEach
public void setUp() {
Expand All @@ -58,7 +58,7 @@ public void testExpirePartitionsAction() throws Exception {
TableScan.Plan plan = table.newReadBuilder().newScan().plan();
List<String> actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);
List<String> expected;
expected = Arrays.asList("+I[1, 2024-01-01]", "+I[2, 2024-12-31]");
expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 9999-09-20, 02:00]");

assertThat(actual).isEqualTo(expected);

Expand All @@ -84,7 +84,42 @@ public void testExpirePartitionsAction() throws Exception {
plan = table.newReadBuilder().newScan().plan();
actual = getResult(table.newReadBuilder().newRead(), plan.splits(), ROW_TYPE);

expected = Arrays.asList("+I[2, 2024-12-31]");
expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");

assertThat(actual).isEqualTo(expected);
}

/**
 * Runs the {@code expire_partitions} action with a {@code --timestamp_pattern} of
 * {@code "$dt $hm"} and checks that only the row in the far-future partition remains.
 */
@Test
public void testExpirePartitionsActionWithTimePartition() throws Exception {
    FileStoreTable table = prepareTable();

    // Sanity check: both rows are readable before the action runs.
    TableScan.Plan initialPlan = table.newReadBuilder().newScan().plan();
    List<String> rowsBefore =
            getResult(table.newReadBuilder().newRead(), initialPlan.splits(), ROW_TYPE);
    assertThat(rowsBefore)
            .isEqualTo(Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 9999-09-20, 02:00]"));

    // The formatter covers date and time; the pattern joins the two partition columns.
    createAction(
                    ExpirePartitionsAction.class,
                    "expire_partitions",
                    "--warehouse",
                    warehouse,
                    "--database",
                    database,
                    "--table",
                    tableName,
                    "--expiration_time",
                    "1 d",
                    "--timestamp_formatter",
                    "yyyy-MM-dd HH:mm",
                    "--timestamp_pattern",
                    "$dt $hm")
            .run();

    // Re-plan the scan so the read reflects the table state after expiration.
    TableScan.Plan finalPlan = table.newReadBuilder().newScan().plan();
    List<String> rowsAfter =
            getResult(table.newReadBuilder().newRead(), finalPlan.splits(), ROW_TYPE);
    assertThat(rowsAfter).isEqualTo(Arrays.asList("+I[2, 9999-09-20, 02:00]"));
}
Expand All @@ -94,13 +129,14 @@ private FileStoreTable prepareTable() throws Exception {

RowType rowType =
RowType.of(
new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
new String[] {"k", "v"});
String[] pk = {"k", "v"};
new DataType[] {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
new String[] {"k", "dt", "hm"});
String[] pk = {"k", "dt", "hm"};
String[] partitions = {"dt", "hm"};
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.singletonList("v"),
new ArrayList<>(Arrays.asList(partitions)),
new ArrayList<>(Arrays.asList(pk)),
Collections.singletonList("k"),
Collections.emptyMap());
Expand All @@ -110,8 +146,16 @@ private FileStoreTable prepareTable() throws Exception {
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(BinaryString.fromString("1"), BinaryString.fromString("2024-01-01")));
writeData(rowData(BinaryString.fromString("2"), BinaryString.fromString("2024-12-31")));
writeData(
rowData(
BinaryString.fromString("1"),
BinaryString.fromString("2024-01-01"),
BinaryString.fromString("01:00")));
writeData(
rowData(
BinaryString.fromString("2"),
BinaryString.fromString("9999-09-20"),
BinaryString.fromString("02:00")));

return table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,56 @@ public void testPartitionExpireWithNonDateFormatPartition() throws Exception {
assertThat(read(table, consumerReadResult)).isEmpty();
}

/**
 * Exercises {@code sys.expire_partitions} on a table partitioned by {@code (dt, hm)}, with the
 * partition time assembled through {@code timestamp_pattern => '$dt $hm'}.
 */
@Test
public void testPartitionExpireWithTimePartition() throws Exception {
    sql(
            "CREATE TABLE T ("
                    + " k STRING,"
                    + " dt STRING,"
                    + " hm STRING,"
                    + " PRIMARY KEY (k, dt, hm) NOT ENFORCED"
                    + ") PARTITIONED BY (dt, hm) WITH ("
                    + " 'bucket' = '1'"
                    + ")");

    // The same procedure call is issued before and after data is inserted.
    final String expireCall =
            "CALL sys.expire_partitions("
                    + "`table` => 'default.T'"
                    + ", expiration_time => '1 d'"
                    + ", timestamp_pattern => '$dt $hm'"
                    + ", timestamp_formatter => 'yyyy-MM-dd HH:mm')";

    FileStoreTable table = paimonTable("T");

    // Test there are no expired partitions.
    assertThat(callExpirePartitions(expireCall))
            .containsExactlyInAnyOrder("No expired partitions.");

    sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')");
    sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')");
    // This partition never expires.
    sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')");

    // Render each row as "k:dt:hm" for the assertions below.
    Function<InternalRow, String> rowToText =
            row -> row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);

    assertThat(read(table, rowToText))
            .containsExactlyInAnyOrder(
                    "1:2024-06-01:01:00", "2:2024-06-02:02:00", "Never-expire:9999-09-09:99:99");

    // Show a list of expired partitions.
    assertThat(callExpirePartitions(expireCall))
            .containsExactlyInAnyOrder("dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=02:00");

    // Only the row in the never-expiring partition is left.
    assertThat(read(table, rowToText))
            .containsExactlyInAnyOrder("Never-expire:9999-09-09:99:99");
}

/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()
Expand Down

0 comments on commit b74f775

Please sign in to comment.