Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Partition Mark done should not be affected by compaction #4027

Merged
merged 2 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ManifestCommittable(long identifier, @Nullable Long watermark) {

public ManifestCommittable(
long identifier,
Long watermark,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
List<CommitMessage> commitMessages) {
this.identifier = identifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ public void markDone(String partition) throws Exception {
Path successPath = new Path(partitionPath, SUCCESS_FILE_NAME);

long currentTime = System.currentTimeMillis();
SuccessFile successFile = SuccessFile.safelyFromPath(fileIO, successPath);
if (successFile == null) {
successFile = new SuccessFile(currentTime, currentTime);
} else {
successFile = successFile.updateModificationTime(currentTime);
SuccessFile successFile = new SuccessFile(currentTime, currentTime);
if (fileIO.exists(successPath)) {
successFile =
SuccessFile.fromPath(fileIO, successPath).updateModificationTime(currentTime);
}
fileIO.overwriteFileUtf8(successPath, successFile.toJson());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public static SuccessFile safelyFromPath(FileIO fileIO, Path path) throws IOExce
}
}

/**
 * Loads a {@code SuccessFile} from the given path.
 *
 * <p>The file content is read with {@code fileIO.readFileUtf8(path)} and passed to
 * {@code SuccessFile.fromJson}. No existence check is performed here.
 *
 * @param fileIO accessor used to read the file content
 * @param path location of the success file
 * @return the {@code SuccessFile} produced from the JSON content
 * @throws IOException if reading the file reports one; propagated to the caller unchanged
 */
public static SuccessFile fromPath(FileIO fileIO, Path path) throws IOException {
    return SuccessFile.fromJson(fileIO.readFileUtf8(path));
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
Expand All @@ -49,6 +51,7 @@ public class PartitionMarkDone implements Closeable {
private final InternalRowPartitionComputer partitionComputer;
private final PartitionMarkDoneTrigger trigger;
private final List<PartitionMarkDoneAction> actions;
private final boolean waitCompaction;

@Nullable
public static PartitionMarkDone create(
Expand Down Expand Up @@ -76,7 +79,14 @@ public static PartitionMarkDone create(
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(table, coreOptions);

return new PartitionMarkDone(partitionComputer, trigger, actions);
// If batch reads skip level-0 files, we should wait for compaction before marking done;
// otherwise some data may not be readable yet, and data may appear delayed.
boolean waitCompaction =
!table.primaryKeys().isEmpty()
&& (coreOptions.deletionVectorsEnabled()
|| coreOptions.mergeEngine() == MergeEngine.FIRST_ROW);

return new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction);
}

private static boolean disablePartitionMarkDone(
Expand All @@ -91,30 +101,32 @@ private static boolean disablePartitionMarkDone(
return true;
}

List<String> partitionKeys = table.partitionKeys();
if (partitionKeys.isEmpty()) {
return true;
}

return false;
return table.partitionKeys().isEmpty();
}

public PartitionMarkDone(
InternalRowPartitionComputer partitionComputer,
PartitionMarkDoneTrigger trigger,
List<PartitionMarkDoneAction> actions) {
List<PartitionMarkDoneAction> actions,
boolean waitCompaction) {
this.partitionComputer = partitionComputer;
this.trigger = trigger;
this.actions = actions;
this.waitCompaction = waitCompaction;
}

public void notifyCommittable(List<ManifestCommittable> committables) {
Set<BinaryRow> partitions = new HashSet<>();
boolean endInput = false;
for (ManifestCommittable committable : committables) {
committable.fileCommittables().stream()
.map(CommitMessage::partition)
.forEach(partitions::add);
for (CommitMessage commitMessage : committable.fileCommittables()) {
CommitMessageImpl message = (CommitMessageImpl) commitMessage;
if (waitCompaction
|| !message.indexIncrement().isEmpty()
|| !message.newFilesIncrement().isEmpty()) {
partitions.add(message.partition());
}
}
if (committable.identifier() == Long.MAX_VALUE) {
endInput = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

import javax.annotation.Nullable;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -57,53 +59,52 @@ public class PartitionMarkDoneTrigger {

private final State state;
private final PartitionTimeExtractor timeExtractor;
private long timeInterval;
private long idleTime;
private final boolean partitionMarkDoneWhenEndInput;
// can be null when markDoneWhenEndInput is true
@Nullable private final Long timeInterval;
// can be null when markDoneWhenEndInput is true
@Nullable private final Long idleTime;
private final boolean markDoneWhenEndInput;
private final Map<String, Long> pendingPartitions;

public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInterval,
Duration idleTime,
boolean partitionMarkDoneWhenEndInput)
@Nullable Duration timeInterval,
@Nullable Duration idleTime,
boolean markDoneWhenEndInput)
throws Exception {
this(
state,
timeExtractor,
timeInterval,
idleTime,
System.currentTimeMillis(),
partitionMarkDoneWhenEndInput);
markDoneWhenEndInput);
}

public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInterval,
Duration idleTime,
@Nullable Duration timeInterval,
@Nullable Duration idleTime,
long currentTimeMillis,
boolean partitionMarkDoneWhenEndInput)
boolean markDoneWhenEndInput)
throws Exception {
this.pendingPartitions = new HashMap<>();
this.state = state;
this.timeExtractor = timeExtractor;
if (timeInterval != null) {
this.timeInterval = timeInterval.toMillis();
}
if (idleTime != null) {
this.idleTime = idleTime.toMillis();
}
this.partitionMarkDoneWhenEndInput = partitionMarkDoneWhenEndInput;
this.timeInterval = timeInterval == null ? null : timeInterval.toMillis();
this.idleTime = idleTime == null ? null : idleTime.toMillis();
this.markDoneWhenEndInput = markDoneWhenEndInput;
state.restore().forEach(p -> pendingPartitions.put(p, currentTimeMillis));
}

/**
 * Registers {@code partition} using the current wall-clock time.
 *
 * <p>Convenience overload that forwards to {@code notifyPartition(String, long)} with
 * {@link System#currentTimeMillis()} as the timestamp.
 *
 * @param partition the partition to register
 */
public void notifyPartition(String partition) {
    long now = System.currentTimeMillis();
    notifyPartition(partition, now);
}

public void notifyPartition(String partition, long currentTimeMillis) {
@VisibleForTesting
void notifyPartition(String partition, long currentTimeMillis) {
if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
this.pendingPartitions.put(partition, currentTimeMillis);
}
Expand All @@ -113,11 +114,16 @@ public List<String> donePartitions(boolean endInput) {
return donePartitions(endInput, System.currentTimeMillis());
}

public List<String> donePartitions(boolean endInput, long currentTimeMillis) {
if (endInput && partitionMarkDoneWhenEndInput) {
@VisibleForTesting
List<String> donePartitions(boolean endInput, long currentTimeMillis) {
if (endInput && markDoneWhenEndInput) {
return new ArrayList<>(pendingPartitions.keySet());
}

if (timeInterval == null || idleTime == null) {
return Collections.emptyList();
}

List<String> needDone = new ArrayList<>();
Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
while (iter.hasNext()) {
Expand Down
Loading
Loading