Skip to content

Commit

Permalink
[core] check metastore only for done_partition markdown action (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored Jun 3, 2024
1 parent b648ed8 commit 4185189
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ public LocalDateTime extract(List<String> partitionKeys, List<?> partitionValues
.mapToObj(i -> partitionKeys.get(i) + ":" + partitionValues.get(i))
.collect(Collectors.joining(","));
LOG.warn(
"Parition {} can't be extract datetime to expire,Please check the partition expiration configuration or manually delete the partition using the drop-partition command. ",
"Partition {} can't be extract datetime to expire."
+ " Please check the partition expiration configuration or"
+ " manually delete the partition using the drop-partition command.",
paritionInfos);
}
return dateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,17 @@ public static PartitionMarkDone create(

MetastoreClient.Factory metastoreClientFactory =
table.catalogEnvironment().metastoreClientFactory();
checkNotNull(
metastoreClientFactory, "Cannot mark done partition for table without metastore.");
checkArgument(
coreOptions.partitionedTableInMetastore(),
"Table should enable %s",
METASTORE_PARTITIONED_TABLE.key());

String partitionMarkDownAction = options.get(PARTITION_MARK_DONE_ACTION);
if (partitionMarkDownAction.contains("done-partition")) {
checkNotNull(
metastoreClientFactory,
"Cannot mark done partition for table without metastore.");
checkArgument(
coreOptions.partitionedTableInMetastore(),
"Table should enable %s",
METASTORE_PARTITIONED_TABLE.key());
}

InternalRowPartitionComputer partitionComputer =
new InternalRowPartitionComputer(
Expand All @@ -116,7 +121,7 @@ public static PartitionMarkDone create(
idleToDone);

List<PartitionMarkDoneAction> actions =
Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
Arrays.asList(partitionMarkDownAction.split(",")).stream()
.map(
action -> {
switch (action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ public class PartitionMarkDoneTrigger {

private final State state;
private final PartitionTimeExtractor timeExtractor;
private final long timeInternal;
private final long timeInterval;
private final long idleTime;
private final Map<String, Long> pendingPartitions;

public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInternal,
Duration timeInterval,
Duration idleTime)
throws Exception {
this(state, timeExtractor, timeInternal, idleTime, System.currentTimeMillis());
this(state, timeExtractor, timeInterval, idleTime, System.currentTimeMillis());
}

PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInternal,
Duration timeInterval,
Duration idleTime,
long currentTimeMillis)
throws Exception {
this.state = state;
this.timeExtractor = timeExtractor;
this.timeInternal = timeInternal.toMillis();
this.timeInterval = timeInterval.toMillis();
this.idleTime = idleTime.toMillis();
this.pendingPartitions = new HashMap<>();
state.restore().forEach(p -> pendingPartitions.put(p, currentTimeMillis));
Expand Down Expand Up @@ -93,7 +93,7 @@ public List<String> donePartitions(long currentTimeMillis) {
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
long partitionEndTime = partitionStartTime + timeInternal;
long partitionEndTime = partitionStartTime + timeInterval;
lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);

if (currentTimeMillis - lastUpdateTime > idleTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public void update(List<String> partitions) {
};

PartitionTimeExtractor extractor = new PartitionTimeExtractor("$dt", "yyyy-MM-dd");
Duration timeInternal = Duration.ofDays(1);
Duration timeInterval = Duration.ofDays(1);
Duration idleTime = Duration.ofMinutes(15);
PartitionMarkDoneTrigger trigger =
new PartitionMarkDoneTrigger(
state, extractor, timeInternal, idleTime, toEpochMillis("2024-02-01"));
state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-01"));

// test not reach partition end + idle time
trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01"));
Expand Down Expand Up @@ -91,7 +91,7 @@ public void update(List<String> partitions) {
pendingPartitions.add("dt=2024-02-04");
trigger =
new PartitionMarkDoneTrigger(
state, extractor, timeInternal, idleTime, toEpochMillis("2024-02-06"));
state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-06"));
partitions = trigger.donePartitions(toEpochMillis("2024-02-06"));
assertThat(partitions).isEmpty();
partitions = trigger.donePartitions(toEpochMillis("2024-02-06") + idleTime.toMillis() + 1);
Expand Down

0 comments on commit 4185189

Please sign in to comment.