From a79a649eaa6ad851078dd1f5630fc3009d0c1cc3 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Mon, 16 Sep 2024 22:30:59 +0200 Subject: [PATCH] [FLINK-36287] Disallow UC for inner sink channels Between sink writer, the committer, and any pre/post commit topology (including global committer), we don't send actual payload but just committables. These committables must be committed on notifyCheckpointCompleted. However, if a barrier overtakes these committables, they may only be read after the RPC call has been made leading to violations. In particular, we could even have these issues during a final checkpoint. This commit generalizes the way, we disable UC for broadcast and pointwise connections, such that the SinkTransformationTranslator can also disable it for other distribution pattern. This commit also Simplifies committer with UC disabled: Without unaligned checkpoints, we receive all committables of a given upstream task before the respective barrier. Thus, when the barrier reaches the committer, all committables of a specific checkpoint must have been received. Committing happens even later on notifyCheckpointComplete. Added an assertion that verifies that all committables are indeed collected on commit. Note that this change also works when we recover a sink with channel state as it will only be called on the next (partially aligned) checkpoint. (cherry picked from commit 9d21878befa72f3c2b8f005c137544e2467d5a5e) --- .../api/graph/StreamGraphGenerator.java | 9 +-- .../ForwardForConsecutiveHashPartitioner.java | 5 ++ .../partitioner/StreamPartitioner.java | 15 +++++ .../SinkTransformationTranslator.java | 65 +++++++++++++++++-- ...inkTransformationTranslatorITCaseBase.java | 6 ++ .../SinkV1TransformationTranslatorITCase.java | 8 ++- .../sink/CommitterOperatorTestBase.java | 42 +----------- .../test/streaming/runtime/SinkITCase.java | 34 ++++++++-- 8 files changed, 124 insertions(+), 60 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 46cd69ce9fbf1..389a56acdf079 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -70,7 +70,6 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.transformations.UnionTransformation; import org.apache.flink.streaming.api.transformations.WithBoundedness; -import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator; import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator; import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator; @@ -287,7 +286,8 @@ public StreamGraph generate() { setFineGrainedGlobalStreamExchangeMode(streamGraph); for (StreamNode node : streamGraph.getStreamNodes()) { - if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) { + if (node.getInEdges().stream() + .anyMatch(e -> !e.getPartitioner().isSupportsUnalignedCheckpoint())) { for (StreamEdge edge : node.getInEdges()) { edge.setSupportsUnalignedCheckpoints(false); } @@ -303,11 +303,6 @@ public StreamGraph generate() { return builtStreamGraph; } - private boolean shouldDisableUnalignedCheckpointing(StreamEdge edge) { - StreamPartitioner partitioner = edge.getPartitioner(); - return partitioner.isPointwise() || partitioner.isBroadcast(); - } - private void setDynamic(final StreamGraph graph) { Optional schedulerTypeOptional = executionConfig.getSchedulerType(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java index d179ac26beedd..50dcadd4e28da 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitioner.java @@ -101,6 +101,11 @@ public boolean isPointwise() { return true; } + @Override + public void disableUnalignedCheckpoints() { + hashPartitioner.disableUnalignedCheckpoints(); + } + @Override public int selectChannel(SerializationDelegate> record) { throw new RuntimeException( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java index e073a0d502a2b..7a07c8bfa9e00 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java @@ -34,6 +34,13 @@ public abstract class StreamPartitioner protected int numberOfChannels; + /** + * By default, all partitioner except {@link #isBroadcast()} or {@link #isPointwise()} support + * unaligned checkpoints. However, transformations may disable unaligned checkpoints for + * specific cases. + */ + private boolean supportsUnalignedCheckpoint = true; + @Override public void setup(int numberOfChannels) { this.numberOfChannels = numberOfChannels; @@ -78,4 +85,12 @@ public SubtaskStateMapper getUpstreamSubtaskStateMapper() { public abstract SubtaskStateMapper getDownstreamSubtaskStateMapper(); public abstract boolean isPointwise(); + + public boolean isSupportsUnalignedCheckpoint() { + return supportsUnalignedCheckpoint && !isPointwise() && !isBroadcast(); + } + + public void disableUnalignedCheckpoints() { + this.supportsUnalignedCheckpoint = false; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index aee412dc8a1e6..f3c0400476aa7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.TransformationTranslator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import org.apache.flink.streaming.api.transformations.SinkTransformation; @@ -48,10 +49,14 @@ import javax.annotation.Nullable; +import java.util.ArrayDeque; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Queue; +import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Function; @@ -170,11 +175,9 @@ private void expand() { sink instanceof SupportsConcurrentExecutionAttempts); } - final List> sinkTransformations = - executionEnvironment - .getTransformations() - .subList(sizeBefore, executionEnvironment.getTransformations().size()); - sinkTransformations.forEach(context::transform); + getSinkTransformations(sizeBefore).forEach(context::transform); + + disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore)); // Remove all added sink subtransformations to avoid duplications and allow additional // expansions @@ -185,6 +188,58 @@ private void expand() { } } + private List> getSinkTransformations(int sizeBefore) { + return executionEnvironment + .getTransformations() + .subList(sizeBefore, executionEnvironment.getTransformations().size()); + } + + /** + * Disables UC for all connections of operators within the sink expansion. This is necessary + * because committables need to be at the respective operators on notifyCheckpointComplete + * or else we can't commit all side-effects, which violates the contract of + * notifyCheckpointComplete. + */ + private void disallowUnalignedCheckpoint(List> sinkTransformations) { + Optional> writerOpt = + sinkTransformations.stream().filter(SinkExpander::isWriter).findFirst(); + Preconditions.checkState(writerOpt.isPresent(), "Writer transformation not found."); + Transformation writer = writerOpt.get(); + int indexOfWriter = sinkTransformations.indexOf(writer); + + // check all transformation after the writer and recursively disable UC for all inputs + // up to the writer + Set seen = new HashSet<>(writer.getId()); + Queue> pending = + new ArrayDeque<>( + sinkTransformations.subList( + indexOfWriter + 1, sinkTransformations.size())); + + while (!pending.isEmpty()) { + Transformation current = pending.poll(); + seen.add(current.getId()); + + for (Transformation input : current.getInputs()) { + if (input instanceof PartitionTransformation) { + ((PartitionTransformation) input) + .getPartitioner() + .disableUnalignedCheckpoints(); + } + if (seen.add(input.getId())) { + pending.add(input); + } + } + } + } + + private static boolean isWriter(Transformation t) { + if (!(t instanceof OneInputTransformation)) { + return false; + } + return ((OneInputTransformation) t).getOperatorFactory() + instanceof SinkWriterOperatorFactory; + } + private void addCommittingTopology( Sink sink, DataStream inputStream) { SupportsCommitter committingSink = (SupportsCommitter) sink; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java index bcfa377595da7..039df435a48b5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.util.TestLogger; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -111,6 +112,7 @@ public void generateWriterCommitterTopology() { findNodeName(streamGraph, name -> name.contains("Committer")); assertThat(streamGraph.getStreamNodes().size(), equalTo(3)); + assertNoUnalignedOutput(writerNode); validateTopology( writerNode, @@ -213,6 +215,10 @@ void validateTopology( assertThat(dest.getSlotSharingGroup(), equalTo(SLOT_SHARE_GROUP)); } + protected static void assertNoUnalignedOutput(StreamNode src) { + Assertions.assertThat(src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints()); + } + StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) { return buildGraph(sink, runtimeExecutionMode, true); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java index 7d3614a5cd29f..6659fbebc1862 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java @@ -86,8 +86,8 @@ public void generateWriterCommitterGlobalCommitterTopology() { SinkWriterOperatorFactory.class, PARALLELISM, -1); + assertNoUnalignedOutput(writerNode); - StreamNode lastNode; if (runtimeExecutionMode == RuntimeExecutionMode.STREAMING) { // in streaming writer and committer are merged into one operator assertThat(streamGraph.getStreamNodes().size(), equalTo(4)); @@ -100,12 +100,12 @@ public void generateWriterCommitterGlobalCommitterTopology() { CommitterOperatorFactory.class, PARALLELISM, -1); + assertNoUnalignedOutput(committerNode); } - lastNode = committerNode; final StreamNode globalCommitterNode = findGlobalCommitter(streamGraph); validateTopology( - lastNode, + committerNode, SimpleVersionedSerializerTypeSerializerProxy.class, globalCommitterNode, SimpleOperatorFactory.class, @@ -139,6 +139,7 @@ public void generateWriterGlobalCommitterTopology() { final StreamNode committerNode = findCommitter(streamGraph); final StreamNode globalCommitterNode = findGlobalCommitter(streamGraph); + assertNoUnalignedOutput(writerNode); validateTopology( writerNode, SimpleVersionedSerializerTypeSerializerProxy.class, @@ -147,6 +148,7 @@ public void generateWriterGlobalCommitterTopology() { PARALLELISM, -1); + assertNoUnalignedOutput(committerNode); validateTopology( committerNode, SimpleVersionedSerializerTypeSerializerProxy.class, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index 6e7674b47a960..dec6347847517 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -40,6 +40,7 @@ import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableSummary; import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableWithLinage; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; abstract class CommitterOperatorTestBase { @@ -86,7 +87,7 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception { } @Test - void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception { + void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { SinkAndCounters sinkAndCounters = sinkWithPostCommit(); final OneInputStreamOperatorTestHarness< CommittableMessage, CommittableMessage> @@ -109,8 +110,7 @@ void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exceptio final CommittableWithLineage second = new CommittableWithLineage<>("2", 1L, 1); testHarness.processElement(new StreamRecord<>(second)); - // Trigger commit Retry - testHarness.getProcessingTimeService().setCurrentTime(2000); + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException(); final List output = fromOutput(testHarness.getOutput()); assertThat(output).hasSize(3); @@ -126,42 +126,6 @@ void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exceptio testHarness.close(); } - @Test - void testImmediatelyCommitLateCommittables() throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - - final OneInputStreamOperatorTestHarness< - CommittableMessage, CommittableMessage> - testHarness = createTestHarness(sinkAndCounters.sink, false, true); - testHarness.open(); - - final CommittableSummary committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - - // Receive notify checkpoint completed before the last data. This might happen for unaligned - // checkpoints. - testHarness.notifyOfCompletedCheckpoint(1); - - assertThat(testHarness.getOutput()).isEmpty(); - - final CommittableWithLineage first = new CommittableWithLineage<>("1", 1L, 1); - - // Commit elements with lower or equal the latest checkpoint id immediately - testHarness.processElement(new StreamRecord<>(first)); - - final List output = fromOutput(testHarness.getOutput()); - assertThat(output).hasSize(2); - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); - SinkV2Assertions.assertThat(toCommittableSummary(output.get(0))) - .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()) - .hasPendingCommittables(0); - SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1))) - .isEqualTo(copyCommittableWithDifferentOrigin(first, 0)); - testHarness.close(); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java index 76eacb20d7df0..aa658df5aca3d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java @@ -24,6 +24,8 @@ import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.runtime.operators.sink.TestSink; import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; import org.apache.flink.streaming.util.FiniteTestSource; @@ -142,7 +144,7 @@ void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Excepti () -> GLOBAL_COMMIT_QUEUE) .build()); - env.execute(); + executeAndVerifyStreamGraph(env); // TODO: At present, for a bounded scenario, the occurrence of final checkpoint is not a // deterministic event, so @@ -172,7 +174,7 @@ void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception { () -> GLOBAL_COMMIT_QUEUE) .build()); - env.execute(); + executeAndVerifyStreamGraph(env); assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE); @@ -193,7 +195,8 @@ void writerAndCommitterExecuteInStreamingMode() throws Exception { .setDefaultCommitter( (Supplier> & Serializable) () -> COMMIT_QUEUE) .build()); - env.execute(); + + executeAndVerifyStreamGraph(env); assertThat(COMMIT_QUEUE) .containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE); } @@ -242,7 +245,9 @@ void writerAndCommitterExecuteInBatchMode() throws Exception { .setDefaultCommitter( (Supplier> & Serializable) () -> COMMIT_QUEUE) .build()); - env.execute(); + + executeAndVerifyStreamGraph(env); + assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE); } @@ -261,7 +266,7 @@ void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception { () -> GLOBAL_COMMIT_QUEUE) .build()); - env.execute(); + executeAndVerifyStreamGraph(env); // TODO: At present, for a bounded scenario, the occurrence of final checkpoint is not a // deterministic event, so @@ -285,12 +290,29 @@ void writerAndGlobalCommitterExecuteInBatchMode() throws Exception { (Supplier> & Serializable) () -> GLOBAL_COMMIT_QUEUE) .build()); - env.execute(); + + executeAndVerifyStreamGraph(env); assertThat(GLOBAL_COMMIT_QUEUE) .containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE); } + private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) throws Exception { + StreamGraph streamGraph = env.getStreamGraph(); + assertNoUnalignedCheckpointInSink(streamGraph); + env.execute(streamGraph); + } + + private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) { + // all the edges out of the sink nodes should not support unaligned checkpoints + // we rely on other tests that this property is correctly used. + assertThat(streamGraph.getStreamNodes()) + .filteredOn(t -> t.getOperatorName().contains("Sink")) + .flatMap(StreamNode::getOutEdges) + .allMatch(e -> !e.supportsUnalignedCheckpoints()) + .isNotEmpty(); + } + private static List getSplitGlobalCommittedData() { return GLOBAL_COMMIT_QUEUE.stream() .flatMap(x -> Arrays.stream(x.split("\\+")))