From cfdc6d604f7a87f76d3522bea835c620d3925c8f Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 18 Nov 2024 11:37:02 +0800 Subject: [PATCH] [flink] Replace legacy SinkFunction with v2 Sink --- .../flink/service/QueryAddressRegister.java | 86 +++++++++++-------- .../paimon/flink/service/QueryService.java | 2 +- 2 files changed, 49 insertions(+), 39 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java index a650b5c0a6bed..4ae9436cec669 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java @@ -23,10 +23,9 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import java.net.InetSocketAddress; import java.util.TreeMap; @@ -34,55 +33,66 @@ import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; /** Operator for address server to register addresses to {@link ServiceManager}. */ -public class QueryAddressRegister extends RichSinkFunction { - +public class QueryAddressRegister implements Sink { private final ServiceManager serviceManager; - private transient int numberExecutors; - private transient TreeMap executors; - public QueryAddressRegister(Table table) { this.serviceManager = ((FileStoreTable) table).store().newServiceManager(); } - @Override - public void open(OpenContext openContext) throws Exception { - open(new Configuration()); - } - /** - * Do not annotate with @override here to maintain compatibility with Flink 1.18-. + * Do not annotate with @override here to maintain compatibility with Flink 2.0+. */ - public void open(Configuration parameters) throws Exception { - this.executors = new TreeMap<>(); + public SinkWriter createWriter(InitContext context) { + return new QueryAddressRegisterSinkWriter(serviceManager); } @Override - public void invoke(InternalRow row, SinkFunction.Context context) { - int numberExecutors = row.getInt(0); - if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) { - throw new IllegalArgumentException( - String.format( - "Number Executors can not be changed! Old %s , New %s .", - this.numberExecutors, numberExecutors)); - } - this.numberExecutors = numberExecutors; + public SinkWriter createWriter(WriterInitContext context) { + return new QueryAddressRegisterSinkWriter(serviceManager); + } - int executorId = row.getInt(1); - String hostname = row.getString(2).toString(); - int port = row.getInt(3); + private static class QueryAddressRegisterSinkWriter implements SinkWriter { + private final ServiceManager serviceManager; - executors.put(executorId, new InetSocketAddress(hostname, port)); + private final TreeMap executors; - if (executors.size() == numberExecutors) { - serviceManager.resetService( - PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0])); + private int numberExecutors; + + private QueryAddressRegisterSinkWriter(ServiceManager serviceManager) { + this.serviceManager = serviceManager; + this.executors = new TreeMap<>(); } - } - @Override - public void close() throws Exception { - super.close(); - serviceManager.deleteService(PRIMARY_KEY_LOOKUP); + @Override + public void write(InternalRow row, Context context) { + int numberExecutors = row.getInt(0); + if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) { + throw new IllegalArgumentException( + String.format( + "Number Executors can not be changed! Old %s , New %s .", + this.numberExecutors, numberExecutors)); + } + this.numberExecutors = numberExecutors; + + int executorId = row.getInt(1); + String hostname = row.getString(2).toString(); + int port = row.getInt(3); + + executors.put(executorId, new InetSocketAddress(hostname, port)); + + if (executors.size() == numberExecutors) { + serviceManager.resetService( + PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0])); + } + } + + @Override + public void flush(boolean endOfInput) {} + + @Override + public void close() { + serviceManager.deleteService(PRIMARY_KEY_LOOKUP); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java index bd433fe0f00d0..752d54cff5a0f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java @@ -62,7 +62,7 @@ public static void build(StreamExecutionEnvironment env, Table table, int parall InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()), executorOperator) .setParallelism(parallelism) - .addSink(new QueryAddressRegister(table)) + .sinkTo(new QueryAddressRegister(table)) .setParallelism(1); sink.getTransformation().setMaxParallelism(1);