Skip to content

Commit

Permalink
[core] Refactor exception handling of PartitionTimeExtractor (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Jul 28, 2024
1 parent 6dbeff8 commit 95831a3
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

package org.apache.paimon.partition;

import org.apache.paimon.CoreOptions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.LocalDate;
Expand All @@ -39,8 +34,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
Expand All @@ -52,8 +45,6 @@
/** Time extractor to extract time from partition values. */
public class PartitionTimeExtractor {

private static final Logger LOG = LoggerFactory.getLogger(PartitionTimeExtractor.class);

private static final DateTimeFormatter TIMESTAMP_FORMATTER =
new DateTimeFormatterBuilder()
.appendValue(YEAR, 1, 10, SignStyle.NORMAL)
Expand Down Expand Up @@ -98,36 +89,18 @@ public LocalDateTime extract(LinkedHashMap<String, String> spec) {
}

public LocalDateTime extract(List<String> partitionKeys, List<?> partitionValues) {
LocalDateTime dateTime = null;
try {
String timestampString;
if (pattern == null) {
timestampString = partitionValues.get(0).toString();
} else {
timestampString = pattern;
for (int i = 0; i < partitionKeys.size(); i++) {
timestampString =
timestampString.replaceAll(
"\\$" + partitionKeys.get(i),
partitionValues.get(i).toString());
}
String timestampString;
if (pattern == null) {
timestampString = partitionValues.get(0).toString();
} else {
timestampString = pattern;
for (int i = 0; i < partitionKeys.size(); i++) {
timestampString =
timestampString.replaceAll(
"\\$" + partitionKeys.get(i), partitionValues.get(i).toString());
}
dateTime = toLocalDateTime(timestampString, this.formatter);
} catch (Exception e) {
String partitionInfos =
IntStream.range(0, partitionKeys.size())
.mapToObj(i -> partitionKeys.get(i) + ":" + partitionValues.get(i))
.collect(Collectors.joining(","));
LOG.warn(
"Partition {} can't uses '{}' formatter to extract datetime to expire."
+ " Please check the partition expiration configuration or"
+ " manually delete the partition using the drop-partition command or"
+ " use 'update-time' expiration strategy by set {}, the strategy support non-date formatted partition.",
partitionInfos,
this.formatter,
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
}
return dateTime;
return toLocalDateTime(timestampString, this.formatter);
}

private static LocalDateTime toLocalDateTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,25 @@
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.types.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* A partition expiration policy that compares the time extracted from the partition with the current
* time.
*/
public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {

private static final Logger LOG =
LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class);

private final PartitionTimeExtractor timeExtractor;

public PartitionValuesTimeExpireStrategy(CoreOptions options, RowType partitionType) {
Expand Down Expand Up @@ -64,8 +73,25 @@ private PartitionValuesTimePredicate(LocalDateTime expireDateTime) {
/**
 * Decides whether the given partition is expired by comparing the time extracted from its
 * values with {@code expireDateTime}.
 *
 * <p>A partition whose values cannot be parsed into a datetime is never treated as expired:
 * the failure is logged as a warning, together with remediation hints, and {@code false} is
 * returned.
 *
 * @param partition the partition row to check
 * @return {@code true} only if a partition time was extracted and {@code expireDateTime} is
 *     after it
 */
@Override
public boolean test(BinaryRow partition) {
    Object[] array = convertPartition(partition);
    try {
        LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array));
        // Null-safe: a missing partition time never counts as expired.
        return partTime != null && expireDateTime.isAfter(partTime);
    } catch (DateTimeParseException e) {
        // Render the partition as "key:value,key:value" for the warning below.
        String partitionInfo =
                IntStream.range(0, partitionKeys.size())
                        .mapToObj(i -> partitionKeys.get(i) + ":" + array[i])
                        .collect(Collectors.joining(","));
        LOG.warn(
                "Can't extract datetime from partition {}. If you want to configure partition expiration, please:\n"
                        + "  1. Check the expiration configuration.\n"
                        + "  2. Manually delete the partition using the drop-partition command if the partition"
                        + " value is non-date formatted.\n"
                        + "  3. Use '{}' expiration strategy by set '{}', which supports non-date formatted partition.",
                partitionInfo,
                CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
                CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
        return false;
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

package org.apache.paimon.partition;

import org.apache.paimon.testutils.assertj.PaimonAssertions;

import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link PartitionTimeExtractor}. */
public class PartitionTimeExtractorTest {
Expand Down Expand Up @@ -87,4 +91,18 @@ public void testFormatter() {
Collections.emptyList(), Collections.singletonList("20230101")))
.isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));
}

/** A value that does not match the configured formatter must surface a parse exception. */
@Test
public void testExtractNonDateFormattedPartition() {
    PartitionTimeExtractor nonDateExtractor = new PartitionTimeExtractor("$ds", "yyyyMMdd");

    // "unknown" is substituted for $ds and cannot be parsed as yyyyMMdd.
    assertThatThrownBy(
                    () -> nonDateExtractor.extract(Arrays.asList("ds"), Arrays.asList("unknown")))
            .satisfies(
                    PaimonAssertions.anyCauseMatches(
                            DateTimeParseException.class,
                            "Text 'unknown' could not be parsed at index 0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionTimeExtractor;
Expand All @@ -31,7 +32,9 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -123,8 +126,7 @@ public List<String> donePartitions(boolean endInput, long currentTimeMillis) {

long lastUpdateTime = entry.getValue();
long partitionStartTime =
timeExtractor
.extract(extractPartitionSpecFromPath(new Path(partition)))
extractDateTime(partition)
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
Expand All @@ -139,6 +141,15 @@ public List<String> donePartitions(boolean endInput, long currentTimeMillis) {
return needDone;
}

/**
 * Resolves the datetime of a partition given as a path string.
 *
 * <p>The string is wrapped in a {@code Path}, turned into a partition spec and handed to the
 * time extractor.
 *
 * @param partition the partition path string
 * @return the datetime extracted from the partition spec
 * @throws RuntimeException wrapping the {@code DateTimeParseException} when the partition
 *     cannot be parsed; the message names the offending partition
 */
@VisibleForTesting
LocalDateTime extractDateTime(String partition) {
    try {
        Path partitionPath = new Path(partition);
        return timeExtractor.extract(extractPartitionSpecFromPath(partitionPath));
    } catch (DateTimeParseException parseFailure) {
        String message = "Can't extract datetime from partition " + partition;
        throw new RuntimeException(message, parseFailure);
    }
}

/**
 * Passes {@code state} a newly allocated list holding the current keys of
 * {@code pendingPartitions}.
 *
 * @throws Exception if {@code state.update} fails
 */
public void snapshotState() throws Exception {
state.update(new ArrayList<>(pendingPartitions.keySet()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.testutils.assertj.PaimonAssertions;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -32,6 +33,7 @@
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class PartitionMarkDoneTriggerTest {

Expand Down Expand Up @@ -142,6 +144,24 @@ public void testWithEndInput() throws Exception {
assertThat(partitions).containsOnly("dt=2024-02-02");
}

/** A partition string without a parseable date must make {@code extractDateTime} fail. */
@Test
public void testParseNonDateFormattedPartition() throws Exception {
    long epochMillis = toEpochMillis("2024-02-01");
    PartitionMarkDoneTrigger markDoneTrigger =
            new PartitionMarkDoneTrigger(
                    state, extractor, timeInterval, idleTime, epochMillis, true);

    // The failure is expected to be reported as a RuntimeException naming the partition.
    assertThatThrownBy(() -> markDoneTrigger.extractDateTime("unknown"))
            .satisfies(
                    PaimonAssertions.anyCauseMatches(
                            RuntimeException.class,
                            "Can't extract datetime from partition unknown"));
}

private long toEpochMillis(String dt) {
return LocalDateTime.of(LocalDate.parse(dt), LocalTime.MIN)
.atZone(ZoneId.systemDefault())
Expand Down

0 comments on commit 95831a3

Please sign in to comment.