From 4934ef87b152194c78779a56968866bfa651f187 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Tue, 10 Sep 2024 11:47:02 +0200 Subject: [PATCH] [FLINK-25920] Refactor & Revise SinkWriterOperatorTestBase The stateful SinkWriterOperatorTestBase test cases used EOI to manipulate the state which was never clean. In particular, it also stored the input elements in state until EOI arrived and emitted them all at once. For state restoration tests, we emitted records after EOI arrived. This commit changed the writer state completely to just capture the record count, which is much more realistic than storing actual payload. The tests now directly assert on the state instead of output. This commit also introduces an adaptor for serializing basic types in the writer state and replaces the hard-to-maintain SinkAndSuppliers with an InspectableSink in the sink writer tests that require an abstraction on top of the different Sink flavors. (cherry picked from commit 4217408576552fc929d9c8331495a9282064cd9c) --- .../io/SimpleVersionedSerializerAdapter.java | 58 +++++ .../SinkV1TransformationTranslatorITCase.java | 8 +- .../graph/StreamingJobGraphGeneratorTest.java | 5 +- .../runtime/operators/sink/SinkTestUtil.java | 4 +- ...SinkV2CommitterOperatorDeprecatedTest.java | 3 - .../sink/SinkV2CommitterOperatorTest.java | 13 +- ...inkV2SinkWriterOperatorDeprecatedTest.java | 88 +++---- .../sink/SinkV2SinkWriterOperatorTest.java | 76 +++--- .../sink/SinkWriterOperatorTestBase.java | 217 ++++++++---------- .../runtime/operators/sink/TestSink.java | 98 +++----- .../runtime/operators/sink/TestSinkV2.java | 111 +++++---- .../WithAdapterCommitterOperatorTest.java | 13 +- .../WithAdapterSinkWriterOperatorTest.java | 84 +++---- .../operators/sink/deprecated/TestSinkV2.java | 82 +++---- .../exec/common/CommonExecSinkITCase.java | 5 +- .../test/streaming/runtime/SinkITCase.java | 7 +- .../SinkV2MetricsDeprecatedITCase.java | 1 - .../runtime/SinkV2MetricsITCase.java | 5 +- 18 files changed, 403 insertions(+), 475 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java new file mode 100644 index 0000000000000..03056be26a99d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Adapter for {@link TypeSerializer} to {@link SimpleVersionedSerializer}. The implementation is + * naive and should only be used for non-critical paths and tests. + */ +@Internal +public class SimpleVersionedSerializerAdapter + implements SimpleVersionedSerializer, Serializable { + private final TypeSerializer serializer; + + public SimpleVersionedSerializerAdapter(TypeSerializer serializer) { + this.serializer = serializer; + } + + public int getVersion() { + return serializer.snapshotConfiguration().getCurrentVersion(); + } + + public byte[] serialize(T value) throws IOException { + DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(10); + serializer.serialize(value, dataOutputSerializer); + return dataOutputSerializer.getCopyOfBuffer(); + } + + public T deserialize(int version, byte[] serialized) throws IOException { + DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(serialized); + T value = serializer.deserialize(dataInputDeserializer); + dataInputDeserializer.releaseArrays(); + return value; + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java index d1cd770b7fb3a..7d3614a5cd29f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java @@ -122,13 +122,7 @@ public void generateWriterCommitterGlobalCommitterTopology() { public void generateWriterGlobalCommitterTopology() { final StreamGraph streamGraph = buildGraph( - TestSink.newBuilder() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) - .setGlobalCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) - .setDefaultGlobalCommitter() - .build(), + TestSink.newBuilder().setDefaultGlobalCommitter().build(), runtimeExecutionMode); final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 1e7c6f7ce3b45..c03384625ebc7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; @@ -54,6 +55,7 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -115,7 +117,6 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.StreamExchangeMode; -import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; @@ -2659,7 +2660,7 @@ public DataStream> addPreCommitTopology( @Override public SimpleVersionedSerializer getWriteResultSerializer() { - return new TestSinkV2.StringSerializer(); + return new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java index b457b5a0b7825..cfdae58596c3b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java @@ -52,7 +52,7 @@ static List toBytes(Collection elements) { static byte[] toBytes(String obj) { try { return SimpleVersionedSerialization.writeVersionAndSerialize( - TestSinkV2.StringSerializer.INSTANCE, obj); + TestSinkV2.COMMITTABLE_SERIALIZER, obj); } catch (IOException e) { throw new IllegalStateException(e); } @@ -83,7 +83,7 @@ static String fromRecord(StreamRecord obj) { static String fromBytes(byte[] obj) { try { return SimpleVersionedSerialization.readVersionAndDeSerialize( - TestSinkV2.StringSerializer.INSTANCE, obj); + TestSinkV2.COMMITTABLE_SERIALIZER, obj); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java index 51d2fa0d8d026..3aacaa6d1f8e8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java @@ -35,7 +35,6 @@ SinkAndCounters sinkWithPostCommit() { (TwoPhaseCommittingSink) TestSinkV2.newBuilder() .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> committer.successfulCommits); @@ -47,7 +46,6 @@ SinkAndCounters sinkWithPostCommitWithRetry() { (TwoPhaseCommittingSink) TestSinkV2.newBuilder() .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> 0); @@ -60,7 +58,6 @@ SinkAndCounters sinkWithoutPostCommit() { (TwoPhaseCommittingSink) TestSinkV2.newBuilder() .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(false) .build(), () -> committer.successfulCommits); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 349e3961e4388..0112b5cf8625e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -30,7 +30,6 @@ SinkAndCounters sinkWithPostCommit() { (SupportsCommitter) TestSinkV2.newBuilder() .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> committer.successfulCommits); @@ -42,7 +41,6 @@ SinkAndCounters sinkWithPostCommitWithRetry() { (SupportsCommitter) TestSinkV2.newBuilder() .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> 0); @@ -52,12 +50,11 @@ SinkAndCounters sinkWithPostCommitWithRetry() { SinkAndCounters sinkWithoutPostCommit() { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( - (SupportsCommitter) - TestSinkV2.newBuilder() - .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) - .setWithPostCommitTopology(false) - .build(), + TestSinkV2.newBuilder() + .setCommitter(committer) + .setWithPostCommitTopology(false) + .build() + .asSupportsCommitter(), () -> committer.successfulCommits); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java index f7fff71c53b30..4e0c3ad0cec11 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java @@ -18,16 +18,13 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2; -import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList; - -import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; /** @@ -35,67 +32,49 @@ */ @Deprecated class SinkV2SinkWriterOperatorDeprecatedTest extends SinkWriterOperatorTestBase { - @Override - SinkAndSuppliers sinkWithoutCommitter() { + InspectableSink sinkWithoutCommitter() { TestSinkV2.DefaultSinkWriter sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSinkV2.newBuilder().setWriter(sinkWriter).build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + return new InspectableSink(TestSinkV2.newBuilder().setWriter(sinkWriter).build()); } @Override - SinkAndSuppliers sinkWithCommitter() { + InspectableSink sinkWithCommitter() { TestSinkV2.DefaultSinkWriter sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.newBuilder() - .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .setWriter(sinkWriter) + .build()); } @Override - SinkAndSuppliers sinkWithTimeBasedWriter() { + InspectableSink sinkWithTimeBasedWriter() { TestSinkV2.DefaultSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.newBuilder() .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .build()); } @Override - SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) { - SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter(); + InspectableSink sinkWithState(boolean withState, String stateName) { + TestSinkV2.DefaultSinkWriter sinkWriter = + new TestSinkV2.DefaultStatefulSinkWriter<>(); TestSinkV2.Builder builder = - TestSinkV2.newBuilder() - .setWriter(sinkWriter) + TestSinkV2.newBuilder() .setDefaultCommitter() - .setWithPostCommitTopology(true); + .setWithPostCommitTopology(true) + .setWriter(sinkWriter); if (withState) { builder.setWriterState(true); } if (stateName != null) { builder.setCompatibleStateNames(stateName); } - return new SinkAndSuppliers( - builder.build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> sinkWriter.lastCheckpointId, - () -> new TestSinkV2.StringSerializer()); + return new InspectableSink(builder.build()); } private static class TimeBasedBufferingSinkWriter @@ -125,31 +104,30 @@ public void init(Sink.InitContext context) { } } - private static class SnapshottingBufferingSinkWriter - extends TestSinkV2.DefaultStatefulSinkWriter { - public static final int NOT_SNAPSHOTTED = -1; - long lastCheckpointId = NOT_SNAPSHOTTED; - boolean endOfInput = false; + static class InspectableSink extends AbstractInspectableSink> { + InspectableSink(TestSinkV2 sink) { + super(sink); + } + + @Override + public long getLastCheckpointId() { + return getSink().getWriter().lastCheckpointId; + } @Override - public void flush(boolean endOfInput) throws IOException, InterruptedException { - this.endOfInput = endOfInput; + public List getRecordsOfCurrentCheckpoint() { + return getSink().getWriter().elements; } @Override - public List snapshotState(long checkpointId) throws IOException { - lastCheckpointId = checkpointId; - return super.snapshotState(checkpointId); + public List getWatermarks() { + return getSink().getWriter().watermarks; } @Override - public Collection prepareCommit() { - if (!endOfInput) { - return ImmutableList.of(); - } - List result = elements; - elements = new ArrayList<>(); - return result; + public int getRecordCountFromState() { + return ((TestSinkV2.DefaultStatefulSinkWriter) getSink().getWriter()) + .getRecordCount(); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java index a9cd036269cde..9c855ca31cf46 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.java.tuple.Tuple3; @@ -30,67 +31,49 @@ import java.util.List; class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override - SinkAndSuppliers sinkWithoutCommitter() { + InspectableSink sinkWithoutCommitter() { TestSinkV2.DefaultSinkWriter sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSinkV2.newBuilder().setWriter(sinkWriter).build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + return new InspectableSink(TestSinkV2.newBuilder().setWriter(sinkWriter).build()); } @Override - SinkAndSuppliers sinkWithCommitter() { + InspectableSink sinkWithCommitter() { TestSinkV2.DefaultSinkWriter sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.newBuilder() .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .build()); } @Override - SinkAndSuppliers sinkWithTimeBasedWriter() { + InspectableSink sinkWithTimeBasedWriter() { TestSinkV2.DefaultSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.newBuilder() .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .build()); } @Override - SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) { - SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter(); + InspectableSink sinkWithState(boolean withState, String stateName) { + TestSinkV2.DefaultSinkWriter sinkWriter = + new TestSinkV2.DefaultStatefulSinkWriter<>(); TestSinkV2.Builder builder = - TestSinkV2.newBuilder() - .setWriter(sinkWriter) + TestSinkV2.newBuilder() .setDefaultCommitter() - .setWithPostCommitTopology(true); + .setWithPostCommitTopology(true) + .setWriter(sinkWriter); if (withState) { builder.setWriterState(true); } if (stateName != null) { builder.setCompatibleStateNames(stateName); } - return new SinkAndSuppliers( - builder.build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> sinkWriter.lastCheckpointId, - () -> new TestSinkV2.StringSerializer()); + return new InspectableSink(builder.build()); } private static class TimeBasedBufferingSinkWriter @@ -147,4 +130,31 @@ public Collection prepareCommit() { return result; } } + + static class InspectableSink extends AbstractInspectableSink> { + InspectableSink(TestSinkV2 sink) { + super(sink); + } + + @Override + public long getLastCheckpointId() { + return getSink().getWriter().lastCheckpointId; + } + + @Override + public List getRecordsOfCurrentCheckpoint() { + return getSink().getWriter().elements; + } + + @Override + public List getWatermarks() { + return getSink().getWriter().watermarks; + } + + @Override + public int getRecordCountFromState() { + return ((TestSinkV2.DefaultStatefulSinkWriter) getSink().getWriter()) + .getRecordCount(); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java index 2f497a2fd26ca..5452769eaf4b0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java @@ -28,7 +28,6 @@ import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -66,8 +65,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.LongSupplier; -import java.util.function.Supplier; import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput; @@ -77,21 +74,21 @@ abstract class SinkWriterOperatorTestBase { @Test void testNotEmitCommittablesWithoutCommitter() throws Exception { - SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter(); + InspectableSink sink = sinkWithoutCommitter(); final OneInputStreamOperatorTestHarness> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.open(); testHarness.processElement(1, 1); assertThat(testHarness.getOutput()).isEmpty(); - assertThat(sinkAndSuppliers.elementSupplier.get()) + assertThat(sink.getRecordsOfCurrentCheckpoint()) .containsOnly("(1,1," + Long.MIN_VALUE + ")"); testHarness.prepareSnapshotPreBarrier(1); assertThat(testHarness.getOutput()).isEmpty(); // Elements are flushed - assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty(); + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); testHarness.close(); } @@ -99,10 +96,10 @@ void testNotEmitCommittablesWithoutCommitter() throws Exception { void testWatermarkPropagatedToSinkWriter() throws Exception { final long initialTime = 0; - SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter(); + InspectableSink sink = sinkWithoutCommitter(); final OneInputStreamOperatorTestHarness> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.open(); testHarness.processWatermark(initialTime); @@ -110,7 +107,7 @@ void testWatermarkPropagatedToSinkWriter() throws Exception { assertThat(testHarness.getOutput()) .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); - assertThat(sinkAndSuppliers.watermarkSupplier.get()) + assertThat(sink.getWatermarks()) .containsExactly( new org.apache.flink.api.common.eventtime.Watermark(initialTime), new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1)); @@ -123,7 +120,7 @@ public void testTimeBasedBufferingSinkWriter() throws Exception { final OneInputStreamOperatorTestHarness> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().sink)); + new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); testHarness.open(); @@ -150,7 +147,7 @@ public void testTimeBasedBufferingSinkWriter() throws Exception { void testEmitOnFlushWithCommitter() throws Exception { final OneInputStreamOperatorTestHarness> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithCommitter().sink)); + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); testHarness.open(); assertThat(testHarness.getOutput()).isEmpty(); @@ -168,7 +165,7 @@ void testEmitOnFlushWithCommitter() throws Exception { @Test void testEmitOnEndOfInputInBatchMode() throws Exception { final SinkWriterOperatorFactory writerOperatorFactory = - new SinkWriterOperatorFactory<>(sinkWithCommitter().sink); + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); final OneInputStreamOperatorTestHarness> testHarness = new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); @@ -186,9 +183,10 @@ void testStateRestore(boolean stateful) throws Exception { final long initialTime = 0; - final SinkAndSuppliers sinkAndSuppliers = sinkWithSnapshottingWriter(stateful, null); + final InspectableSink sink = sinkWithState(stateful, null); + Sink sink2 = sink.getSink(); final OneInputStreamOperatorTestHarness> testHarness = - createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink); + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); testHarness.open(); @@ -199,39 +197,23 @@ void testStateRestore(boolean stateful) throws Exception { testHarness.prepareSnapshotPreBarrier(1L); OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); - // we see the watermark and the committable summary, so the committables must be stored in - // state - assertThat(testHarness.getOutput()).hasSize(2).contains(new Watermark(initialTime)); - assertThat(sinkAndSuppliers.lastCheckpointSupplier.getAsLong()) - .isEqualTo(stateful ? 1L : -1L); + assertThat(sink.getRecordCountFromState()).isEqualTo(2); + assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); testHarness.close(); - final SinkAndSuppliers restoredSink = sinkWithSnapshottingWriter(stateful, null); + final InspectableSink restoredSink = sinkWithState(stateful, null); final OneInputStreamOperatorTestHarness> - restoredTestHarness = createTestHarnessWithBufferingSinkWriter(restoredSink.sink); + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink.getSink())); restoredTestHarness.initializeState(snapshot); restoredTestHarness.open(); - // this will flush out the committables that were restored - restoredTestHarness.endInput(); - final long checkpointId = 2; - restoredTestHarness.prepareSnapshotPreBarrier(checkpointId); - restoredTestHarness.notifyOfCompletedCheckpoint(checkpointId); - - if (stateful) { - assertBasicOutput(restoredTestHarness.getOutput(), 2, EOI); - } else { - assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue()) - .isInstanceOf(CommittableSummary.class) - .satisfies( - cs -> - SinkV2Assertions.assertThat((CommittableSummary) cs) - .hasOverallCommittables(0) - .hasPendingCommittables(0) - .hasFailedCommittables(0)); - } + // check that the previous state is correctly restored + assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0); + restoredTestHarness.close(); } @@ -244,66 +226,46 @@ public void testLoadPreviousSinkState(boolean stateful) throws Exception { "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt"); - SinkAndSuppliers sinkAndSuppliers = - sinkWithSnapshottingWriter(stateful, DummySinkOperator.DUMMY_SINK_STATE_NAME); + InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); + int expectedState = 5; final OneInputStreamOperatorTestHarness previousSink = new OneInputStreamOperatorTestHarness<>( - new DummySinkOperator(sinkAndSuppliers.serializerSupplier.get()), + new CompatibleStateSinkOperator<>( + TestSinkV2.WRITER_SERIALIZER, expectedState), StringSerializer.INSTANCE); OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); - // 2. Load previous sink state and verify the output + // 2. Load previous sink state and verify state + Sink sink3 = sink.getSink(); final OneInputStreamOperatorTestHarness> compatibleWriterOperator = - createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink); - - final List expectedOutput1 = - stateful ? new ArrayList<>(previousSinkInputs) : new ArrayList<>(); - expectedOutput1.add(Tuple3.of(1, 1, Long.MIN_VALUE).toString()); + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink3)); // load the state from previous sink compatibleWriterOperator.initializeState(previousSinkState); + assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - compatibleWriterOperator.open(); - - compatibleWriterOperator.processElement(1, 1); - - // this will flush out the committables that were restored from previous sink - compatibleWriterOperator.endInput(); - compatibleWriterOperator.prepareSnapshotPreBarrier(1); - - OperatorSubtaskState operatorStateWithoutPreviousState = - compatibleWriterOperator.snapshot(1L, 1L); + // 3. do another snapshot and check if this also can be restored without compabitible state + // name + compatibleWriterOperator.prepareSnapshotPreBarrier(1L); + OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); compatibleWriterOperator.close(); - assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput()); - - // 3. Restore the sink without previous sink's state - SinkAndSuppliers sinkAndSuppliers2 = - sinkWithSnapshottingWriter(stateful, DummySinkOperator.DUMMY_SINK_STATE_NAME); + // 4. Restore the sink without previous sink's state + InspectableSink sink2 = + sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); final OneInputStreamOperatorTestHarness> restoredSinkOperator = - createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers2.sink); - final List expectedOutput2 = - Arrays.asList( - Tuple3.of(2, 2, Long.MIN_VALUE).toString(), - Tuple3.of(3, 3, Long.MIN_VALUE).toString()); - - restoredSinkOperator.initializeState(operatorStateWithoutPreviousState); - - restoredSinkOperator.open(); + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink2.getSink())); - restoredSinkOperator.processElement(2, 2); - restoredSinkOperator.processElement(3, 3); + restoredSinkOperator.initializeState(snapshot); + assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - // this will flush out the committables that were restored - restoredSinkOperator.endInput(); - restoredSinkOperator.prepareSnapshotPreBarrier(2); - - assertEmitted(expectedOutput2, restoredSinkOperator.getOutput()); restoredSinkOperator.close(); } @@ -311,10 +273,10 @@ public void testLoadPreviousSinkState(boolean stateful) throws Exception { void testRestoreCommitterState() throws Exception { final List committables = Arrays.asList("state1", "state2"); - SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter(); + InspectableSink sink = sinkWithCommitter(); final OneInputStreamOperatorTestHarness committer = new OneInputStreamOperatorTestHarness<>( - new TestCommitterOperator(sinkAndSuppliers.serializerSupplier.get()), + new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), StringSerializer.INSTANCE); final OperatorSubtaskState committerState = @@ -322,7 +284,7 @@ void testRestoreCommitterState() throws Exception { final OneInputStreamOperatorTestHarness> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.initializeState(committerState); @@ -362,16 +324,16 @@ void testRestoreCommitterState() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter(); + InspectableSink sink = sinkWithCommitter(); final OneInputStreamOperatorTestHarness> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.open(); testHarness.processElement(1, 1); assertThat(testHarness.getOutput()).isEmpty(); final String record = "(1,1," + Long.MIN_VALUE + ")"; - assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly(record); + assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); testHarness.endInput(); @@ -380,7 +342,7 @@ void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Ex } assertEmitted(Collections.singletonList(record), testHarness.getOutput()); - assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty(); + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); testHarness.close(); } @@ -556,13 +518,6 @@ private static void assertEmitted(List records, Queue output) { assertThat(committables).containsExactlyInAnyOrderElementsOf(records); } - private static OneInputStreamOperatorTestHarness> - createTestHarnessWithBufferingSinkWriter(Sink sink) throws Exception { - final SinkWriterOperatorFactory writerOperatorFactory = - new SinkWriterOperatorFactory<>(sink); - return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); - } - private static void assertBasicOutput( Collection queuedOutput, int numberOfCommittables, long checkpointId) { List output = fromOutput(queuedOutput); @@ -622,19 +577,22 @@ public void snapshotState(StateSnapshotContext context) throws Exception { } } - private static class DummySinkOperator extends AbstractStreamOperator + /** Writes state to test whether the sink can read from alternative state names. */ + private static class CompatibleStateSinkOperator extends AbstractStreamOperator implements OneInputStreamOperator { - static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state"; + static final String SINK_STATE_NAME = "compatible_sink_state"; static final ListStateDescriptor SINK_STATE_DESC = - new ListStateDescriptor<>( - DUMMY_SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); - ListState sinkState; - private final SimpleVersionedSerializer serializer; + new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); + ListState sinkState; + private final SimpleVersionedSerializer serializer; + private final T initialState; - public DummySinkOperator(SimpleVersionedSerializer serializer) { + public CompatibleStateSinkOperator( + SimpleVersionedSerializer serializer, T initialState) { this.serializer = serializer; + this.initialState = initialState; } public void initializeState(StateInitializationContext context) throws Exception { @@ -643,11 +601,14 @@ public void initializeState(StateInitializationContext context) throws Exception new SimpleVersionedListState<>( context.getOperatorStateStore().getListState(SINK_STATE_DESC), serializer); + if (!context.isRestored()) { + sinkState.add(initialState); + } } @Override - public void processElement(StreamRecord element) throws Exception { - sinkState.add(element.getValue()); + public void processElement(StreamRecord element) { + // do nothing } } @@ -672,32 +633,42 @@ public byte[] serialize(List obj) throws IOException { } } - abstract SinkAndSuppliers sinkWithoutCommitter(); + abstract InspectableSink sinkWithoutCommitter(); + + abstract InspectableSink sinkWithTimeBasedWriter(); + + abstract InspectableSink sinkWithState(boolean withState, String stateName); + + abstract InspectableSink sinkWithCommitter(); - abstract SinkAndSuppliers sinkWithTimeBasedWriter(); + /** + * Basic abstraction to access the different flavors of sinks. Remove once the older interfaces + * are removed. + */ + interface InspectableSink { + long getLastCheckpointId(); - abstract SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName); + List getRecordsOfCurrentCheckpoint(); - abstract SinkAndSuppliers sinkWithCommitter(); + List getWatermarks(); - static class SinkAndSuppliers { - org.apache.flink.api.connector.sink2.Sink sink; - Supplier> elementSupplier; - Supplier> watermarkSupplier; - LongSupplier lastCheckpointSupplier; - Supplier> serializerSupplier; + int getRecordCountFromState(); - public SinkAndSuppliers( - org.apache.flink.api.connector.sink2.Sink sink, - Supplier> elementSupplier, - Supplier> watermarkSupplier, - LongSupplier lastCheckpointSupplier, - Supplier> serializerSupplier) { + Sink getSink(); + } + + abstract static class AbstractInspectableSink< + S extends org.apache.flink.api.connector.sink2.Sink> + implements InspectableSink { + private final S sink; + + protected AbstractInspectableSink(S sink) { this.sink = sink; - this.elementSupplier = elementSupplier; - this.watermarkSupplier = watermarkSupplier; - this.lastCheckpointSupplier = lastCheckpointSupplier; - this.serializerSupplier = serializerSupplier; + } + + @Override + public S getSink() { + return sink; } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java index 742e4438b5d36..d8a2db03f3086 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java @@ -25,7 +25,7 @@ import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.transformations.SinkV1Adapter; import javax.annotation.Nullable; @@ -56,13 +56,17 @@ * @deprecated Use {@link TestSinkV2} instead. */ @Deprecated -public class TestSink implements Sink { +public class TestSink implements Sink { + public static final SimpleVersionedSerializerAdapter COMMITTABLE_SERIALIZER = + TestSinkV2.COMMITTABLE_SERIALIZER; + public static final SimpleVersionedSerializerAdapter WRITER_SERIALIZER = + TestSinkV2.WRITER_SERIALIZER; public static final String END_OF_INPUT_STR = "end of input"; private final DefaultSinkWriter writer; - @Nullable private final SimpleVersionedSerializer writerStateSerializer; + @Nullable private final SimpleVersionedSerializer writerStateSerializer; @Nullable private final Committer committer; @@ -76,7 +80,7 @@ public class TestSink implements Sink { private TestSink( DefaultSinkWriter writer, - @Nullable SimpleVersionedSerializer writerStateSerializer, + @Nullable SimpleVersionedSerializer writerStateSerializer, @Nullable Committer committer, @Nullable SimpleVersionedSerializer committableSerializer, @Nullable GlobalCommitter globalCommitter, @@ -92,7 +96,7 @@ private TestSink( } @Override - public SinkWriter createWriter(InitContext context, List states) { + public SinkWriter createWriter(InitContext context, List states) { writer.init(context); writer.restoredFrom(states); writer.setProcessingTimerService(context.getProcessingTimeService()); @@ -120,7 +124,7 @@ public Optional> getGlobalCommittableSerialize } @Override - public Optional> getWriterStateSerializer() { + public Optional> getWriterStateSerializer() { return Optional.ofNullable(writerStateSerializer); } @@ -137,30 +141,30 @@ public org.apache.flink.api.connector.sink2.Sink asV2() { return SinkV1Adapter.wrap(this); } + public DefaultSinkWriter getWriter() { + return writer; + } + /** A builder class for {@link TestSink}. */ public static class Builder { - private DefaultSinkWriter writer = new DefaultSinkWriter(); + private DefaultSinkWriter writer = new DefaultSinkWriter<>(); - private SimpleVersionedSerializer writerStateSerializer; + private SimpleVersionedSerializer writerStateSerializer; private Committer committer; - private SimpleVersionedSerializer committableSerializer; - private GlobalCommitter globalCommitter; - private SimpleVersionedSerializer globalCommittableSerializer; - private Collection compatibleStateNames = Collections.emptyList(); public Builder setWriter(DefaultSinkWriter writer) { - this.writer = checkNotNull(writer); + this.writer = (DefaultSinkWriter) checkNotNull(writer); return (Builder) this; } public Builder withWriterState() { - this.writerStateSerializer = StringCommittableSerializer.INSTANCE; + this.writerStateSerializer = WRITER_SERIALIZER; return this; } @@ -169,39 +173,23 @@ public Builder setCommitter(Committer committer) { return this; } - public Builder setCommittableSerializer( - SimpleVersionedSerializer committableSerializer) { - this.committableSerializer = committableSerializer; - return this; - } - public Builder setDefaultCommitter() { this.committer = new DefaultCommitter(); - this.committableSerializer = StringCommittableSerializer.INSTANCE; return this; } public Builder setDefaultCommitter(Supplier> queueSupplier) { this.committer = new DefaultCommitter(queueSupplier); - this.committableSerializer = StringCommittableSerializer.INSTANCE; - return this; - } - - public Builder setGlobalCommittableSerializer( - SimpleVersionedSerializer globalCommittableSerializer) { - this.globalCommittableSerializer = globalCommittableSerializer; return this; } public Builder setDefaultGlobalCommitter() { this.globalCommitter = new DefaultGlobalCommitter(""); - this.globalCommittableSerializer = StringCommittableSerializer.INSTANCE; return this; } public Builder setGlobalCommitter(Supplier> queueSupplier) { this.globalCommitter = new DefaultGlobalCommitter(queueSupplier); - this.globalCommittableSerializer = StringCommittableSerializer.INSTANCE; return this; } @@ -219,9 +207,9 @@ public TestSink build() { writer, writerStateSerializer, committer, - committableSerializer, + committer == null && globalCommitter == null ? null : COMMITTABLE_SERIALIZER, globalCommitter, - globalCommittableSerializer, + globalCommitter == null ? null : COMMITTABLE_SERIALIZER, compatibleStateNames); } } @@ -230,7 +218,7 @@ public TestSink build() { /** Base class for out testing {@link SinkWriter Writers}. */ public static class DefaultSinkWriter - implements SinkWriter, Serializable { + implements SinkWriter, Serializable { protected List elements; @@ -238,6 +226,10 @@ public static class DefaultSinkWriter protected ProcessingTimeService processingTimerService; + private int recordCount; + + protected long lastCheckpointId = -1; + protected DefaultSinkWriter() { this.elements = new ArrayList<>(); this.watermarks = new ArrayList<>(); @@ -247,6 +239,7 @@ protected DefaultSinkWriter() { public void write(T element, Context context) { elements.add( Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); + recordCount++; } @Override @@ -262,20 +255,27 @@ public List prepareCommit(boolean flush) { } @Override - public List snapshotState(long checkpointId) throws IOException { - return Collections.emptyList(); + public List snapshotState(long checkpointId) throws IOException { + lastCheckpointId = checkpointId; + return Collections.singletonList(recordCount); } @Override public void close() throws Exception {} - void restoredFrom(List states) {} + void restoredFrom(List states) { + recordCount = states.isEmpty() ? 0 : states.get(0); + } void setProcessingTimerService(ProcessingTimeService processingTimerService) { this.processingTimerService = processingTimerService; } public void init(InitContext context) {} + + public int getRecordCount() { + return recordCount; + } } // -------------------------------------- Sink Committer --------------------------------------- @@ -393,30 +393,4 @@ public void endOfInput() { commit(Collections.singletonList(END_OF_INPUT_STR)); } } - - /** - * We introduce this {@link StringCommittableSerializer} is because that all the fields of - * {@link TestSink} should be serializable. - */ - public static class StringCommittableSerializer - implements SimpleVersionedSerializer, Serializable { - - public static final StringCommittableSerializer INSTANCE = - new StringCommittableSerializer(); - - @Override - public int getVersion() { - return SimpleVersionedStringSerializer.INSTANCE.getVersion(); - } - - @Override - public byte[] serialize(String obj) { - return SimpleVersionedStringSerializer.INSTANCE.serialize(obj); - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - return SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized); - } - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java index 2dc5cd73890c3..7b91247f54b03 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; @@ -30,6 +32,7 @@ import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; @@ -37,7 +40,6 @@ import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet; @@ -61,6 +63,10 @@ /** A {@link Sink} for all the sink related tests. */ public class TestSinkV2 implements Sink { + public static final SimpleVersionedSerializerAdapter COMMITTABLE_SERIALIZER = + new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); + public static final SimpleVersionedSerializerAdapter WRITER_SERIALIZER = + new SimpleVersionedSerializerAdapter<>(IntSerializer.INSTANCE); private final DefaultSinkWriter writer; @@ -81,11 +87,18 @@ public static Builder newBuilder() { return new Builder<>(); } + public static Builder newBuilder(DefaultSinkWriter writer) { + return new Builder().setWriter(writer); + } + + public SupportsCommitter asSupportsCommitter() { + throw new UnsupportedOperationException("No committter"); + } + /** A builder class for {@link TestSinkV2}. */ public static class Builder { private DefaultSinkWriter writer = null; private DefaultCommitter committer; - private SimpleVersionedSerializer committableSerializer; private boolean withPostCommitTopology = false; private boolean withPreCommitTopology = false; private boolean withWriterState = false; @@ -101,22 +114,14 @@ public Builder setCommitter(DefaultCommitter committer) { return this; } - public Builder setCommittableSerializer( - SimpleVersionedSerializer committableSerializer) { - this.committableSerializer = committableSerializer; - return this; - } - public Builder setDefaultCommitter() { this.committer = new DefaultCommitter(); - this.committableSerializer = StringSerializer.INSTANCE; return this; } public Builder setDefaultCommitter( Supplier>> queueSupplier) { this.committer = new DefaultCommitter(queueSupplier); - this.committableSerializer = StringSerializer.INSTANCE; return this; } @@ -155,7 +160,7 @@ public TestSinkV2 build() { if (!withPreCommitTopology) { // TwoPhaseCommittingSink with a stateless writer and a committer return new TestSinkV2TwoPhaseCommittingSink<>( - writer, committableSerializer, committer); + writer, COMMITTABLE_SERIALIZER, committer); } else { // TwoPhaseCommittingSink with a stateless writer, pre commit topology, // committer @@ -163,9 +168,7 @@ public TestSinkV2 build() { writer instanceof DefaultCommittingSinkWriter, "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPreCommitTopology<>( - (DefaultCommittingSinkWriter) writer, - committableSerializer, - committer); + writer, COMMITTABLE_SERIALIZER, committer); } } else { if (withWriterState) { @@ -174,9 +177,9 @@ public TestSinkV2 build() { Preconditions.checkArgument( writer instanceof DefaultStatefulSinkWriter, "Please provide a DefaultStatefulSinkWriter instance"); - return new TestStatefulSinkV2( - (DefaultStatefulSinkWriter) writer, - committableSerializer, + return new TestStatefulSinkV2<>( + (DefaultStatefulSinkWriter) writer, + COMMITTABLE_SERIALIZER, committer, compatibleStateNames); } else { @@ -186,9 +189,7 @@ public TestSinkV2 build() { writer instanceof DefaultCommittingSinkWriter, "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPostCommitTopology<>( - (DefaultCommittingSinkWriter) writer, - committableSerializer, - committer); + writer, COMMITTABLE_SERIALIZER, committer); } } } @@ -215,6 +216,11 @@ public Committer createCommitter(CommitterInitContext context) { return committer; } + @Override + public SupportsCommitter asSupportsCommitter() { + return this; + } + @Override public SimpleVersionedSerializer getCommittableSerializer() { return committableSerializer; @@ -268,19 +274,19 @@ public DataStream> addPreCommitTopology( return withLineage.map(old -> old + "Transformed"); } }) - .returns(CommittableMessageTypeInfo.of(StringSerializer::new)); + .returns(CommittableMessageTypeInfo.of(() -> COMMITTABLE_SERIALIZER)); } @Override public SimpleVersionedSerializer getWriteResultSerializer() { - return new StringSerializer(); + return new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); } } private static class TestStatefulSinkV2 extends TestSinkV2WithPostCommitTopology - implements SupportsWriterState, + implements SupportsWriterState, SupportsWriterState.WithCompatibleState { - private String compatibleState; + private final String compatibleState; public TestStatefulSinkV2( DefaultStatefulSinkWriter writer, @@ -297,8 +303,8 @@ public DefaultStatefulSinkWriter createWriter(InitContext context) { } @Override - public StatefulSinkWriter restoreWriter( - WriterInitContext context, Collection recoveredState) { + public StatefulSinkWriter restoreWriter( + WriterInitContext context, Collection recoveredState) { DefaultStatefulSinkWriter statefulWriter = (DefaultStatefulSinkWriter) getWriter(); @@ -307,8 +313,8 @@ public StatefulSinkWriter restoreWriter( } @Override - public SimpleVersionedSerializer getWriterStateSerializer() { - return new StringSerializer(); + public SimpleVersionedSerializer getWriterStateSerializer() { + return WRITER_SERIALIZER; } @Override @@ -326,6 +332,8 @@ public static class DefaultSinkWriter implements SinkWriter, Ser protected List watermarks; + public long lastCheckpointId = -1; + protected DefaultSinkWriter() { this.elements = new ArrayList<>(); this.watermarks = new ArrayList<>(); @@ -380,15 +388,27 @@ public Collection prepareCommit() { */ protected static class DefaultStatefulSinkWriter extends DefaultCommittingSinkWriter - implements StatefulSinkWriter { + implements StatefulSinkWriter { + private int recordCount = 0; @Override - public List snapshotState(long checkpointId) throws IOException { - return elements; + public void write(InputT element, Context context) { + super.write(element, context); + recordCount++; + } + + public int getRecordCount() { + return recordCount; } - protected void restore(Collection recoveredState) { - this.elements = new ArrayList<>(recoveredState); + @Override + public List snapshotState(long checkpointId) throws IOException { + lastCheckpointId = checkpointId; + return Collections.singletonList(recordCount); + } + + protected void restore(Collection recoveredState) { + this.recordCount = recoveredState.isEmpty() ? 0 : recoveredState.iterator().next(); } } @@ -464,29 +484,4 @@ public void commit(Collection> committables) { }); } } - - /** - * We introduce this {@link StringSerializer} is because that all the fields of {@link - * TestSinkV2} should be serializable. - */ - public static class StringSerializer - implements SimpleVersionedSerializer, Serializable { - - public static final StringSerializer INSTANCE = new StringSerializer(); - - @Override - public int getVersion() { - return SimpleVersionedStringSerializer.INSTANCE.getVersion(); - } - - @Override - public byte[] serialize(String obj) { - return SimpleVersionedStringSerializer.INSTANCE.serialize(obj); - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - return SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized); - } - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java index 6d4c3586f38c1..f817a6810afcb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java @@ -33,8 +33,6 @@ SinkAndCounters sinkWithPostCommit() { TestSink.newBuilder() .setCommitter(committer) .setDefaultGlobalCommitter() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) .build() .asV2(), () -> committer.successfulCommits); @@ -47,8 +45,6 @@ SinkAndCounters sinkWithPostCommitWithRetry() { TestSink.newBuilder() .setCommitter(new TestSink.RetryOnceCommitter()) .setDefaultGlobalCommitter() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) .build() .asV2(), () -> 0); @@ -59,12 +55,7 @@ SinkAndCounters sinkWithoutPostCommit() { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( (SupportsCommitter) - TestSink.newBuilder() - .setCommitter(committer) - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) - .build() - .asV2(), + TestSink.newBuilder().setCommitter(committer).build().asV2(), () -> committer.successfulCommits); } @@ -78,6 +69,6 @@ public List commit(List committables) { } @Override - public void close() throws Exception {} + public void close() {} } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java index 5af5ac5a679a4..4e320816aea7f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java @@ -18,65 +18,46 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; -import java.util.Collections; import java.util.List; class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override - SinkAndSuppliers sinkWithoutCommitter() { + InspectableSink sinkWithoutCommitter() { TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSink.newBuilder().setWriter(sinkWriter).build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - () -> new TestSink.StringCommittableSerializer()); + return new InspectableSink(TestSink.newBuilder().setWriter(sinkWriter).build()); } @Override - SinkAndSuppliers sinkWithCommitter() { + InspectableSink sinkWithCommitter() { TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - () -> new TestSink.StringCommittableSerializer()); + return new InspectableSink( + TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build()); } @Override - SinkAndSuppliers sinkWithTimeBasedWriter() { + InspectableSink sinkWithTimeBasedWriter() { TestSink.DefaultSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter(); - return new SinkAndSuppliers( - TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - () -> new TestSink.StringCommittableSerializer()); + return new InspectableSink( + TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build()); } @Override - SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) { - SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter(); + InspectableSink sinkWithState(boolean withState, String stateName) { + TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter<>(); TestSink.Builder builder = TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter(); if (withState) { builder.withWriterState(); + if (stateName != null) { + builder.setCompatibleStateNames(stateName); + } } - if (stateName != null) { - builder.setCompatibleStateNames(stateName); - } - return new SinkAndSuppliers( - builder.build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> sinkWriter.lastCheckpointId, - () -> new TestSink.StringCommittableSerializer()); + return new InspectableSink(builder.build()); } private static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter @@ -103,30 +84,33 @@ public void onProcessingTime(long time) { } } - private static class SnapshottingBufferingSinkWriter - extends TestSink.DefaultSinkWriter { - public static final int NOT_SNAPSHOTTED = -1; - long lastCheckpointId = NOT_SNAPSHOTTED; + class InspectableSink + extends AbstractInspectableSink> { + private final TestSink sink; + + InspectableSink(TestSink sink) { + super(sink.asV2()); + this.sink = sink; + } @Override - public List snapshotState(long checkpointId) { - lastCheckpointId = checkpointId; - return elements; + public long getLastCheckpointId() { + return sink.getWriter().lastCheckpointId; } @Override - void restoredFrom(List states) { - this.elements = new ArrayList<>(states); + public List getRecordsOfCurrentCheckpoint() { + return sink.getWriter().elements; } @Override - public List prepareCommit(boolean flush) { - if (!flush) { - return Collections.emptyList(); - } - List result = elements; - elements = new ArrayList<>(); - return result; + public List getWatermarks() { + return sink.getWriter().watermarks; + } + + @Override + public int getRecordCountFromState() { + return sink.getWriter().getRecordCount(); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java index 7e84326cae192..3374e6321ac93 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java @@ -26,10 +26,10 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet; @@ -60,6 +60,10 @@ @Deprecated public class TestSinkV2 implements Sink { + public static final SimpleVersionedSerializerAdapter COMMITTABLE_SERIALIZER = + org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.COMMITTABLE_SERIALIZER; + public static final SimpleVersionedSerializerAdapter WRITER_SERIALIZER = + org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.WRITER_SERIALIZER; private final DefaultSinkWriter writer; private TestSinkV2(DefaultSinkWriter writer) { @@ -71,7 +75,7 @@ public SinkWriter createWriter(InitContext context) { return writer; } - DefaultSinkWriter getWriter() { + public DefaultSinkWriter getWriter() { return writer; } @@ -83,7 +87,6 @@ public static Builder newBuilder() { public static class Builder { private DefaultSinkWriter writer = null; private DefaultCommitter committer; - private SimpleVersionedSerializer committableSerializer; private boolean withPostCommitTopology = false; private boolean withWriterState = false; private String compatibleStateNames; @@ -98,22 +101,14 @@ public Builder setCommitter(DefaultCommitter committer) { return this; } - public Builder setCommittableSerializer( - SimpleVersionedSerializer committableSerializer) { - this.committableSerializer = committableSerializer; - return this; - } - public Builder setDefaultCommitter() { this.committer = new DefaultCommitter(); - this.committableSerializer = StringSerializer.INSTANCE; return this; } public Builder setDefaultCommitter( Supplier>> queueSupplier) { this.committer = new DefaultCommitter(queueSupplier); - this.committableSerializer = StringSerializer.INSTANCE; return this; } @@ -146,7 +141,7 @@ public TestSinkV2 build() { if (!withPostCommitTopology) { // TwoPhaseCommittingSink with a stateless writer and a committer return new TestSinkV2TwoPhaseCommittingSink<>( - writer, committableSerializer, committer); + writer, COMMITTABLE_SERIALIZER, committer); } else { if (withWriterState) { // TwoPhaseCommittingSink with a stateful writer and a committer and post @@ -156,7 +151,7 @@ public TestSinkV2 build() { "Please provide a DefaultStatefulSinkWriter instance"); return new TestStatefulSinkV2( (DefaultStatefulSinkWriter) writer, - committableSerializer, + COMMITTABLE_SERIALIZER, committer, compatibleStateNames); } else { @@ -167,7 +162,7 @@ public TestSinkV2 build() { "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPostCommitTopology<>( (DefaultCommittingSinkWriter) writer, - committableSerializer, + COMMITTABLE_SERIALIZER, committer); } } @@ -225,7 +220,7 @@ public void addPostCommitTopology(DataStream> committ } private static class TestStatefulSinkV2 extends TestSinkV2WithPostCommitTopology - implements StatefulSink, StatefulSink.WithCompatibleState { + implements StatefulSink, StatefulSink.WithCompatibleState { private String compatibleState; public TestStatefulSinkV2( @@ -243,8 +238,8 @@ public DefaultStatefulSinkWriter createWriter(InitContext context) { } @Override - public StatefulSinkWriter restoreWriter( - InitContext context, Collection recoveredState) { + public StatefulSinkWriter restoreWriter( + InitContext context, Collection recoveredState) { DefaultStatefulSinkWriter statefulWriter = (DefaultStatefulSinkWriter) getWriter(); @@ -253,8 +248,8 @@ public StatefulSinkWriter restoreWriter( } @Override - public SimpleVersionedSerializer getWriterStateSerializer() { - return new StringSerializer(); + public SimpleVersionedSerializer getWriterStateSerializer() { + return WRITER_SERIALIZER; } @Override @@ -272,6 +267,8 @@ public static class DefaultSinkWriter implements SinkWriter, Ser public List watermarks; + public long lastCheckpointId = -1; + public DefaultSinkWriter() { this.elements = new ArrayList<>(); this.watermarks = new ArrayList<>(); @@ -327,15 +324,27 @@ public Collection prepareCommit() { */ public static class DefaultStatefulSinkWriter extends DefaultCommittingSinkWriter - implements StatefulSink.StatefulSinkWriter { + implements StatefulSink.StatefulSinkWriter { + private int recordCount; @Override - public List snapshotState(long checkpointId) throws IOException { - return elements; + public List snapshotState(long checkpointId) throws IOException { + lastCheckpointId = checkpointId; + return Collections.singletonList(recordCount); } - protected void restore(Collection recoveredState) { - this.elements = new ArrayList<>(recoveredState); + @Override + public void write(InputT element, Context context) { + super.write(element, context); + recordCount++; + } + + public int getRecordCount() { + return recordCount; + } + + protected void restore(Collection recoveredState) { + this.recordCount = recoveredState.isEmpty() ? 0 : recoveredState.iterator().next(); } } @@ -411,29 +420,4 @@ public void commit(Collection> committables) { }); } } - - /** - * We introduce this {@link StringSerializer} is because that all the fields of {@link - * TestSinkV2} should be serializable. - */ - public static class StringSerializer - implements SimpleVersionedSerializer, Serializable { - - public static final StringSerializer INSTANCE = new StringSerializer(); - - @Override - public int getVersion() { - return SimpleVersionedStringSerializer.INSTANCE.getVersion(); - } - - @Override - public byte[] serialize(String obj) { - return SimpleVersionedStringSerializer.INSTANCE.serialize(obj); - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - return SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized); - } - } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java index 28e6e068b9f7b..0b56d26e19fdb 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java @@ -491,10 +491,7 @@ private static void addElement(SharedReference> elements, T element) private static TestSink buildRecordWriterTestSink( TestSink.DefaultSinkWriter writer) { - return TestSink.newBuilder() - .setWriter(writer) - .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE) - .build(); + return TestSink.newBuilder().setWriter(writer).build(); } private TableFactoryHarness.SinkBase buildRuntimeSinkProvider( diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java index 2eaa88b045c17..b863046863ae4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java @@ -116,6 +116,7 @@ public void init() { @Test public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); + final FiniteTestSource source = new FiniteTestSource<>(BOTH_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA); @@ -174,6 +175,7 @@ public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exce @Test public void writerAndCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); + final FiniteTestSource source = new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA); @@ -207,14 +209,13 @@ public void writerAndCommitterExecuteInBatchMode() throws Exception { @Test public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception { final StreamExecutionEnvironment env = buildStreamEnv(); + final FiniteTestSource source = new FiniteTestSource<>(GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA); env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO) .sinkTo( TestSink.newBuilder() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) .setGlobalCommitter( (Supplier> & Serializable) () -> GLOBAL_COMMIT_QUEUE) @@ -241,8 +242,6 @@ public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception { env.fromData(SOURCE_DATA) .sinkTo( TestSink.newBuilder() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) .setGlobalCommitter( (Supplier> & Serializable) () -> GLOBAL_COMMIT_QUEUE) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java index e46822b745c17..545446b0c13c9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java @@ -147,7 +147,6 @@ public void testCommitterMetrics() throws Exception { .sinkTo( TestSinkV2.newBuilder() .setCommitter(new MetricCommitter(beforeLatch, afterLatch)) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .build()) .name(TEST_SINK_NAME); JobClient jobClient = env.executeAsync(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java index 402058226b275..1eb845e94a7a3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java @@ -139,9 +139,8 @@ public void testCommitterMetrics() throws Exception { env.fromSequence(0, numCommittables - 1) .returns(BasicTypeInfo.LONG_TYPE_INFO) .sinkTo( - TestSinkV2.newBuilder() - .setCommitter(new MetricCommitter(beforeLatch, afterLatch)) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) + (TestSinkV2.newBuilder() + .setCommitter(new MetricCommitter(beforeLatch, afterLatch))) .build()) .name(TEST_SINK_NAME); JobClient jobClient = env.executeAsync();