diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index 609ed12ff4522..7fb78f37c0d81 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -274,7 +274,7 @@ private void emit( int numberOfParallelSubtasks, long checkpointId, Collection committables) { - output.collect( + emit( new StreamRecord<>( new CommittableSummary<>( indexOfThisSubtask, @@ -284,13 +284,18 @@ private void emit( committables.size(), 0))); for (CommT committable : committables) { - output.collect( + emit( new StreamRecord<>( new CommittableWithLineage<>( committable, checkpointId, indexOfThisSubtask))); } } + private void emit(StreamRecord> message) { + LOG.debug("Sending message to committer: {}", message); + output.collect(message); + } + private WriterInitContext createInitContext(OptionalLong restoredCheckpointId) { return new InitContextImpl( getRuntimeContext(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 20bdbb9f3045a..eeac41eb3c98f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -24,6 +24,9 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -43,6 +46,9 @@ class CheckpointCommittableManagerImpl implements CheckpointCommittableMa private final int numberOfSubtasks; private final SinkCommitterMetricGroup metricGroup; + private static final Logger LOG = + LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class); + CheckpointCommittableManagerImpl( int subtaskId, int numberOfSubtasks, @@ -82,6 +88,7 @@ void addSummary(CommittableSummary summary) { SubtaskCommittableManager merged = subtasksCommittableManagers.merge( summary.getSubtaskId(), manager, SubtaskCommittableManager::merge); + LOG.debug("Adding EOI summary (new={}}, merged={}}).", manager, merged); } else { SubtaskCommittableManager existing = subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager); @@ -90,6 +97,11 @@ void addSummary(CommittableSummary summary) { String.format( "Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920", checkpointId, summary.getSubtaskId(), manager, existing)); + } else { + LOG.debug( + "Setting the summary for checkpointId {} with {}", + this.checkpointId, + manager); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java index 381cec977f323..c647c8c5d9a7e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java @@ -204,4 +204,24 @@ SubtaskCommittableManager copy() { checkpointId, metricGroup); } + + @Override + public String toString() { + return "SubtaskCommittableManager{" + + "requests=" + + requests + + ", numExpectedCommittables=" + + numExpectedCommittables + + ", checkpointId=" + + checkpointId + + ", subtaskId=" + + subtaskId + + ", numDrained=" + + numDrained + + ", numFailed=" + + numFailed + + ", metricGroup=" + + metricGroup + + '}'; + } }