Skip to content

Commit

Permalink
[FLINK-25920] Fix AbstractStreamingWriter sending after EOI
Browse files Browse the repository at this point in the history
AbstractStreamingWriter send partition info twice on EOI. This commit ensures that we are not resending partition information even after restarting from a final checkpoint.

(cherry picked from commit 6d60f41)
  • Loading branch information
AHeise committed Nov 14, 2024
1 parent ff5e96f commit 3c968e9
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -740,29 +740,31 @@ Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.open
Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (BatchFileWriter.java:116)
Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<T>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (BatchFileWriter.java:0)
Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (BatchFileWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:106)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:125)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has generic parameter type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, java.lang.String>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:111)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:130)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has generic parameter type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, java.lang.String>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()> in (AbstractStreamingWriter.java:167)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)> in (AbstractStreamingWriter.java:90)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)> in (AbstractStreamingWriter.java:157)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:158)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:122)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)> in (AbstractStreamingWriter.java:101)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)> in (AbstractStreamingWriter.java:115)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()> in (AbstractStreamingWriter.java:122)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (AbstractStreamingWriter.java:98)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object, long, java.lang.Long, long)> in (AbstractStreamingWriter.java:142)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> in (AbstractStreamingWriter.java:145)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (AbstractStreamingWriter.java:143)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> in (AbstractStreamingWriter.java:145)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (AbstractStreamingWriter.java:144)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()> in (AbstractStreamingWriter.java:213)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)> in (AbstractStreamingWriter.java:109)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)> in (AbstractStreamingWriter.java:202)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:203)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:141)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.shaded.guava31.com.google.common.collect.Lists.newArrayList(java.lang.Iterable)> in (AbstractStreamingWriter.java:164)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)> in (AbstractStreamingWriter.java:120)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)> in (AbstractStreamingWriter.java:134)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()> in (AbstractStreamingWriter.java:141)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (AbstractStreamingWriter.java:117)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object, long, java.lang.Long, long)> in (AbstractStreamingWriter.java:183)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> in (AbstractStreamingWriter.java:186)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (AbstractStreamingWriter.java:184)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> in (AbstractStreamingWriter.java:186)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (AbstractStreamingWriter.java:185)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (AbstractStreamingWriter.java:182)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IN>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:131)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:171)
Method <org.apache.flink.connector.file.table.stream.PartitionCommitTrigger.create(boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, java.util.List, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService)> has parameter of type <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService> in (PartitionCommitTrigger.java:0)
Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (PartitionCommitter.java:167)
Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (PartitionCommitter.java:172)
Expand Down Expand Up @@ -829,6 +831,7 @@ Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCo
Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:43)
Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:44)
Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.XZInputStreamFactory.getInstance()> in (StandardDeCompressors.java:46)
Static Initializer <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.BooleanSerializer.INSTANCE> in (AbstractStreamingWriter.java:74)
Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:52)
Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:56)
Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:56)
Expand Down
Loading

0 comments on commit 3c968e9

Please sign in to comment.