Skip to content

Commit

Permalink
[flink] Partition Mark done should not be affected by compaction (#4027)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Aug 22, 2024
1 parent 4f4457a commit cd45b7c
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ManifestCommittable(long identifier, @Nullable Long watermark) {

public ManifestCommittable(
long identifier,
Long watermark,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
List<CommitMessage> commitMessages) {
this.identifier = identifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ public void markDone(String partition) throws Exception {
Path successPath = new Path(partitionPath, SUCCESS_FILE_NAME);

long currentTime = System.currentTimeMillis();
SuccessFile successFile = SuccessFile.safelyFromPath(fileIO, successPath);
if (successFile == null) {
successFile = new SuccessFile(currentTime, currentTime);
} else {
successFile = successFile.updateModificationTime(currentTime);
SuccessFile successFile = new SuccessFile(currentTime, currentTime);
if (fileIO.exists(successPath)) {
successFile =
SuccessFile.fromPath(fileIO, successPath).updateModificationTime(currentTime);
}
fileIO.overwriteFileUtf8(successPath, successFile.toJson());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public static SuccessFile safelyFromPath(FileIO fileIO, Path path) throws IOExce
}
}

public static SuccessFile fromPath(FileIO fileIO, Path path) throws IOException {
String json = fileIO.readFileUtf8(path);
return SuccessFile.fromJson(json);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
Expand All @@ -49,6 +51,7 @@ public class PartitionMarkDone implements Closeable {
private final InternalRowPartitionComputer partitionComputer;
private final PartitionMarkDoneTrigger trigger;
private final List<PartitionMarkDoneAction> actions;
private final boolean waitCompaction;

@Nullable
public static PartitionMarkDone create(
Expand Down Expand Up @@ -76,7 +79,14 @@ public static PartitionMarkDone create(
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(table, coreOptions);

return new PartitionMarkDone(partitionComputer, trigger, actions);
// if batch read skip level 0 files, we should wait compaction to mark done
// otherwise, some data may not be readable, and there might be data delays
boolean waitCompaction =
!table.primaryKeys().isEmpty()
&& (coreOptions.deletionVectorsEnabled()
|| coreOptions.mergeEngine() == MergeEngine.FIRST_ROW);

return new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction);
}

private static boolean disablePartitionMarkDone(
Expand All @@ -91,30 +101,32 @@ private static boolean disablePartitionMarkDone(
return true;
}

List<String> partitionKeys = table.partitionKeys();
if (partitionKeys.isEmpty()) {
return true;
}

return false;
return table.partitionKeys().isEmpty();
}

public PartitionMarkDone(
InternalRowPartitionComputer partitionComputer,
PartitionMarkDoneTrigger trigger,
List<PartitionMarkDoneAction> actions) {
List<PartitionMarkDoneAction> actions,
boolean waitCompaction) {
this.partitionComputer = partitionComputer;
this.trigger = trigger;
this.actions = actions;
this.waitCompaction = waitCompaction;
}

public void notifyCommittable(List<ManifestCommittable> committables) {
Set<BinaryRow> partitions = new HashSet<>();
boolean endInput = false;
for (ManifestCommittable committable : committables) {
committable.fileCommittables().stream()
.map(CommitMessage::partition)
.forEach(partitions::add);
for (CommitMessage commitMessage : committable.fileCommittables()) {
CommitMessageImpl message = (CommitMessageImpl) commitMessage;
if (waitCompaction
|| !message.indexIncrement().isEmpty()
|| !message.newFilesIncrement().isEmpty()) {
partitions.add(message.partition());
}
}
if (committable.identifier() == Long.MAX_VALUE) {
endInput = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

import javax.annotation.Nullable;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -57,53 +59,52 @@ public class PartitionMarkDoneTrigger {

private final State state;
private final PartitionTimeExtractor timeExtractor;
private long timeInterval;
private long idleTime;
private final boolean partitionMarkDoneWhenEndInput;
// can be null when markDoneWhenEndInput is true
@Nullable private final Long timeInterval;
// can be null when markDoneWhenEndInput is true
@Nullable private final Long idleTime;
private final boolean markDoneWhenEndInput;
private final Map<String, Long> pendingPartitions;

public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInterval,
Duration idleTime,
boolean partitionMarkDoneWhenEndInput)
@Nullable Duration timeInterval,
@Nullable Duration idleTime,
boolean markDoneWhenEndInput)
throws Exception {
this(
state,
timeExtractor,
timeInterval,
idleTime,
System.currentTimeMillis(),
partitionMarkDoneWhenEndInput);
markDoneWhenEndInput);
}

public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInterval,
Duration idleTime,
@Nullable Duration timeInterval,
@Nullable Duration idleTime,
long currentTimeMillis,
boolean partitionMarkDoneWhenEndInput)
boolean markDoneWhenEndInput)
throws Exception {
this.pendingPartitions = new HashMap<>();
this.state = state;
this.timeExtractor = timeExtractor;
if (timeInterval != null) {
this.timeInterval = timeInterval.toMillis();
}
if (idleTime != null) {
this.idleTime = idleTime.toMillis();
}
this.partitionMarkDoneWhenEndInput = partitionMarkDoneWhenEndInput;
this.timeInterval = timeInterval == null ? null : timeInterval.toMillis();
this.idleTime = idleTime == null ? null : idleTime.toMillis();
this.markDoneWhenEndInput = markDoneWhenEndInput;
state.restore().forEach(p -> pendingPartitions.put(p, currentTimeMillis));
}

public void notifyPartition(String partition) {
notifyPartition(partition, System.currentTimeMillis());
}

public void notifyPartition(String partition, long currentTimeMillis) {
@VisibleForTesting
void notifyPartition(String partition, long currentTimeMillis) {
if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
this.pendingPartitions.put(partition, currentTimeMillis);
}
Expand All @@ -113,11 +114,16 @@ public List<String> donePartitions(boolean endInput) {
return donePartitions(endInput, System.currentTimeMillis());
}

public List<String> donePartitions(boolean endInput, long currentTimeMillis) {
if (endInput && partitionMarkDoneWhenEndInput) {
@VisibleForTesting
List<String> donePartitions(boolean endInput, long currentTimeMillis) {
if (endInput && markDoneWhenEndInput) {
return new ArrayList<>(pendingPartitions.keySet());
}

if (timeInterval == null || idleTime == null) {
return Collections.emptyList();
}

List<String> needDone = new ArrayList<>();
Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
while (iter.hasNext()) {
Expand Down
Loading

0 comments on commit cd45b7c

Please sign in to comment.