Skip to content

Commit

Permalink
[flink] Replace legacy SinkFunction with v2 Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Nov 23, 2024
1 parent e80d4bc commit cfdc6d6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,66 +23,76 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

import java.net.InetSocketAddress;
import java.util.TreeMap;

import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;

/** Operator for address server to register addresses to {@link ServiceManager}. */
public class QueryAddressRegister extends RichSinkFunction<InternalRow> {

public class QueryAddressRegister implements Sink<InternalRow> {
private final ServiceManager serviceManager;

private transient int numberExecutors;
private transient TreeMap<Integer, InetSocketAddress> executors;

public QueryAddressRegister(Table table) {
this.serviceManager = ((FileStoreTable) table).store().newServiceManager();
}

@Override
public void open(OpenContext openContext) throws Exception {
open(new Configuration());
}

/**
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
*/
public void open(Configuration parameters) throws Exception {
this.executors = new TreeMap<>();
public SinkWriter<InternalRow> createWriter(InitContext context) {
return new QueryAddressRegisterSinkWriter(serviceManager);
}

@Override
public void invoke(InternalRow row, SinkFunction.Context context) {
int numberExecutors = row.getInt(0);
if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) {
throw new IllegalArgumentException(
String.format(
"Number Executors can not be changed! Old %s , New %s .",
this.numberExecutors, numberExecutors));
}
this.numberExecutors = numberExecutors;
public SinkWriter<InternalRow> createWriter(WriterInitContext context) {
return new QueryAddressRegisterSinkWriter(serviceManager);
}

int executorId = row.getInt(1);
String hostname = row.getString(2).toString();
int port = row.getInt(3);
private static class QueryAddressRegisterSinkWriter implements SinkWriter<InternalRow> {
private final ServiceManager serviceManager;

executors.put(executorId, new InetSocketAddress(hostname, port));
private final TreeMap<Integer, InetSocketAddress> executors;

if (executors.size() == numberExecutors) {
serviceManager.resetService(
PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0]));
private int numberExecutors;

private QueryAddressRegisterSinkWriter(ServiceManager serviceManager) {
this.serviceManager = serviceManager;
this.executors = new TreeMap<>();
}
}

@Override
public void close() throws Exception {
super.close();
serviceManager.deleteService(PRIMARY_KEY_LOOKUP);
@Override
public void write(InternalRow row, Context context) {
int numberExecutors = row.getInt(0);
if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) {
throw new IllegalArgumentException(
String.format(
"Number Executors can not be changed! Old %s , New %s .",
this.numberExecutors, numberExecutors));
}
this.numberExecutors = numberExecutors;

int executorId = row.getInt(1);
String hostname = row.getString(2).toString();
int port = row.getInt(3);

executors.put(executorId, new InetSocketAddress(hostname, port));

if (executors.size() == numberExecutors) {
serviceManager.resetService(
PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0]));
}
}

@Override
public void flush(boolean endOfInput) {}

@Override
public void close() {
serviceManager.deleteService(PRIMARY_KEY_LOOKUP);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static void build(StreamExecutionEnvironment env, Table table, int parall
InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
executorOperator)
.setParallelism(parallelism)
.addSink(new QueryAddressRegister(table))
.sinkTo(new QueryAddressRegister(table))
.setParallelism(1);

sink.getTransformation().setMaxParallelism(1);
Expand Down

0 comments on commit cfdc6d6

Please sign in to comment.