add multisink to DorisBatchSink (apache#223)
Support multi-table writing in DorisBatchSink.
Example:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DorisBatchSink.Builder<RecordWithMeta> builder = DorisBatchSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");

DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("127.0.0.1:8030")
        .setTableIdentifier("") // intentionally empty: each RecordWithMeta names its own target
        .setUsername("root")
        .setPassword("");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label")
        .setStreamLoadProp(properties)
        .setDeletable(false)
        .setBufferFlushMaxBytes(8 * 1024)
        .setBufferFlushMaxRows(10)
        .setBufferFlushIntervalMs(1000 * 10);

builder.setDorisReadOptions(readOptionBuilder.build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setDorisOptions(dorisBuilder.build());

// Multi-table writing: records targeting different tables share one sink
RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
DataStreamSource<RecordWithMeta> source = env.fromCollection(Arrays.asList(record, record1));
source.sinkTo(builder.build());

env.execute();
```
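Each RecordWithMeta carries its own target coordinates, which is what lets one sink instance fan records out to several tables. A minimal sketch of the class shape, inferred from the constructor calls above and the getters used in DorisBatchWriter below (the real class ships with the connector):

```java
// Sketch of the RecordWithMeta carrier type, inferred from its use in this
// commit (constructor argument order: database, table, record payload).
public class RecordWithMeta {
    private String database; // target Doris database
    private String table;    // target Doris table
    private String record;   // row payload, already formatted for Stream Load (CSV here)

    public RecordWithMeta(String database, String table, String record) {
        this.database = database;
        this.table = table;
        this.record = record;
    }

    public String getDatabase() { return database; }
    public String getTable() { return table; }
    public String getRecord() { return record; }
}
```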
JNSimba authored Nov 6, 2023
1 parent 9e68239 commit 32fd54e
Showing 7 changed files with 271 additions and 60 deletions.
BatchRecordBuffer.java
@@ -36,6 +36,8 @@ public class BatchRecordBuffer {
private int numOfRecords = 0;
private int bufferSizeBytes = 0;
private boolean loadBatchFirstRecord = true;
private String database;
private String table;

public BatchRecordBuffer(){}

@@ -45,6 +47,14 @@ public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
this.buffer = ByteBuffer.allocate(bufferSize);
}

public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, int bufferSize) {
super();
this.database = database;
this.table = table;
this.lineDelimiter = lineDelimiter;
this.buffer = ByteBuffer.allocate(bufferSize);
}

public void insert(byte[] record) {
ensureCapacity(record.length);
if(loadBatchFirstRecord){
@@ -141,4 +151,19 @@ public void setBufferSizeBytes(int bufferSizeBytes) {
this.bufferSizeBytes = bufferSizeBytes;
}

public String getDatabase() {
return database;
}

public void setDatabase(String database) {
this.database = database;
}

public String getTable() {
return table;
}

public void setTable(String table) {
this.table = table;
}
}
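The new constructor and accessors let each buffer remember which table it belongs to, so the async executor can build the Stream Load URL per buffer. A hedged usage sketch; the delimiter and size values are illustrative, since in the connector they come from DorisExecutionOptions and the stream-load properties:

```java
import java.nio.charset.StandardCharsets;

byte[] lineDelimiter = "\n".getBytes(StandardCharsets.UTF_8);
BatchRecordBuffer buffer =
        new BatchRecordBuffer("test", "test_flink_tmp1", lineDelimiter, 8 * 1024);
buffer.insert("wangwu,1".getBytes(StandardCharsets.UTF_8));

// The async flusher can later recover the target when building the load URL:
String db = buffer.getDatabase();  // "test"
String tbl = buffer.getTable();    // "test_flink_tmp1"
```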
DorisBatchSink.java
@@ -87,7 +87,6 @@ public DorisBatchSink.Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {
public DorisBatchSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
Preconditions.checkNotNull(serializer);
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
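Dropping the Preconditions.checkNotNull(serializer) check is what makes the RecordWithMeta path possible: such a stream carries pre-formatted payloads, so no serializer needs to be configured. A minimal sketch, reusing the option builders from the commit message above:

```java
// Note: no setSerializer(...) call. build() no longer requires one, because
// RecordWithMeta records bypass serialization in DorisBatchWriter.
DorisBatchSink.Builder<RecordWithMeta> builder = DorisBatchSink.builder();
DorisBatchSink<RecordWithMeta> sink = builder
        .setDorisOptions(dorisBuilder.build())
        .setDorisExecutionOptions(executionBuilder.build())
        .build();
```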
DorisBatchStreamLoad.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.batch;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,8 +30,7 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.util.Preconditions;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -44,9 +44,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -77,16 +78,14 @@ public class DorisBatchStreamLoad implements Serializable {
private String hostPort;
private final String username;
private final String password;
private final String db;
private final String table;
private final Properties loadProps;
private BatchRecordBuffer buffer;
private Map<String, BatchRecordBuffer> bufferMap = new ConcurrentHashMap<>();
private DorisExecutionOptions executionOptions;
private ExecutorService loadExecutorService;
private LoadAsyncExecutor loadAsyncExecutor;
private BlockingQueue<BatchRecordBuffer> writeQueue;
private BlockingQueue<BatchRecordBuffer> readQueue;
private BlockingQueue<BatchRecordBuffer> flushQueue;
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();
private BackendUtil backendUtil;
@@ -99,24 +98,18 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions,
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.hostPort = backendUtil.getAvailableBackend();
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.db = tableInfo[0];
this.table = tableInfo[1];
this.username = dorisOptions.getUsername();
this.password = dorisOptions.getPassword();
this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
this.loadProps = executionOptions.getStreamLoadProp();
this.labelGenerator = labelGenerator;
this.lineDelimiter = EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes();
this.executionOptions = executionOptions;
//init queue
this.writeQueue = new ArrayBlockingQueue<>(executionOptions.getFlushQueueSize());
LOG.info("init RecordBuffer capacity {}, count {}", executionOptions.getBufferFlushMaxBytes(), executionOptions.getFlushQueueSize());
for (int index = 0; index < executionOptions.getFlushQueueSize(); index++) {
this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter, executionOptions.getBufferFlushMaxBytes()));
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())){
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table");
this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
}
readQueue = new LinkedBlockingDeque<>();

this.loadAsyncExecutor= new LoadAsyncExecutor();
this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new DefaultThreadFactory("streamload-executor"), new ThreadPoolExecutor.AbortPolicy());
this.started = new AtomicBoolean(true);
@@ -128,47 +121,49 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions,
* @param record
* @throws IOException
*/
public synchronized void writeRecord(byte[] record) throws InterruptedException {
public synchronized void writeRecord(String database, String table, byte[] record) throws InterruptedException {
checkFlushException();
if(buffer == null){
buffer = takeRecordFromWriteQueue();
}
String bufferKey = getTableIdentifier(database, table);
BatchRecordBuffer buffer = bufferMap.computeIfAbsent(bufferKey, k -> new BatchRecordBuffer(database, table, this.lineDelimiter, executionOptions.getBufferFlushMaxBytes()));
buffer.insert(record);
//Flush when the buffer reaches 80% of its byte size, to avoid triggering ByteBuffer expansion
if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
|| (executionOptions.getBufferFlushMaxRows() != 0 && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
flush(false);
flush(bufferKey,false);
}
}

public synchronized void flush(boolean waitUtilDone) throws InterruptedException {
public synchronized void flush(String bufferKey, boolean waitUtilDone) throws InterruptedException {
checkFlushException();
if (buffer != null && !buffer.isEmpty()) {
buffer.setLabelName(labelGenerator.generateBatchLabel());
BatchRecordBuffer tmpBuff = buffer;
readQueue.put(tmpBuff);
this.buffer = null;
if (null == bufferKey) {
for (String key : bufferMap.keySet()) {
flushBuffer(key);
}
} else if (bufferMap.containsKey(bufferKey)) {
flushBuffer(bufferKey);
}

if (waitUtilDone) {
waitAsyncLoadFinish();
}
}

private void putRecordToWriteQueue(BatchRecordBuffer buffer){
try {
writeQueue.put(buffer);
} catch (InterruptedException e) {
throw new RuntimeException("Failed to recycle a buffer to queue");
}
private synchronized void flushBuffer(String bufferKey) {
BatchRecordBuffer buffer = bufferMap.get(bufferKey);
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
putRecordToFlushQueue(buffer);
bufferMap.remove(bufferKey);
}

private BatchRecordBuffer takeRecordFromWriteQueue(){
private void putRecordToFlushQueue(BatchRecordBuffer buffer){
checkFlushException();
if(!loadThreadAlive){
throw new RuntimeException("load thread already exit, write was interrupted");
}
try {
return writeQueue.take();
flushQueue.put(buffer);
} catch (InterruptedException e) {
throw new RuntimeException("Failed to take a buffer from queue");
throw new RuntimeException("Failed to put record buffer to flush queue");
}
}

@@ -178,31 +173,34 @@ private void checkFlushException() {
}
}

private void waitAsyncLoadFinish() throws InterruptedException {
private void waitAsyncLoadFinish() {
for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){
BatchRecordBuffer empty = takeRecordFromWriteQueue();
readQueue.put(empty);
BatchRecordBuffer empty = new BatchRecordBuffer();
putRecordToFlushQueue(empty);
}
}

private String getTableIdentifier(String database, String table) {
return database + "." + table;
}

public void close(){
//close async executor
this.loadExecutorService.shutdown();
this.started.set(false);

//clear buffer
this.writeQueue.clear();
this.readQueue.clear();
this.flushQueue.clear();
}

class LoadAsyncExecutor implements Runnable {
@Override
public void run() {
LOG.info("LoadAsyncExecutor start");
loadThreadAlive = true;
while (started.get()) {
BatchRecordBuffer buffer = null;
try {
buffer = readQueue.poll(2000L, TimeUnit.MILLISECONDS);
buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
if(buffer == null){
continue;
}
@@ -212,23 +210,20 @@ public void run() {
} catch (Exception e) {
LOG.error("worker running error", e);
exception.set(e);
//clear queue to avoid writer thread blocking
flushQueue.clear();
break;
} finally {
//Recycle buffer to avoid writer thread blocking
if(buffer != null){
buffer.clear();
putRecordToWriteQueue(buffer);
}
}
}
LOG.info("LoadAsyncExecutor stop");
loadThreadAlive = false;
}

/**
* execute stream load
*/
public void load(String label, BatchRecordBuffer buffer) throws IOException{
refreshLoadUrl();
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
ByteBuffer data = buffer.getData();
ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
HttpPutBuilder putBuilder = new HttpPutBuilder();
@@ -266,14 +261,16 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException{
}
retry++;
// get available backend retry
refreshLoadUrl();
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
putBuilder.setUrl(loadUrl);
}
buffer.clear();
buffer = null;
}

private void refreshLoadUrl(){
private void refreshLoadUrl(String database, String table){
hostPort = backendUtil.getAvailableBackend();
loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
}
}

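The heart of the change in DorisBatchStreamLoad is replacing the single buffer plus write/read recycle queues with a ConcurrentHashMap of per-destination buffers keyed by "database.table", all feeding one flush queue. A simplified, standalone sketch of that hand-off pattern — no size accounting, labels, retries, or HTTP, and StringBuilder stands in for BatchRecordBuffer:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

// Simplified model of the bufferMap/flushQueue hand-off in DorisBatchStreamLoad.
class MultiTableBuffering {
    private final Map<String, StringBuilder> bufferMap = new ConcurrentHashMap<>();
    private final BlockingQueue<StringBuilder> flushQueue = new LinkedBlockingDeque<>(2);

    synchronized void writeRecord(String database, String table, String record)
            throws InterruptedException {
        String bufferKey = database + "." + table; // same key scheme as the commit
        StringBuilder buffer =
                bufferMap.computeIfAbsent(bufferKey, k -> new StringBuilder());
        buffer.append(record).append('\n');
        if (buffer.length() >= 8 * 1024 * 0.8) {   // flush at ~80% of the target size
            flush(bufferKey);
        }
    }

    synchronized void flush(String bufferKey) throws InterruptedException {
        StringBuilder buffer = bufferMap.remove(bufferKey); // detach, then enqueue
        if (buffer != null && buffer.length() > 0) {
            flushQueue.put(buffer); // the async load executor polls this queue
        }
    }
}
```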
DorisBatchWriter.java
@@ -24,11 +24,14 @@
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -46,12 +49,20 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
private final DorisRecordSerializer<IN> serializer;
private final transient ScheduledExecutorService scheduledExecutorService;
private transient volatile Exception flushException = null;
private String database;
private String table;

public DorisBatchWriter(Sink.InitContext initContext,
DorisRecordSerializer<IN> serializer,
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions) {
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table");
this.database = tableInfo[0];
this.table = tableInfo[1];
}
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
this.labelGenerator = new LabelGenerator(labelPrefix, false);
@@ -72,7 +83,7 @@ public void initializeLoad() throws IOException {
private void intervalFlush() {
try {
LOG.info("interval flush triggered.");
batchStreamLoad.flush(false);
batchStreamLoad.flush(null, false);
} catch (InterruptedException e) {
flushException = e;
}
@@ -81,18 +92,30 @@ private void intervalFlush() {
@Override
public void write(IN in, Context context) throws IOException, InterruptedException {
checkFlushException();
if(in instanceof RecordWithMeta){
RecordWithMeta row = (RecordWithMeta) in;
if(StringUtils.isNullOrWhitespaceOnly(row.getTable())
||StringUtils.isNullOrWhitespaceOnly(row.getDatabase())
||row.getRecord() == null){
LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
return;
}
batchStreamLoad.writeRecord(row.getDatabase(), row.getTable(), row.getRecord().getBytes(StandardCharsets.UTF_8));
return;
}

byte[] serialize = serializer.serialize(in);
if(Objects.isNull(serialize)){
//ddl record
return;
}
batchStreamLoad.writeRecord(serialize);
batchStreamLoad.writeRecord(database, table, serialize);
}
@Override
public void flush(boolean flush) throws IOException, InterruptedException {
checkFlushException();
LOG.info("checkpoint flush triggered.");
batchStreamLoad.flush(true);
batchStreamLoad.flush(null, true);
}

@Override
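For contrast, the single-table path is unchanged: when tableIdentifier is set, DorisBatchWriter splits it once into database and table and routes every serialized record there. A hedged sketch — SimpleStringSerializer is the connector's stock String serializer, but treat the exact class name as an assumption if your version differs:

```java
// Single-table mode: tableIdentifier is parsed by DorisBatchWriter via
// Preconditions.checkState(tableInfo.length == 2, ...).
DorisOptions singleTable = DorisOptions.builder()
        .setFenodes("127.0.0.1:8030")
        .setTableIdentifier("test.test_flink_tmp") // must be "database.table"
        .setUsername("root")
        .setPassword("")
        .build();

DorisBatchSink.Builder<String> sinkBuilder = DorisBatchSink.builder();
sinkBuilder.setDorisOptions(singleTable)
        .setDorisExecutionOptions(DorisExecutionOptions.builder().setLabelPrefix("label").build())
        .setSerializer(new SimpleStringSerializer()); // required for plain String records
```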
(3 more changed files not shown)
