From 41e592e5f9cc508424ad3dce75c51617ae621d9b Mon Sep 17 00:00:00 2001 From: Michael Koepf <47541996+michaelkoepf@users.noreply.github.com> Date: Mon, 16 Dec 2024 14:33:06 +0100 Subject: [PATCH 1/2] [flink] Upgraded sink connector to new API version - Upgraded sink connector from deprecated RichSinkFunction to new Sink interface - Adapted corresponding unit test cases Issue #132 --- .../fluss/connector/flink/sink/FlinkSink.java | 115 ++++++++++++++++++ .../connector/flink/sink/FlinkTableSink.java | 16 ++- .../AppendSinkWriter.java} | 26 ++-- .../FlinkSinkWriter.java} | 47 ++----- .../UpsertSinkWriter.java} | 17 ++- ...tionTest.java => FlinkSinkWriterTest.java} | 115 +++++++++++++++--- 6 files changed, 256 insertions(+), 80 deletions(-) create mode 100644 fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java rename fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/{AppendSinkFunction.java => writer/AppendSinkWriter.java} (76%) rename fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/{FlinkSinkFunction.java => writer/FlinkSinkWriter.java} (84%) rename fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/{UpsertSinkFunction.java => writer/UpsertSinkWriter.java} (85%) rename fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/{FlinkSinkFunctionTest.java => FlinkSinkWriterTest.java} (53%) diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java new file mode 100644 index 00000000..3c6b6e05 --- /dev/null +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.connector.flink.sink; + +import com.alibaba.fluss.annotation.Internal; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.connector.flink.sink.writer.AppendSinkWriter; +import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter; +import com.alibaba.fluss.connector.flink.sink.writer.UpsertSinkWriter; +import com.alibaba.fluss.metadata.TablePath; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; + +/** Flink sink for Fluss. */ +public class FlinkSink implements Sink { + + private static final long serialVersionUID = 1L; + + private final SinkWriterBuilder builder; + + public FlinkSink(SinkWriterBuilder builder) { + this.builder = builder; + } + + @Deprecated + @Override + public SinkWriter createWriter(InitContext context) throws IOException { + throw new UnsupportedOperationException( + "Not supported. Use FlinkSink#createWriter(WriterInitContext context)"); + } + + @Override + public SinkWriter createWriter(WriterInitContext context) throws IOException { + FlinkSinkWriter flinkSinkWriter = builder.createWriter(); + flinkSinkWriter.initialize(context); + return flinkSinkWriter; + } + + @Internal + interface SinkWriterBuilder extends Serializable { + W createWriter(); + } + + @Internal + static class AppendSinkSinkWriterBuilder implements SinkWriterBuilder { + + private static final long serialVersionUID = 1L; + + private final TablePath tablePath; + private final Configuration flussConfig; + private final RowType tableRowType; + + public AppendSinkSinkWriterBuilder( + TablePath tablePath, Configuration flussConfig, RowType tableRowType) { + this.tablePath = tablePath; + this.flussConfig = flussConfig; + this.tableRowType = tableRowType; + } + + @Override + public AppendSinkWriter createWriter() { + return new AppendSinkWriter(tablePath, flussConfig, tableRowType); + } + } + + @Internal + static class UpsertSinkWriterBuilder implements SinkWriterBuilder { + + private static final long serialVersionUID = 1L; + + private final TablePath tablePath; + private final Configuration flussConfig; + private final RowType tableRowType; + private final @Nullable int[] targetColumnIndexes; + + UpsertSinkWriterBuilder( + TablePath tablePath, + Configuration flussConfig, + RowType tableRowType, + @Nullable int[] targetColumnIndexes) { + this.tablePath = tablePath; + this.flussConfig = flussConfig; + this.tableRowType = tableRowType; + this.targetColumnIndexes = targetColumnIndexes; + } + + @Override + public UpsertSinkWriter createWriter() { + return new UpsertSinkWriter(tablePath, flussConfig, tableRowType, targetColumnIndexes); + } + } +} diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java index 5699aef9..496931df 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.connector.flink.sink; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter; import com.alibaba.fluss.connector.flink.utils.PushdownUtils; import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual; import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion; @@ -28,7 +29,7 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.RowLevelModificationScanContext; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; @@ -144,13 +145,16 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } } - FlinkSinkFunction sinkFunction = - primaryKeyIndexes.length > 0 - ? new UpsertSinkFunction( + FlinkSink.SinkWriterBuilder flinkSinkWriterBuilder = + (primaryKeyIndexes.length > 0) + ? new FlinkSink.UpsertSinkWriterBuilder( tablePath, flussConfig, tableRowType, targetColumnIndexes) - : new AppendSinkFunction(tablePath, flussConfig, tableRowType); + : new FlinkSink.AppendSinkSinkWriterBuilder( + tablePath, flussConfig, tableRowType); - return SinkFunctionProvider.of(sinkFunction); + FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder); + + return SinkV2Provider.of(flinkSink); } private List columns(int[] columnIndexes) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/AppendSinkWriter.java similarity index 76% rename from fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java rename to fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/AppendSinkWriter.java index 5b96fb85..04233870 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/AppendSinkWriter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.alibaba.fluss.connector.flink.sink; +package com.alibaba.fluss.connector.flink.sink.writer; import com.alibaba.fluss.client.table.writer.AppendWriter; import com.alibaba.fluss.config.Configuration; @@ -22,26 +22,25 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import java.io.IOException; import java.util.concurrent.CompletableFuture; -/** An append only sink for fluss log table. */ -class AppendSinkFunction extends FlinkSinkFunction { - - private static final long serialVersionUID = 1L; +/** An append only sink writer for fluss log table. */ +public class AppendSinkWriter extends FlinkSinkWriter { private transient AppendWriter appendWriter; - AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) { + public AppendSinkWriter(TablePath tablePath, Configuration flussConfig, RowType tableRowType) { super(tablePath, flussConfig, tableRowType); } @Override - public void open(org.apache.flink.configuration.Configuration config) { - super.open(config); + public void initialize(WriterInitContext context) { + super.initialize(context); appendWriter = table.getAppendWriter(); LOG.info("Finished opening Fluss {}.", this.getClass().getSimpleName()); } @@ -51,19 +50,14 @@ CompletableFuture writeRow(RowKind rowKind, InternalRow internalRow) { return appendWriter.append(internalRow); } - @Override - void flush() throws IOException { - appendWriter.flush(); - checkAsyncException(); - } - @Override FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter() { return FlinkRowToFlussRowConverter.create(tableRowType); } @Override - public void close() throws Exception { - super.close(); + public void flush(boolean endOfInput) throws IOException { + appendWriter.flush(); + checkAsyncException(); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/FlinkSinkWriter.java similarity index 84% rename from fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java rename to fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/FlinkSinkWriter.java index 171870cc..18354284 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/FlinkSinkWriter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.alibaba.fluss.connector.flink.sink; +package com.alibaba.fluss.connector.flink.sink.writer; import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; @@ -30,14 +30,11 @@ import com.alibaba.fluss.metrics.MetricNames; import com.alibaba.fluss.row.InternalRow; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -48,16 +45,13 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.io.Serializable; import java.util.Collections; import java.util.concurrent.CompletableFuture; -/** Flink's {@link SinkFunction} implementation for Fluss. */ -abstract class FlinkSinkFunction extends RichSinkFunction - implements CheckpointedFunction, Serializable { +/** Base class for Flink {@link SinkWriter} implementations in Fluss. */ +public abstract class FlinkSinkWriter implements SinkWriter { - private static final long serialVersionUID = 1L; - protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkFunction.class); + protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkWriter.class); private final TablePath tablePath; private final Configuration flussConfig; @@ -75,11 +69,11 @@ abstract class FlinkSinkFunction extends RichSinkFunction private transient Counter numRecordsOutErrorsCounter; private volatile Throwable asyncWriterException; - public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) { + public FlinkSinkWriter(TablePath tablePath, Configuration flussConfig, RowType tableRowType) { this(tablePath, flussConfig, tableRowType, null); } - public FlinkSinkFunction( + public FlinkSinkWriter( TablePath tablePath, Configuration flussConfig, RowType tableRowType, @@ -90,14 +84,13 @@ public FlinkSinkFunction( this.tableRowType = tableRowType; } - @Override - public void open(org.apache.flink.configuration.Configuration config) { + public void initialize(WriterInitContext context) { LOG.info( "Opening Fluss {}, database: {} and table: {}", this.getClass().getSimpleName(), tablePath.getDatabaseName(), tablePath.getTableName()); - metricGroup = InternalSinkWriterMetricGroup.wrap(getRuntimeContext().getMetricGroup()); + metricGroup = InternalSinkWriterMetricGroup.wrap(context.metricGroup()); flinkMetricRegistry = new FlinkMetricRegistry( metricGroup, Collections.singleton(MetricNames.WRITER_SEND_LATENCY_MS)); @@ -115,7 +108,7 @@ protected void initMetrics() { } @Override - public void invoke(RowData value, SinkFunction.Context context) throws IOException { + public void write(RowData value, Context context) throws IOException, InterruptedException { checkAsyncException(); InternalRow internalRow = dataConverter.toInternalRow(value); CompletableFuture writeFuture = writeRow(value.getRowKind(), internalRow); @@ -131,19 +124,7 @@ public void invoke(RowData value, SinkFunction.Context context) throws IOExcepti } @Override - public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws IOException { - flush(); - } - - @Override - public void initializeState(FunctionInitializationContext functionInitializationContext) {} - - @Override - public void finish() throws IOException { - flush(); - } - - abstract void flush() throws IOException; + public abstract void flush(boolean endOfInput) throws IOException, InterruptedException; abstract FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter(); @@ -151,8 +132,6 @@ public void finish() throws IOException { @Override public void close() throws Exception { - super.close(); - try { if (table != null) { table.close(); @@ -193,7 +172,7 @@ public void close() throws Exception { private void sanityCheck(TableDescriptor flussTableDescriptor) { // when it's UpsertSinkFunction, it means it has primary key got from Flink's metadata - boolean hasPrimaryKey = this instanceof UpsertSinkFunction; + boolean hasPrimaryKey = this instanceof UpsertSinkWriter; if (flussTableDescriptor.hasPrimaryKey() != hasPrimaryKey) { throw new ValidationException( String.format( diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/UpsertSinkFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/UpsertSinkWriter.java similarity index 85% rename from fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/UpsertSinkFunction.java rename to fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/UpsertSinkWriter.java index 6f878af6..bb4fce52 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/UpsertSinkFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/UpsertSinkWriter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.alibaba.fluss.connector.flink.sink; +package com.alibaba.fluss.connector.flink.sink.writer; import com.alibaba.fluss.client.table.writer.UpsertWrite; import com.alibaba.fluss.client.table.writer.UpsertWriter; @@ -23,6 +23,7 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; @@ -31,14 +32,12 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; -/** A upsert sink for fluss primary key table. */ -class UpsertSinkFunction extends FlinkSinkFunction { - - private static final long serialVersionUID = 1L; +/** An upsert sink writer or fluss primary key table. */ +public class UpsertSinkWriter extends FlinkSinkWriter { private transient UpsertWriter upsertWriter; - UpsertSinkFunction( + public UpsertSinkWriter( TablePath tablePath, Configuration flussConfig, RowType tableRowType, @@ -47,8 +46,8 @@ class UpsertSinkFunction extends FlinkSinkFunction { } @Override - public void open(org.apache.flink.configuration.Configuration config) { - super.open(config); + public void initialize(WriterInitContext context) { + super.initialize(context); UpsertWrite upsertOptions = new UpsertWrite(); if (targetColumnIndexes != null) { upsertOptions = upsertOptions.withPartialUpdate(targetColumnIndexes); @@ -75,7 +74,7 @@ FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter() { } @Override - void flush() throws IOException { + public void flush(boolean endOfInput) throws IOException, InterruptedException { upsertWriter.flush(); checkAsyncException(); } diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunctionTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkWriterTest.java similarity index 53% rename from fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunctionTest.java rename to fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkWriterTest.java index 09428a7d..f5bf513f 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunctionTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkWriterTest.java @@ -18,32 +18,46 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.connector.flink.sink.writer.AppendSinkWriter; +import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter; import com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.UserCodeClassLoader; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; +import java.util.OptionalLong; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; -/** Test for {@link FlinkSinkFunction}. */ -public class FlinkSinkFunctionTest extends FlinkTestBase { +/** Test for {@link com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter}. */ +public class FlinkSinkWriterTest extends FlinkTestBase { @ParameterizedTest @ValueSource(strings = {"", "1"}) @@ -61,13 +75,15 @@ void testSinkMetrics(String clientId) throws Exception { createTable(tablePath, tableDescriptor); Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); flussConf.set(ConfigOptions.CLIENT_ID, clientId); + RowType rowType = new RowType( true, Arrays.asList( new RowType.RowField("id", DataTypes.INT().getLogicalType()), new RowType.RowField("name", DataTypes.STRING().getLogicalType()))); - FlinkSinkFunction flinkSinkFunction = new AppendSinkFunction(tablePath, flussConf, rowType); + FlinkSinkWriter flinkSinkFunction = new AppendSinkWriter(tablePath, flussConf, rowType); + InterceptingOperatorMetricGroup interceptingOperatorMetricGroup = new InterceptingOperatorMetricGroup(); MockStreamingRuntimeContext mockStreamingRuntimeContext = @@ -77,11 +93,13 @@ public OperatorMetricGroup getMetricGroup() { return interceptingOperatorMetricGroup; } }; - flinkSinkFunction.setRuntimeContext(mockStreamingRuntimeContext); - flinkSinkFunction.open(new org.apache.flink.configuration.Configuration()); - flinkSinkFunction.invoke( - GenericRowData.of(1, StringData.fromString("a")), new MockSinkContext()); - flinkSinkFunction.flush(); + MockWriterInitContext mockWriterInitContext = + new MockWriterInitContext(mockStreamingRuntimeContext); + + flinkSinkFunction.initialize(mockWriterInitContext); + flinkSinkFunction.write( + GenericRowData.of(1, StringData.fromString("a")), new MockSinkWriterContext()); + flinkSinkFunction.flush(false); Metric currentSendTime = interceptingOperatorMetricGroup.get(MetricNames.CURRENT_SEND_TIME); assertThat(currentSendTime).isInstanceOf(Gauge.class); @@ -94,12 +112,7 @@ public OperatorMetricGroup getMetricGroup() { flinkSinkFunction.close(); } - static class MockSinkContext implements SinkFunction.Context { - @Override - public long currentProcessingTime() { - return 0; - } - + static class MockSinkWriterContext implements SinkWriter.Context { @Override public long currentWatermark() { return 0; @@ -110,4 +123,76 @@ public Long timestamp() { return 0L; } } + + static class MockWriterInitContext implements WriterInitContext { + + private static final String UNEXPECTED_METHOD_CALL_MESSAGE = + "Unexpected method call. Expected that this method will never be called by a test case."; + + private final MockStreamingRuntimeContext mockStreamingRuntimeContext; + + public MockWriterInitContext(MockStreamingRuntimeContext mockStreamingRuntimeContext) { + this.mockStreamingRuntimeContext = mockStreamingRuntimeContext; + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return null; + } + + @Override + public MailboxExecutor getMailboxExecutor() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return null; + } + + @Override + public ProcessingTimeService getProcessingTimeService() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return null; + } + + @Override + public SinkWriterMetricGroup metricGroup() { + return InternalSinkWriterMetricGroup.wrap(mockStreamingRuntimeContext.getMetricGroup()); + } + + @Override + public SerializationSchema.InitializationContext + asSerializationSchemaInitializationContext() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return null; + } + + @Override + public boolean isObjectReuseEnabled() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return false; + } + + @Override + public TypeSerializer createInputSerializer() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return null; + } + + @Override + public OptionalLong getRestoredCheckpointId() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return OptionalLong.empty(); + } + + @Override + public JobInfo getJobInfo() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return null; + } + + @Override + public TaskInfo getTaskInfo() { + fail(UNEXPECTED_METHOD_CALL_MESSAGE); + return null; + } + } } From 6ea1b85b587c176b48750e9b9a4917d6f4cea340 Mon Sep 17 00:00:00 2001 From: Michael Koepf <47541996+michaelkoepf@users.noreply.github.com> Date: Fri, 20 Dec 2024 12:40:27 +0100 Subject: [PATCH 2/2] [flink] Upgraded sink connector to new API version - Adapted access modifiers - Consistent naming Issue #132 --- .../com/alibaba/fluss/connector/flink/sink/FlinkSink.java | 8 ++++---- .../fluss/connector/flink/sink/FlinkTableSink.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java index 3c6b6e05..d795c0a1 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java @@ -35,13 +35,13 @@ import java.io.Serializable; /** Flink sink for Fluss. */ -public class FlinkSink implements Sink { +class FlinkSink implements Sink { private static final long serialVersionUID = 1L; private final SinkWriterBuilder builder; - public FlinkSink(SinkWriterBuilder builder) { + FlinkSink(SinkWriterBuilder builder) { this.builder = builder; } @@ -65,7 +65,7 @@ interface SinkWriterBuilder extends Serializable { } @Internal - static class AppendSinkSinkWriterBuilder implements SinkWriterBuilder { + static class AppendSinkWriterBuilder implements SinkWriterBuilder { private static final long serialVersionUID = 1L; @@ -73,7 +73,7 @@ static class AppendSinkSinkWriterBuilder implements SinkWriterBuilder 0) ? new FlinkSink.UpsertSinkWriterBuilder( tablePath, flussConfig, tableRowType, targetColumnIndexes) - : new FlinkSink.AppendSinkSinkWriterBuilder( + : new FlinkSink.AppendSinkWriterBuilder( tablePath, flussConfig, tableRowType); FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder);