diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java index 36ca92382a86..0016619e2162 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java @@ -18,11 +18,6 @@ package org.apache.paimon.partition; -import org.apache.paimon.CoreOptions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.time.LocalDate; @@ -39,8 +34,6 @@ import java.util.List; import java.util.Locale; import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import static java.time.temporal.ChronoField.DAY_OF_MONTH; import static java.time.temporal.ChronoField.HOUR_OF_DAY; @@ -52,8 +45,6 @@ /** Time extractor to extract time from partition values. */ public class PartitionTimeExtractor { - private static final Logger LOG = LoggerFactory.getLogger(PartitionTimeExtractor.class); - private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() .appendValue(YEAR, 1, 10, SignStyle.NORMAL) @@ -98,36 +89,18 @@ public LocalDateTime extract(LinkedHashMap spec) { } public LocalDateTime extract(List partitionKeys, List partitionValues) { - LocalDateTime dateTime = null; - try { - String timestampString; - if (pattern == null) { - timestampString = partitionValues.get(0).toString(); - } else { - timestampString = pattern; - for (int i = 0; i < partitionKeys.size(); i++) { - timestampString = - timestampString.replaceAll( - "\\$" + partitionKeys.get(i), - partitionValues.get(i).toString()); - } + String timestampString; + if (pattern == null) { + timestampString = partitionValues.get(0).toString(); + } else { + timestampString = pattern; + for (int i = 0; i < partitionKeys.size(); i++) { + timestampString = + timestampString.replaceAll( + "\\$" + partitionKeys.get(i), partitionValues.get(i).toString()); } - dateTime = toLocalDateTime(timestampString, this.formatter); - } catch (Exception e) { - String partitionInfos = - IntStream.range(0, partitionKeys.size()) - .mapToObj(i -> partitionKeys.get(i) + ":" + partitionValues.get(i)) - .collect(Collectors.joining(",")); - LOG.warn( - "Partition {} can't uses '{}' formatter to extract datetime to expire." - + " Please check the partition expiration configuration or" - + " manually delete the partition using the drop-partition command or" - + " use 'update-time' expiration strategy by set {}, the strategy support non-date formatted partition.", - partitionInfos, - this.formatter, - CoreOptions.PARTITION_EXPIRATION_STRATEGY.key()); } - return dateTime; + return toLocalDateTime(timestampString, this.formatter); } private static LocalDateTime toLocalDateTime( diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java index 5efe8b910e08..37cbed53043f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java @@ -26,9 +26,15 @@ import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.types.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.LocalDateTime; +import java.time.format.DateTimeParseException; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * A partition expiration policy that compare the time extracted from the partition with the current @@ -36,6 +42,9 @@ */ public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy { + private static final Logger LOG = + LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class); + private final PartitionTimeExtractor timeExtractor; public PartitionValuesTimeExpireStrategy(CoreOptions options, RowType partitionType) { @@ -64,8 +73,25 @@ private PartitionValuesTimePredicate(LocalDateTime expireDateTime) { @Override public boolean test(BinaryRow partition) { Object[] array = convertPartition(partition); - LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array)); - return partTime != null && expireDateTime.isAfter(partTime); + try { + LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array)); + return expireDateTime.isAfter(partTime); + } catch (DateTimeParseException e) { + String partitionInfo = + IntStream.range(0, partitionKeys.size()) + .mapToObj(i -> partitionKeys.get(i) + ":" + array[i]) + .collect(Collectors.joining(",")); + LOG.warn( + "Can't extract datetime from partition {}. If you want to configure partition expiration, please:\n" + + " 1. Check the expiration configuration.\n" + + " 2. Manually delete the partition using the drop-partition command if the partition" + + " value is non-date formatted.\n" + + " 3. Use '{}' expiration strategy by set '{}', which supports non-date formatted partition.", + partitionInfo, + CoreOptions.PartitionExpireStrategy.UPDATE_TIME, + CoreOptions.PARTITION_EXPIRATION_STRATEGY.key()); + return false; + } } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java index ca9b4869b44e..3f6cff6ee55d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java @@ -18,13 +18,17 @@ package org.apache.paimon.partition; +import org.apache.paimon.testutils.assertj.PaimonAssertions; + import org.junit.jupiter.api.Test; import java.time.LocalDateTime; +import java.time.format.DateTimeParseException; import java.util.Arrays; import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link PartitionTimeExtractor}. */ public class PartitionTimeExtractorTest { @@ -87,4 +91,18 @@ public void testFormatter() { Collections.emptyList(), Collections.singletonList("20230101"))) .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); } + + @Test + public void testExtractNonDateFormattedPartition() { + PartitionTimeExtractor extractor = new PartitionTimeExtractor("$ds", "yyyyMMdd"); + assertThatThrownBy( + () -> + extractor.extract( + Collections.singletonList("ds"), + Collections.singletonList("unknown"))) + .satisfies( + PaimonAssertions.anyCauseMatches( + DateTimeParseException.class, + "Text 'unknown' could not be parsed at index 0")); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java index ed168387ba7e..a39b5f18a4f4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.partition; import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionTimeExtractor; @@ -31,7 +32,9 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import java.time.Duration; +import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -123,8 +126,7 @@ public List donePartitions(boolean endInput, long currentTimeMillis) { long lastUpdateTime = entry.getValue(); long partitionStartTime = - timeExtractor - .extract(extractPartitionSpecFromPath(new Path(partition))) + extractDateTime(partition) .atZone(ZoneId.systemDefault()) .toInstant() .toEpochMilli(); @@ -139,6 +141,15 @@ public List donePartitions(boolean endInput, long currentTimeMillis) { return needDone; } + @VisibleForTesting + LocalDateTime extractDateTime(String partition) { + try { + return timeExtractor.extract(extractPartitionSpecFromPath(new Path(partition))); + } catch (DateTimeParseException e) { + throw new RuntimeException("Can't extract datetime from partition " + partition, e); + } + } + public void snapshotState() throws Exception { state.update(new ArrayList<>(pendingPartitions.keySet())); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java index f2f6f47c50d2..b00906d1c175 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.partition; import org.apache.paimon.partition.PartitionTimeExtractor; +import org.apache.paimon.testutils.assertj.PaimonAssertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,6 +33,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; class PartitionMarkDoneTriggerTest { @@ -142,6 +144,24 @@ public void testWithEndInput() throws Exception { assertThat(partitions).containsOnly("dt=2024-02-02"); } + @Test + public void testParseNonDateFormattedPartition() throws Exception { + PartitionMarkDoneTrigger trigger = + new PartitionMarkDoneTrigger( + state, + extractor, + timeInterval, + idleTime, + toEpochMillis("2024-02-01"), + true); + + assertThatThrownBy(() -> trigger.extractDateTime("unknown")) + .satisfies( + PaimonAssertions.anyCauseMatches( + RuntimeException.class, + "Can't extract datetime from partition unknown")); + } + private long toEpochMillis(String dt) { return LocalDateTime.of(LocalDate.parse(dt), LocalTime.MIN) .atZone(ZoneId.systemDefault())