[FLINK-25920] Handle duplicate EOI in Sink
In case of a failure after the final checkpoint, EOI is called twice.

The SinkWriter should ignore the second call to avoid emitting more dummy committables (transactional objects containing no data; no data can arrive when recovering from a final checkpoint). The commit uses a boolean list state to remember whether EOI has already been emitted. The individual cases are discussed in the code.

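For illustration, a minimal sketch of the writer-side decision, condensed from the SinkWriterOperator changes below (names match the diff; this is a simplification, not the full operator):

    // On restore, interpret the restored boolean list state:
    // - empty            -> first run, legacy restore, or upscale: handle EOI normally
    // - all entries true -> recovered from a final checkpoint: suppress the duplicate EOI
    // - any entry false  -> regular checkpoint (e.g. after downscale): handle EOI normally
    ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
    boolean endOfInput = !previousState.isEmpty() && !previousState.contains(false);

    // endInput() then becomes effectively idempotent:
    if (!endOfInput) {
        endOfInput = true;
        endOfInputState.add(true); // picked up by the next (final) checkpoint
        sinkWriter.flush(true);
        emitCommittables(CommittableMessage.EOI);
    }
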
Since rescaling may still result in these dummy committables, the committer needs to merge them into the CommittableCollector: they still need to be committed, because systems like Kafka don't provide transactional isolation.

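And a condensed sketch of the committer-side merge, taken from CheckpointCommittableManagerImpl#addSummary in the diff below (simplified; the duplicate-summary error message is shortened):

    // EOI summaries may legitimately arrive more than once after rescaling, so merge them;
    // a duplicate summary for a regular checkpoint remains an error.
    if (checkpointId == CommittableMessage.EOI) {
        subtasksCommittableManagers.merge(
                summary.getSubtaskId(), manager, SubtaskCommittableManager::merge);
    } else if (subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager) != null) {
        throw new UnsupportedOperationException(
                "Duplicate committable summary, please check the status of FLINK-25920");
    }
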
(cherry picked from commit 37e6724)
AHeise committed Nov 14, 2024
1 parent 4934ef8 commit 054e20e
Showing 5 changed files with 159 additions and 32 deletions.
SinkWriterOperator.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
@@ -51,6 +52,8 @@
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.UserCodeClassLoader;

import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;

import javax.annotation.Nullable;

import java.io.IOException;
@@ -65,7 +68,7 @@

/**
* An operator that processes records to be written into a {@link
- * org.apache.flink.api.connector.sink.Sink}. It also has a way to process committables with the
+ * org.apache.flink.api.connector.sink2.Sink}. It also has a way to process committables with the
* same parallelism or send them downstream to a {@link CommitterOperator} with a different
* parallelism.
*
@@ -88,6 +91,13 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
@Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;
private final List<CommT> legacyCommittables = new ArrayList<>();

/**
* Used to remember that EOI has already happened so that we don't emit the last committables of
* the final checkpoints twice.
*/
private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);

/** The runtime information of the input element. */
private final Context<InputT> context;

@@ -105,6 +115,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
private final MailboxExecutor mailboxExecutor;

private boolean endOfInput = false;
/**
* Remembers the endOfInput state for (final) checkpoints iff the operator emits committables.
*/
@Nullable private ListState<Boolean> endOfInputState;

SinkWriterOperator(
Sink<InputT> sink,
@@ -145,17 +159,48 @@ public void initializeState(StateInitializationContext context) throws Exception
legacyCommitterState.clear();
}
}

sinkWriter = writerStateHandler.createWriter(initContext, context);

if (emitDownstream) {
// Figure out if we have seen end of input before and if we can suppress creating
// transactions and sending them downstream to the CommitterOperator. We have the
// following
// cases:
// 1. state is empty:
// - First time initialization
// - Restoring from a previous version of Flink that didn't handle EOI
// - Upscaled from a final or regular checkpoint
// In all cases, we regularly handle EOI, potentially resulting in duplicate summaries
// that the CommitterOperator needs to handle.
// 2. state is not empty:
// - This implies Flink restores from a version that handles EOI.
// - If there is one entry, no rescaling happened (for this subtask), so if it's true,
// we recover from a final checkpoint (for this subtask) and can ignore another EOI
// else we have a regular checkpoint.
// - If there are multiple entries, Flink downscaled, and we need to check if all are
// true and do the same as above. As soon as one entry is false, we regularly start
// the writer and potentially emit duplicate summaries if we indeed recovered from a
// final checkpoint.
endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
endOfInput = !previousState.isEmpty() && !previousState.contains(false);
}
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
writerStateHandler.snapshotState(context.getCheckpointId());
if (endOfInputState != null) {
endOfInputState.clear();
endOfInputState.add(this.endOfInput);
}
}

@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
checkState(!endOfInput, "Received element after endOfInput: %s", element);
context.element = element;
sinkWriter.write(element.getValue(), context);
}
@@ -180,9 +225,14 @@ public void processWatermark(Watermark mark) throws Exception {

@Override
public void endInput() throws Exception {
- endOfInput = true;
- sinkWriter.flush(true);
- emitCommittables(CommittableMessage.EOI);
+ if (!endOfInput) {
+ endOfInput = true;
+ if (endOfInputState != null) {
+ endOfInputState.add(true);
+ }
+ sinkWriter.flush(true);
+ emitCommittables(CommittableMessage.EOI);
+ }
}

private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
CheckpointCommittableManagerImpl.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;

@@ -72,18 +73,24 @@ Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
return subtasksCommittableManagers.values();
}

- void upsertSummary(CommittableSummary<CommT> summary) {
+ void addSummary(CommittableSummary<CommT> summary) {
+ long checkpointId = summary.getCheckpointIdOrEOI();
SubtaskCommittableManager<CommT> manager =
new SubtaskCommittableManager<>(
- summary.getNumberOfCommittables(),
- subtaskId,
- summary.getCheckpointIdOrEOI(),
- metricGroup);
- SubtaskCommittableManager<CommT> existing =
- subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
- if (existing != null) {
- throw new UnsupportedOperationException(
- "Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920");
+ summary.getNumberOfCommittables(), subtaskId, checkpointId, metricGroup);
+ if (checkpointId == CommittableMessage.EOI) {
+ SubtaskCommittableManager<CommT> merged =
+ subtasksCommittableManagers.merge(
+ summary.getSubtaskId(), manager, SubtaskCommittableManager::merge);
+ } else {
+ SubtaskCommittableManager<CommT> existing =
+ subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
+ if (existing != null) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920",
+ checkpointId, summary.getSubtaskId(), manager, existing));
+ }
+ }
}

CommittableCollector.java
@@ -241,7 +241,7 @@ private void addSummary(CommittableSummary<CommT> summary) {
numberOfSubtasks,
summary.getCheckpointIdOrEOI(),
metricGroup))
- .upsertSummary(summary);
+ .addSummary(summary);
}

private void addCommittable(CommittableWithLineage<CommT> committable) {
CheckpointCommittableManagerImplTest.java
@@ -47,7 +47,7 @@ void testAddSummary() {
assertThat(checkpointCommittables.getSubtaskCommittableManagers()).isEmpty();

final CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 1L, 1, 0, 0);
- checkpointCommittables.upsertSummary(first);
+ checkpointCommittables.addSummary(first);
assertThat(checkpointCommittables.getSubtaskCommittableManagers())
.hasSize(1)
.satisfiesExactly(
@@ -60,16 +60,16 @@

// Add different subtask id
final CommittableSummary<Integer> third = new CommittableSummary<>(2, 1, 2L, 2, 1, 1);
- checkpointCommittables.upsertSummary(third);
+ checkpointCommittables.addSummary(third);
assertThat(checkpointCommittables.getSubtaskCommittableManagers()).hasSize(2);
}

@Test
void testCommit() throws IOException, InterruptedException {
final CheckpointCommittableManagerImpl<Integer> checkpointCommittables =
new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
- checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
- checkpointCommittables.upsertSummary(new CommittableSummary<>(2, 1, 1L, 2, 0, 0));
+ checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
+ checkpointCommittables.addSummary(new CommittableSummary<>(2, 1, 1L, 2, 0, 0));
checkpointCommittables.addCommittable(new CommittableWithLineage<>(3, 1L, 1));
checkpointCommittables.addCommittable(new CommittableWithLineage<>(4, 1L, 2));

@@ -96,10 +96,10 @@
void testUpdateCommittableSummary() {
final CheckpointCommittableManagerImpl<Integer> checkpointCommittables =
new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
- checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
+ checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0, 0));
assertThatThrownBy(
() ->
- checkpointCommittables.upsertSummary(
+ checkpointCommittables.addSummary(
new CommittableSummary<>(1, 1, 1L, 2, 0, 0)))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("FLINK-25920");
@@ -114,7 +114,7 @@ public void testCopy(int subtaskId, int numberOfSubtasks, long checkpointId) {
final CheckpointCommittableManagerImpl<Integer> original =
new CheckpointCommittableManagerImpl<>(
subtaskId, numberOfSubtasks, checkpointId, METRIC_GROUP);
- original.upsertSummary(
+ original.addSummary(
new CommittableSummary<>(subtaskId, numberOfSubtasks, checkpointId, 1, 0, 0));

CheckpointCommittableManagerImpl<Integer> copy = original.copy();
SinkITCase.java
@@ -20,20 +20,31 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+ import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;

- import org.junit.Before;
- import org.junit.Test;
+ import org.assertj.core.api.Assertions;
+ import org.junit.jupiter.api.BeforeEach;
+ import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -46,7 +57,7 @@
/**
* Integration test for {@link org.apache.flink.api.connector.sink.Sink} run time implementation.
*/
- public class SinkITCase extends AbstractTestBase {
+ class SinkITCase extends AbstractTestBase {
static final List<Integer> SOURCE_DATA =
Arrays.asList(
895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970,
@@ -107,14 +118,17 @@ public class SinkITCase extends AbstractTestBase {
COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean()
&& GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean();

- @Before
+ @RegisterExtension
+ private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+ @BeforeEach
public void init() {
COMMIT_QUEUE.clear();
GLOBAL_COMMIT_QUEUE.clear();
}

@Test
- public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
+ void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
final StreamExecutionEnvironment env = buildStreamEnv();

final FiniteTestSource<Integer> source =
@@ -149,7 +163,7 @@ public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws
}

@Test
- public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
+ void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
final StreamExecutionEnvironment env = buildBatchEnv();

env.fromData(SOURCE_DATA)
@@ -173,7 +187,7 @@ public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exce
}

@Test
- public void writerAndCommitterExecuteInStreamingMode() throws Exception {
+ void writerAndCommitterExecuteInStreamingMode() throws Exception {
final StreamExecutionEnvironment env = buildStreamEnv();

final FiniteTestSource<Integer> source =
@@ -191,8 +205,42 @@ public void writerAndCommitterExecuteInStreamingMode() throws Exception {
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
}

/**
* Creates a bounded stream with a failing committer. The test verifies that the Sink correctly
* recovers and handles multiple endInput().
*/
@Test
void duplicateEndInput() throws Exception {
// we need at least 2 attempts but add a bit of a safety margin for unexpected retries
int maxAttempts = 10;
final Configuration conf = new Configuration();
conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, maxAttempts);

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.enableCheckpointing(100);

AtomicBoolean failedOnce = new AtomicBoolean(false);
List<String> committed = new ArrayList<>();
FailingOnceCommitter committer =
new FailingOnceCommitter(
sharedObjects.add(failedOnce), sharedObjects.add(committed));
env.<Object>fromData("bounded")
.sinkTo(TestSinkV2.newBuilder().setCommitter(committer).build());

JobClient jobClient = env.executeAsync();
// wait for job to finish including restarts
jobClient.getJobExecutionResult().get();

// check that we error'ed once as expected
Assertions.assertThat(failedOnce).isTrue();
// but also eventually succeed to commit (size > 1 in case of unexpected retries)
Assertions.assertThat(committed).isNotEmpty();
}

@Test
- public void writerAndCommitterExecuteInBatchMode() throws Exception {
+ void writerAndCommitterExecuteInBatchMode() throws Exception {
final StreamExecutionEnvironment env = buildBatchEnv();

env.fromData(SOURCE_DATA)
@@ -207,7 +255,7 @@ public void writerAndCommitterExecuteInBatchMode() throws Exception {
}

@Test
- public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
+ void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
final StreamExecutionEnvironment env = buildStreamEnv();

final FiniteTestSource<Integer> source =
@@ -236,7 +284,7 @@ public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
}

@Test
- public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
+ void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
final StreamExecutionEnvironment env = buildBatchEnv();

env.fromData(SOURCE_DATA)
@@ -271,4 +319,26 @@ private StreamExecutionEnvironment buildBatchEnv() {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
return env;
}

static class FailingOnceCommitter extends TestSinkV2.DefaultCommitter {
private final SharedReference<AtomicBoolean> failedOnce;
private final SharedReference<List<String>> committed;

public FailingOnceCommitter(
SharedReference<AtomicBoolean> failedOnce,
SharedReference<List<String>> committed) {
this.failedOnce = failedOnce;
this.committed = committed;
}

@Override
public void commit(Collection<CommitRequest<String>> committables) {
if (failedOnce.get().compareAndSet(false, true)) {
throw new RuntimeException("Fail to commit");
}
for (CommitRequest<String> committable : committables) {
this.committed.get().add(committable.getCommittable());
}
}
}
}
