diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index db2422980825e..276abf73a6b10 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -740,29 +740,31 @@ Method calls method in (BatchFileWriter.java:116) Method has generic parameter type > with type argument depending on in (BatchFileWriter.java:0) Method has parameter of type in (BatchFileWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:106) +Method calls method in (AbstractStreamingWriter.java:125) Method has generic parameter type > with type argument depending on in (AbstractStreamingWriter.java:0) Method has parameter of type in (AbstractStreamingWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:111) +Method calls method in (AbstractStreamingWriter.java:130) Method has generic parameter type > with type argument depending on in (AbstractStreamingWriter.java:0) Method has parameter of type in (AbstractStreamingWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:167) -Method calls method in (AbstractStreamingWriter.java:90) -Method calls method in (AbstractStreamingWriter.java:157) -Method calls method in (AbstractStreamingWriter.java:158) -Method calls constructor (org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:122) -Method calls method in (AbstractStreamingWriter.java:101) -Method calls method in (AbstractStreamingWriter.java:115) -Method calls method in 
(AbstractStreamingWriter.java:122) -Method calls method in (AbstractStreamingWriter.java:98) -Method calls method in (AbstractStreamingWriter.java:142) -Method calls method in (AbstractStreamingWriter.java:145) -Method calls method in (AbstractStreamingWriter.java:143) -Method calls method in (AbstractStreamingWriter.java:145) -Method calls method in (AbstractStreamingWriter.java:144) +Method calls method in (AbstractStreamingWriter.java:213) +Method calls method in (AbstractStreamingWriter.java:109) +Method calls method in (AbstractStreamingWriter.java:202) +Method calls method in (AbstractStreamingWriter.java:203) +Method calls constructor (org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:141) +Method calls method in (AbstractStreamingWriter.java:164) +Method calls method in (AbstractStreamingWriter.java:120) +Method calls method in (AbstractStreamingWriter.java:134) +Method calls method in (AbstractStreamingWriter.java:141) +Method calls method in (AbstractStreamingWriter.java:117) +Method calls method in (AbstractStreamingWriter.java:183) +Method calls method in (AbstractStreamingWriter.java:186) +Method calls method in (AbstractStreamingWriter.java:184) +Method calls method in (AbstractStreamingWriter.java:186) +Method calls method in (AbstractStreamingWriter.java:185) +Method calls method in (AbstractStreamingWriter.java:182) Method has generic parameter type > with type argument depending on in (AbstractStreamingWriter.java:0) Method has parameter of type in (AbstractStreamingWriter.java:0) -Method calls method in (AbstractStreamingWriter.java:131) +Method calls method in (AbstractStreamingWriter.java:171) Method has parameter of type in (PartitionCommitTrigger.java:0) Method calls method in (PartitionCommitter.java:167) Method calls method in (PartitionCommitter.java:172) @@ 
-829,6 +831,7 @@ Static Initializer ()> calls method in (StandardDeCompressors.java:43) Static Initializer ()> calls method in (StandardDeCompressors.java:44) Static Initializer ()> calls method in (StandardDeCompressors.java:46) +Static Initializer ()> gets field in (AbstractStreamingWriter.java:74) Static Initializer ()> calls constructor (org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:52) Static Initializer ()> calls constructor (org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:56) Static Initializer ()> gets field in (PartitionTimeCommitTrigger.java:56) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java index 8a326311ce038..e01b9c9ac53c0 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java @@ -18,6 +18,9 @@ package org.apache.flink.connector.file.table.stream; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -33,6 +36,12 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; + +import java.util.List; + +import static 
org.apache.flink.util.Preconditions.checkState; + /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send * file and bucket information to downstream. @@ -58,6 +67,16 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe protected transient long currentWatermark; + /** + * Used to remember that EOI has already happened so that we don't emit the last committables of + * the final checkpoints twice. + */ + private static final ListStateDescriptor END_OF_INPUT_STATE_DESC = + new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE); + + private boolean endOfInput; + private ListState endOfInputState; + public AbstractStreamingWriter( long bucketCheckInterval, StreamingFileSink.BucketsBuilder< @@ -123,6 +142,27 @@ public void bucketInactive(Bucket bucket) { bucketCheckInterval); currentWatermark = Long.MIN_VALUE; + + // Figure out if we have seen end of input before and if we should emit anything downstream. + // We have the following + // cases: + // 1. state is empty: + // - First time initialization + // - Restoring from a previous version of Flink that didn't handle EOI + // - Upscaled from a final or regular checkpoint + // In all cases, we regularly handle EOI, potentially resulting in unnecessary emissions. + // 2. state is not empty: + // - This implies Flink restores from a version that handles EOI. + // - If there is one entry, no rescaling happened (for this subtask), so if it's true, + // we recover from a final checkpoint (for this subtask) and can ignore another EOI + // else we have a regular checkpoint. + // - If there are multiple entries, Flink downscaled, and we need to check if all are + // true and do the same as above. As soon as one entry is false, we regularly start + // the writer and potentially emit duplicate summaries if we indeed recovered from a + // final checkpoint.
+ endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); + List previousState = Lists.newArrayList(endOfInputState.get()); + endOfInput = !previousState.isEmpty() && !previousState.contains(false); } @Override @@ -139,6 +179,7 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void processElement(StreamRecord element) throws Exception { + checkState(!endOfInput, "Received element after endOfInput: %s", element); helper.onElement( element.getValue(), getProcessingTimeService().getCurrentProcessingTime(), @@ -149,15 +190,20 @@ public void processElement(StreamRecord element) throws Exception { @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - commitUpToCheckpoint(checkpointId); + if (!this.endOfInput) { + commitUpToCheckpoint(checkpointId); + } } @Override public void endInput() throws Exception { - buckets.onProcessingTime(Long.MAX_VALUE); - helper.snapshotState(Long.MAX_VALUE); - output.emitWatermark(new Watermark(Long.MAX_VALUE)); - commitUpToCheckpoint(Long.MAX_VALUE); + if (!this.endOfInput) { + this.endOfInput = true; + buckets.onProcessingTime(Long.MAX_VALUE); + helper.snapshotState(Long.MAX_VALUE); + output.emitWatermark(new Watermark(Long.MAX_VALUE)); + commitUpToCheckpoint(Long.MAX_VALUE); + } } @Override