Skip to content

Commit

Permalink
[flink] Fix that 'values-time' partition expire might throw NPE (#4646)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Dec 5, 2024
1 parent 812ef05 commit 4bf2d9b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,34 @@ public boolean test(BinaryRow partition) {
LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array));
return expireDateTime.isAfter(partTime);
} catch (DateTimeParseException e) {
String partitionInfo =
IntStream.range(0, partitionKeys.size())
.mapToObj(i -> partitionKeys.get(i) + ":" + array[i])
.collect(Collectors.joining(","));
LOG.warn(
"Can't extract datetime from partition {}. If you want to configure partition expiration, please:\n"
+ " 1. Check the expiration configuration.\n"
+ " 2. Manually delete the partition using the drop-partition command if the partition"
+ " value is non-date formatted.\n"
+ " 3. Use '{}' expiration strategy by set '{}', which supports non-date formatted partition.",
partitionInfo,
formatPartitionInfo(array),
CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
return false;
} catch (NullPointerException e) {
// there might exist NULL partition value
LOG.warn(
"This partition {} cannot be expired because it contains null value. "
+ "You can try to drop it manually or use '{}' expiration strategy by set '{}'.",
formatPartitionInfo(array),
CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
return false;
}
}

private String formatPartitionInfo(Object[] array) {
return IntStream.range(0, partitionKeys.size())
.mapToObj(i -> partitionKeys.get(i) + ":" + array[i])
.collect(Collectors.joining(","));
}

@Override
public boolean test(
long rowCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,19 @@ public void testSortAndLimitExpirePartition() throws Exception {
.containsExactlyInAnyOrder("4:2024-06-03:01:00", "Never-expire:9999-09-09:99:99");
}

@Test
public void testNullPartitionExpire() {
sql("CREATE TABLE T (k INT, ds STRING) PARTITIONED BY (ds);");
sql("INSERT INTO T VALUES (1, CAST (NULL AS STRING))");
assertThat(
callExpirePartitions(
"CALL sys.expire_partitions("
+ "`table` => 'default.T'"
+ ", expiration_time => '1 d'"
+ ", timestamp_formatter => 'yyyyMMdd')"))
.containsExactly("No expired partitions.");
}

/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()
Expand Down

0 comments on commit 4bf2d9b

Please sign in to comment.