Skip to content

Commit

Permalink
[FLINK-25920] Improve logging in committable handling of the sink
Browse files Browse the repository at this point in the history
(cherry picked from commit 2cdd3f0)
  • Loading branch information
AHeise committed Nov 14, 2024
1 parent 054e20e commit ff5e96f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private void emit(
int numberOfParallelSubtasks,
long checkpointId,
Collection<CommT> committables) {
output.collect(
emit(
new StreamRecord<>(
new CommittableSummary<>(
indexOfThisSubtask,
Expand All @@ -284,13 +284,18 @@ private void emit(
committables.size(),
0)));
for (CommT committable : committables) {
output.collect(
emit(
new StreamRecord<>(
new CommittableWithLineage<>(
committable, checkpointId, indexOfThisSubtask)));
}
}

private void emit(StreamRecord<CommittableMessage<CommT>> message) {
LOG.debug("Sending message to committer: {}", message);
output.collect(message);
}

private WriterInitContext createInitContext(OptionalLong restoredCheckpointId) {
return new InitContextImpl(
getRuntimeContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -43,6 +46,9 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;

private static final Logger LOG =
LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);

CheckpointCommittableManagerImpl(
int subtaskId,
int numberOfSubtasks,
Expand Down Expand Up @@ -82,6 +88,7 @@ void addSummary(CommittableSummary<CommT> summary) {
SubtaskCommittableManager<CommT> merged =
subtasksCommittableManagers.merge(
summary.getSubtaskId(), manager, SubtaskCommittableManager::merge);
LOG.debug("Adding EOI summary (new={}}, merged={}}).", manager, merged);
} else {
SubtaskCommittableManager<CommT> existing =
subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
Expand All @@ -90,6 +97,11 @@ void addSummary(CommittableSummary<CommT> summary) {
String.format(
"Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920",
checkpointId, summary.getSubtaskId(), manager, existing));
} else {
LOG.debug(
"Setting the summary for checkpointId {} with {}",
this.checkpointId,
manager);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,24 @@ SubtaskCommittableManager<CommT> copy() {
checkpointId,
metricGroup);
}

@Override
public String toString() {
return "SubtaskCommittableManager{"
+ "requests="
+ requests
+ ", numExpectedCommittables="
+ numExpectedCommittables
+ ", checkpointId="
+ checkpointId
+ ", subtaskId="
+ subtaskId
+ ", numDrained="
+ numDrained
+ ", numFailed="
+ numFailed
+ ", metricGroup="
+ metricGroup
+ '}';
}
}

0 comments on commit ff5e96f

Please sign in to comment.