From 054e20eacfe7299f86924ae7be23a5c35f2c5c49 Mon Sep 17 00:00:00 2001
From: Arvid Heise
Date: Thu, 5 Sep 2024 15:25:12 +0200
Subject: [PATCH] [FLINK-25920] Handle duplicate EOI in Sink

In case of a failure after the final checkpoint, EOI is called twice. The
SinkWriter should ignore the second call to avoid emitting more dummy
committables (transactional objects containing no data), since no data can
arrive when recovering from a final checkpoint.

The commit uses a boolean list state to remember whether EOI has already been
emitted. The individual cases are discussed in the code.

Since rescaling may still result in these dummy committables, the committer
needs to merge them into the CommittableCollector: these committables still
need to be committed because systems like Kafka don't provide transactional
isolation.

(cherry picked from commit 37e6724813bd92f8d323ddec80308241f693a5e1)
---
 .../operators/sink/SinkWriterOperator.java    | 58 +++++++++++-
 .../CheckpointCommittableManagerImpl.java     | 27 +++---
 .../committables/CommittableCollector.java    |  2 +-
 .../CheckpointCommittableManagerImplTest.java | 14 +--
 .../test/streaming/runtime/SinkITCase.java    | 90 ++++++++++++++++---
 5 files changed, 159 insertions(+), 32 deletions(-)
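The rule that decides whether a second EOI may be suppressed is small enough to state in isolation. Below is a minimal, self-contained sketch in plain Java (not Flink code; class and method names are illustrative): the restored list state contributes one boolean per prior writer subtask, and the repeated EOI may only be ignored when the state is non-empty and every entry is true.

    import java.util.List;

    final class EoiDedupSketch {
        /** Returns true when a repeated endInput() can safely be ignored for this subtask. */
        static boolean suppressEoi(List<Boolean> restoredState) {
            // empty state: first run, restore from an older Flink version, or an upscaled
            // subtask without state -- in all cases EOI must be handled normally
            return !restoredState.isEmpty() && !restoredState.contains(false);
        }

        public static void main(String[] args) {
            System.out.println(suppressEoi(List.of()));            // false: fresh start
            System.out.println(suppressEoi(List.of(true)));        // true: final checkpoint
            System.out.println(suppressEoi(List.of(true, false))); // false: downscaled, mixed
        }
    }

Any false entry wins: it means at least one merged subtask had not reached EOI, so the writer must run through EOI again and the CommitterOperator absorbs the resulting duplicates.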
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 7aedf7a737095..609ed12ff4522 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -51,6 +52,8 @@
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.UserCodeClassLoader;
 
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -65,7 +68,7 @@
 
 /**
  * An operator that processes records to be written into a {@link
- * org.apache.flink.api.connector.sink.Sink}. It also has a way to process committables with the
+ * org.apache.flink.api.connector.sink2.Sink}. It also has a way to process committables with the
  * same parallelism or send them downstream to a {@link CommitterOperator} with a different
  * parallelism.
  *
@@ -88,6 +91,13 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
     private final TypeSerializer<CommT> committableSerializer;
     private final List<CommT> legacyCommittables = new ArrayList<>();
 
+    /**
+     * Used to remember that EOI has already happened so that we don't emit the last committables
+     * of the final checkpoints twice.
+     */
+    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
+            new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);
+
     /** The runtime information of the input element. */
     private final Context<InputT> context;
 
@@ -105,6 +115,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
 
     private boolean endOfInput;
 
+    /** Carries {@link #endOfInput} across restarts; see {@link #END_OF_INPUT_STATE_DESC}. */
+    @Nullable private ListState<Boolean> endOfInputState;
+
     SinkWriterOperator(
             Sink<InputT> sink,
@@ -145,17 +159,48 @@ public void initializeState(StateInitializationContext context) throws Exception
                 legacyCommitterState.clear();
             }
         }
+
         sinkWriter = writerStateHandler.createWriter(initContext, context);
+
+        if (emitDownstream) {
+            // Figure out if we have seen end of input before and if we can suppress creating
+            // transactions and sending them downstream to the CommitterOperator. We have the
+            // following cases:
+            // 1. state is empty:
+            //    - First time initialization
+            //    - Restoring from a previous version of Flink that didn't handle EOI
+            //    - Upscaled from a final or regular checkpoint
+            //    In all cases, we regularly handle EOI, potentially resulting in duplicate
+            //    summaries that the CommitterOperator needs to handle.
+            // 2. state is not empty:
+            //    - This implies Flink restores from a version that handles EOI.
+            //    - If there is one entry, no rescaling happened (for this subtask), so if it's
+            //      true, we recover from a final checkpoint (for this subtask) and can ignore
+            //      another EOI; else we have a regular checkpoint.
+            //    - If there are multiple entries, Flink downscaled, and we need to check if all
+            //      are true and do the same as above. As soon as one entry is false, we
+            //      regularly start the writer and potentially emit duplicate summaries if we
+            //      indeed recovered from a final checkpoint.
+            endOfInputState =
+                    context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
+            ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
+            endOfInput = !previousState.isEmpty() && !previousState.contains(false);
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         writerStateHandler.snapshotState(context.getCheckpointId());
+        if (endOfInputState != null) {
+            endOfInputState.clear();
+            endOfInputState.add(this.endOfInput);
+        }
     }
 
     @Override
     public void processElement(StreamRecord<InputT> element) throws Exception {
+        checkState(!endOfInput, "Received element after endOfInput: %s", element);
         context.element = element;
         sinkWriter.write(element.getValue(), context);
     }
@@ -180,9 +225,14 @@ public void processWatermark(Watermark mark) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        endOfInput = true;
-        sinkWriter.flush(true);
-        emitCommittables(CommittableMessage.EOI);
+        if (!endOfInput) {
+            endOfInput = true;
+            if (endOfInputState != null) {
+                endOfInputState.add(true);
+            }
+            sinkWriter.flush(true);
+            emitCommittables(CommittableMessage.EOI);
+        }
     }
 
     private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
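Two details of the state handling above are easy to miss, so here is the snapshot/restore round trip as a standalone sketch. FakeListState is a hypothetical in-memory stand-in for Flink's ListState, used only to make the sketch runnable: snapshotState() clears before adding, so each subtask stores exactly one entry, while state redistribution after downscaling can hand one subtask several entries.

    import java.util.ArrayList;
    import java.util.List;

    final class EndOfInputStateRoundTrip {
        /** Hypothetical stand-in for org.apache.flink.api.common.state.ListState. */
        static final class FakeListState {
            private final List<Boolean> entries = new ArrayList<>();
            void clear() { entries.clear(); }
            void add(boolean value) { entries.add(value); }
            List<Boolean> get() { return entries; }
        }

        public static void main(String[] args) {
            FakeListState state = new FakeListState();
            boolean endOfInput = true; // this subtask saw EOI before the final checkpoint

            // snapshotState: overwrite instead of append, so there is one entry per subtask
            state.clear();
            state.add(endOfInput);

            // initializeState after downscaling: entries of two former subtasks are combined
            List<Boolean> restored = new ArrayList<>(state.get());
            restored.add(false); // entry from a merged-in subtask that had not seen EOI
            boolean suppress = !restored.isEmpty() && !restored.contains(false);
            System.out.println(suppress); // false: EOI runs again, duplicates merged downstream
        }
    }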
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index b8a740b63cb31..20bdbb9f3045a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
@@ -72,18 +73,24 @@ Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
         return subtasksCommittableManagers.values();
     }
 
-    void upsertSummary(CommittableSummary<CommT> summary) {
+    void addSummary(CommittableSummary<CommT> summary) {
+        long checkpointId = summary.getCheckpointIdOrEOI();
         SubtaskCommittableManager<CommT> manager =
                 new SubtaskCommittableManager<>(
-                        summary.getNumberOfCommittables(),
-                        subtaskId,
-                        summary.getCheckpointIdOrEOI(),
-                        metricGroup);
-        SubtaskCommittableManager<CommT> existing =
-                subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
-        if (existing != null) {
-            throw new UnsupportedOperationException(
-                    "Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920");
+                        summary.getNumberOfCommittables(), subtaskId, checkpointId, metricGroup);
+        if (checkpointId == CommittableMessage.EOI) {
+            SubtaskCommittableManager<CommT> merged =
+                    subtasksCommittableManagers.merge(
+                            summary.getSubtaskId(), manager, SubtaskCommittableManager::merge);
+        } else {
+            SubtaskCommittableManager<CommT> existing =
+                    subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
+            if (existing != null) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920",
+                                checkpointId, summary.getSubtaskId(), manager, existing));
+            }
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 801c84468506a..2dac78c71ea7b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -241,7 +241,7 @@ private void addSummary(CommittableSummary<CommT> summary) {
                                 numberOfSubtasks,
                                 summary.getCheckpointIdOrEOI(),
                                 metricGroup))
-                .upsertSummary(summary);
+                .addSummary(summary);
     }
 
     private void addCommittable(CommittableWithLineage<CommT> committable) {
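The EOI branch leans on java.util.Map#merge, which either inserts the value or combines it with the existing mapping. A hedged sketch with simplified types (Tally stands in for SubtaskCommittableManager; the arithmetic is illustrative, not the real merge logic):

    import java.util.HashMap;
    import java.util.Map;

    final class MergeSketch {
        static final class Tally {
            final int committables;
            Tally(int committables) { this.committables = committables; }
            Tally merge(Tally other) { return new Tally(committables + other.committables); }
        }

        public static void main(String[] args) {
            Map<Integer, Tally> bySubtask = new HashMap<>();
            bySubtask.merge(0, new Tally(2), Tally::merge); // first EOI summary of subtask 0
            bySubtask.merge(0, new Tally(1), Tally::merge); // duplicate after rescale: combined
            System.out.println(bySubtask.get(0).committables); // 3
        }
    }

For regular checkpoint ids the old putIfAbsent-and-throw path is kept, so genuine duplicates still fail fast; only EOI summaries, which can legitimately reappear after rescaling, are merged.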
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
index 5bcebda497f85..6c8687b63c92c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
@@ -47,7 +47,7 @@ void testAddSummary() {
         assertThat(checkpointCommittables.getSubtaskCommittableManagers()).isEmpty();
 
         final CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 1L, 1, 0, 0);
-        checkpointCommittables.upsertSummary(first);
+        checkpointCommittables.addSummary(first);
         assertThat(checkpointCommittables.getSubtaskCommittableManagers())
                 .hasSize(1)
                 .satisfiesExactly(
@@ -60,7 +60,7 @@ void testAddSummary() {
 
         // Add different subtask id
         final CommittableSummary<Integer> third = new CommittableSummary<>(2, 1, 2L, 2, 1, 1);
-        checkpointCommittables.upsertSummary(third);
+        checkpointCommittables.addSummary(third);
         assertThat(checkpointCommittables.getSubtaskCommittableManagers()).hasSize(2);
     }
 
@@ -68,8 +68,8 @@ void testAddSummary() {
     void testCommit() throws IOException, InterruptedException {
         final CheckpointCommittableManagerImpl<Integer> checkpointCommittables =
                 new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
-        checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
-        checkpointCommittables.upsertSummary(new CommittableSummary<>(2, 1, 1L, 2, 0, 0));
+        checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
+        checkpointCommittables.addSummary(new CommittableSummary<>(2, 1, 1L, 2, 0, 0));
 
         checkpointCommittables.addCommittable(new CommittableWithLineage<>(3, 1L, 1));
         checkpointCommittables.addCommittable(new CommittableWithLineage<>(4, 1L, 2));
@@ -96,10 +96,10 @@ void testCommit() throws IOException, InterruptedException {
     void testUpdateCommittableSummary() {
         final CheckpointCommittableManagerImpl<Integer> checkpointCommittables =
                 new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
-        checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
+        checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
         assertThatThrownBy(
                         () ->
-                                checkpointCommittables.upsertSummary(
+                                checkpointCommittables.addSummary(
                                         new CommittableSummary<>(1, 1, 1L, 2, 0, 0)))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("FLINK-25920");
@@ -114,7 +114,7 @@ public void testCopy(int subtaskId, int numberOfSubtasks, long checkpointId) {
         final CheckpointCommittableManagerImpl<Integer> original =
                 new CheckpointCommittableManagerImpl<>(
                         subtaskId, numberOfSubtasks, checkpointId, METRIC_GROUP);
-        original.upsertSummary(
+        original.addSummary(
                 new CommittableSummary<>(subtaskId, numberOfSubtasks, checkpointId, 1, 0, 0));
 
         CheckpointCommittableManagerImpl<Integer> copy = original.copy();
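A natural companion to the adjusted tests, sketched here as a hypothetical addition in the same style (testAddSummaryMergesOnEoi is not part of this patch; it assumes CommittableMessage is imported), would pin down the new merge behavior for EOI summaries:

    @Test
    void testAddSummaryMergesOnEoi() {
        final CheckpointCommittableManagerImpl<Integer> checkpointCommittables =
                new CheckpointCommittableManagerImpl<>(1, 1, CommittableMessage.EOI, METRIC_GROUP);
        checkpointCommittables.addSummary(
                new CommittableSummary<>(1, 1, CommittableMessage.EOI, 1, 0, 0));
        // the same subtask announces EOI committables again after recovery:
        // merged into one manager instead of UnsupportedOperationException
        checkpointCommittables.addSummary(
                new CommittableSummary<>(1, 1, CommittableMessage.EOI, 1, 0, 0));
        assertThat(checkpointCommittables.getSubtaskCommittableManagers()).hasSize(1);
    }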
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
index b863046863ae4..3802de026ed2c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
@@ -20,20 +20,31 @@
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -46,7 +57,7 @@
 /**
  * Integration test for {@link org.apache.flink.api.connector.sink.Sink} run time implementation.
  */
-public class SinkITCase extends AbstractTestBase {
+class SinkITCase extends AbstractTestBase {
     static final List<Integer> SOURCE_DATA =
             Arrays.asList(
                     895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970,
@@ -107,14 +118,17 @@ public class SinkITCase extends AbstractTestBase {
                     COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean()
                             && GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean();
 
-    @Before
+    @RegisterExtension
+    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+    @BeforeEach
     public void init() {
        COMMIT_QUEUE.clear();
        GLOBAL_COMMIT_QUEUE.clear();
     }
 
     @Test
-    public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
+    void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
 
         final FiniteTestSource<Integer> source =
@@ -149,7 +163,7 @@ public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws
     }
 
     @Test
-    public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
+    void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
@@ -173,7 +187,7 @@ public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
     }
 
     @Test
-    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
+    void writerAndCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
 
         final FiniteTestSource<Integer> source =
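The new test below builds on a committer that fails exactly once. The idiom, reduced to plain Java so it can be read on its own (FailOnceSketch is illustrative and not part of the patch): AtomicBoolean.compareAndSet flips the flag atomically and returns true only for the first caller, so precisely one commit attempt fails no matter how often the framework retries.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class FailOnceSketch {
        private final AtomicBoolean failedOnce = new AtomicBoolean(false);
        private final List<String> committed = new ArrayList<>();

        void commit(List<String> committables) {
            if (failedOnce.compareAndSet(false, true)) {
                throw new RuntimeException("Fail to commit"); // first attempt only
            }
            committed.addAll(committables); // all later retries succeed
        }

        public static void main(String[] args) {
            FailOnceSketch committer = new FailOnceSketch();
            try {
                committer.commit(List.of("bounded"));
            } catch (RuntimeException expected) {
                // in the real test this triggers a job restart
            }
            committer.commit(List.of("bounded")); // retry after recovery
            System.out.println(committer.committed); // [bounded]
        }
    }

The SharedObjectsExtension used in the test exists because the committer is serialized into the job graph; a SharedReference resolves back to the live objects in the test JVM, so the committer can report into failedOnce and committed.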
@@ -191,8 +205,42 @@ public void writerAndCommitterExecuteInStreamingMode() throws Exception {
                 containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
     }
 
+    /**
+     * Creates a bounded stream with a failing committer. The test verifies that the Sink
+     * correctly recovers and handles multiple endInput() calls.
+     */
+    @Test
+    void duplicateEndInput() throws Exception {
+        // we need at least 2 attempts but add a bit of a safety margin for unexpected retries
+        int maxAttempts = 10;
+        final Configuration conf = new Configuration();
+        conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, maxAttempts);
+
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        env.enableCheckpointing(100);
+
+        AtomicBoolean failedOnce = new AtomicBoolean(false);
+        List<String> committed = new ArrayList<>();
+        FailingOnceCommitter committer =
+                new FailingOnceCommitter(
+                        sharedObjects.add(failedOnce), sharedObjects.add(committed));
+        env.fromData("bounded")
+                .sinkTo(TestSinkV2.newBuilder().setCommitter(committer).build());
+
+        JobClient jobClient = env.executeAsync();
+        // wait for the job to finish, including restarts
+        jobClient.getJobExecutionResult().get();
+
+        // check that we errored once, as expected
+        Assertions.assertThat(failedOnce).isTrue();
+        // but also eventually succeeded to commit (size > 1 in case of unexpected retries)
+        Assertions.assertThat(committed).isNotEmpty();
+    }
+
     @Test
-    public void writerAndCommitterExecuteInBatchMode() throws Exception {
+    void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
@@ -207,7 +255,7 @@ public void writerAndCommitterExecuteInBatchMode() throws Exception {
     }
 
     @Test
-    public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
+    void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
 
         final FiniteTestSource<Integer> source =
@@ -236,7 +284,7 @@ public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
     }
 
     @Test
-    public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
+    void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
@@ -271,4 +319,26 @@ private StreamExecutionEnvironment buildBatchEnv() {
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }
+
+    static class FailingOnceCommitter extends TestSinkV2.DefaultCommitter {
+        private final SharedReference<AtomicBoolean> failedOnce;
+        private final SharedReference<List<String>> committed;
+
+        public FailingOnceCommitter(
+                SharedReference<AtomicBoolean> failedOnce,
+                SharedReference<List<String>> committed) {
+            this.failedOnce = failedOnce;
+            this.committed = committed;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> committables) {
+            // fail exactly once, on the first commit attempt after the final checkpoint
+            if (failedOnce.get().compareAndSet(false, true)) {
+                throw new RuntimeException("Fail to commit");
+            }
+            for (CommitRequest<String> committable : committables) {
+                this.committed.get().add(committable.getCommittable());
+            }
+        }
+    }
 }
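On the commit message's point that systems like Kafka don't provide transactional isolation: a committable references an open transaction, and that transaction has to be driven to a terminal state even when it carries no data, because an unfinished transaction lingers until the coordinator times it out. A hedged illustration against the plain Kafka producer API (broker address and transactional.id are placeholders; this is not the patch's code path):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;

    final class EmptyTransactionSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
            props.put("transactional.id", "sink-writer-0");   // placeholder id
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                // no send(): the transaction is empty, but it must still be completed,
                // mirroring why dummy committables are committed rather than dropped
                producer.commitTransaction();
            }
        }
    }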