diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 5fa601d21..1de6253c3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -36,6 +36,8 @@ public class BatchRecordBuffer {
     private int numOfRecords = 0;
     private int bufferSizeBytes = 0;
     private boolean loadBatchFirstRecord = true;
+    private String database;
+    private String table;
 
     public BatchRecordBuffer(){}
 
@@ -45,6 +47,14 @@ public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
         this.buffer = ByteBuffer.allocate(bufferSize);
     }
 
+    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, int bufferSize) {
+        super();
+        this.database = database;
+        this.table = table;
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
     public void insert(byte[] record) {
         ensureCapacity(record.length);
         if(loadBatchFirstRecord){
@@ -141,4 +151,19 @@ public void setBufferSizeBytes(int bufferSizeBytes) {
         this.bufferSizeBytes = bufferSizeBytes;
     }
 
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
index 2c578d407..37d3973e5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -87,7 +87,6 @@ public DorisBatchSink.Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {
         public DorisBatchSink<IN> build() {
             Preconditions.checkNotNull(dorisOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
-            Preconditions.checkNotNull(serializer);
             if(dorisReadOptions == null) {
                 dorisReadOptions = DorisReadOptions.builder().build();
             }
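
The removed null-check is deliberate: a sink whose input stream already carries RecordWithMeta elements (handled in DorisBatchWriter below) never invokes the serializer. A minimal build sketch under that assumption; the FE address, credentials, and default table identifier are placeholders:

    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.batch.DorisBatchSink;
    import org.apache.doris.flink.sink.batch.RecordWithMeta;

    public class BuildWithoutSerializer {
        public static void main(String[] args) {
            DorisOptions dorisOptions = DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030")              // placeholder FE address
                    .setTableIdentifier("test.test_flink_tmp") // optional default target
                    .setUsername("root")
                    .setPassword("")
                    .build();
            DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                    .setLabelPrefix("label")
                    .build();
            // No setSerializer(...) call: legal after this change, because
            // RecordWithMeta elements bypass serialization entirely.
            DorisBatchSink.Builder<RecordWithMeta> builder = DorisBatchSink.builder();
            builder.setDorisOptions(dorisOptions)
                    .setDorisExecutionOptions(executionOptions);
            DorisBatchSink<RecordWithMeta> sink = builder.build();
        }
    }
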
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index b6a3f651a..6adb436b5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.sink.batch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,8 +30,7 @@
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -44,9 +44,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -77,16 +78,14 @@ public class DorisBatchStreamLoad implements Serializable {
     private String hostPort;
     private final String username;
     private final String password;
-    private final String db;
-    private final String table;
     private final Properties loadProps;
-    private BatchRecordBuffer buffer;
+    private Map<String, BatchRecordBuffer> bufferMap = new ConcurrentHashMap<>();
     private DorisExecutionOptions executionOptions;
     private ExecutorService loadExecutorService;
     private LoadAsyncExecutor loadAsyncExecutor;
-    private BlockingQueue<BatchRecordBuffer> writeQueue;
-    private BlockingQueue<BatchRecordBuffer> readQueue;
+    private BlockingQueue<BatchRecordBuffer> flushQueue;
     private final AtomicBoolean started;
+    private volatile boolean loadThreadAlive = false;
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
     private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();
     private BackendUtil backendUtil;
@@ -99,24 +98,18 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions,
                 dorisOptions.getBenodes()) : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
         this.hostPort = backendUtil.getAvailableBackend();
-        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
-        this.db = tableInfo[0];
-        this.table = tableInfo[1];
         this.username = dorisOptions.getUsername();
         this.password = dorisOptions.getPassword();
-        this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
         this.loadProps = executionOptions.getStreamLoadProp();
         this.labelGenerator = labelGenerator;
         this.lineDelimiter = EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes();
         this.executionOptions = executionOptions;
-        //init queue
-        this.writeQueue = new ArrayBlockingQueue<>(executionOptions.getFlushQueueSize());
-        LOG.info("init RecordBuffer capacity {}, count {}", executionOptions.getBufferFlushMaxBytes(), executionOptions.getFlushQueueSize());
-        for (int index = 0; index < executionOptions.getFlushQueueSize(); index++) {
-            this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter, executionOptions.getBufferFlushMaxBytes()));
+        this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
+        if(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())){
+            String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+            Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table");
+            this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
         }
-        readQueue = new LinkedBlockingDeque<>();
-        this.loadAsyncExecutor= new LoadAsyncExecutor();
+        this.loadAsyncExecutor = new LoadAsyncExecutor();
         this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new DefaultThreadFactory("streamload-executor"), new ThreadPoolExecutor.AbortPolicy());
         this.started = new AtomicBoolean(true);
@@ -128,26 +121,26 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions,
      * @param record
      * @throws IOException
      */
-    public synchronized void writeRecord(byte[] record) throws InterruptedException {
+    public synchronized void writeRecord(String database, String table, byte[] record) throws InterruptedException {
         checkFlushException();
-        if(buffer == null){
-            buffer = takeRecordFromWriteQueue();
-        }
+        String bufferKey = getTableIdentifier(database, table);
+        BatchRecordBuffer buffer = bufferMap.computeIfAbsent(bufferKey, k -> new BatchRecordBuffer(database, table, this.lineDelimiter, executionOptions.getBufferFlushMaxBytes()));
         buffer.insert(record);
         //When it exceeds 80% of the byteSize, flush to avoid triggering bytebuffer expansion
         if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
                 || (executionOptions.getBufferFlushMaxRows() != 0 && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
-            flush(false);
+            flush(bufferKey, false);
         }
     }
 
-    public synchronized void flush(boolean waitUtilDone) throws InterruptedException {
+    public synchronized void flush(String bufferKey, boolean waitUtilDone) throws InterruptedException {
         checkFlushException();
-        if (buffer != null && !buffer.isEmpty()) {
-            buffer.setLabelName(labelGenerator.generateBatchLabel());
-            BatchRecordBuffer tmpBuff = buffer;
-            readQueue.put(tmpBuff);
-            this.buffer = null;
+        if (null == bufferKey) {
+            for (String key : bufferMap.keySet()) {
+                flushBuffer(key);
+            }
+        } else if (bufferMap.containsKey(bufferKey)) {
+            flushBuffer(bufferKey);
         }
 
         if (waitUtilDone) {
@@ -155,20 +148,22 @@ public synchronized void flush(boolean waitUtilDone) throws InterruptedException
         }
     }
 
-    private void putRecordToWriteQueue(BatchRecordBuffer buffer){
-        try {
-            writeQueue.put(buffer);
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Failed to recycle a buffer to queue");
-        }
+    private synchronized void flushBuffer(String bufferKey) {
+        BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+        buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
+        putRecordToFlushQueue(buffer);
+        bufferMap.remove(bufferKey);
     }
 
-    private BatchRecordBuffer takeRecordFromWriteQueue(){
+    private void putRecordToFlushQueue(BatchRecordBuffer buffer){
         checkFlushException();
+        if(!loadThreadAlive){
+            throw new RuntimeException("load thread already exit, write was interrupted");
+        }
         try {
-            return writeQueue.take();
+            flushQueue.put(buffer);
         } catch (InterruptedException e) {
-            throw new RuntimeException("Failed to take a buffer from queue");
+            throw new RuntimeException("Failed to put record buffer to flush queue");
         }
     }
 
@@ -178,31 +173,34 @@ private void checkFlushException() {
         }
     }
 
-    private void waitAsyncLoadFinish() throws InterruptedException {
+    private void waitAsyncLoadFinish() {
         for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){
-            BatchRecordBuffer empty = takeRecordFromWriteQueue();
-            readQueue.put(empty);
+            BatchRecordBuffer empty = new BatchRecordBuffer();
+            putRecordToFlushQueue(empty);
         }
     }
 
+    private String getTableIdentifier(String database, String table) {
+        return database + "." + table;
+    }
+
     public void close(){
         //close async executor
         this.loadExecutorService.shutdown();
         this.started.set(false);
-        //clear buffer
-        this.writeQueue.clear();
-        this.readQueue.clear();
+        this.flushQueue.clear();
     }
 
     class LoadAsyncExecutor implements Runnable {
         @Override
         public void run() {
             LOG.info("LoadAsyncExecutor start");
+            loadThreadAlive = true;
             while (started.get()) {
                 BatchRecordBuffer buffer = null;
                 try {
-                    buffer = readQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
                     if(buffer == null){
                         continue;
                     }
@@ -212,23 +210,20 @@ public void run() {
                 } catch (Exception e) {
                     LOG.error("worker running error", e);
                     exception.set(e);
+                    //clear queue to avoid writer thread blocking
+                    flushQueue.clear();
                     break;
-                } finally {
-                    //Recycle buffer to avoid writer thread blocking
-                    if(buffer != null){
-                        buffer.clear();
-                        putRecordToWriteQueue(buffer);
-                    }
                 }
             }
             LOG.info("LoadAsyncExecutor stop");
+            loadThreadAlive = false;
         }
 
         /**
         * execute stream load
         */
        public void load(String label, BatchRecordBuffer buffer) throws IOException{
-            refreshLoadUrl();
+            refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
             ByteBuffer data = buffer.getData();
             ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
             HttpPutBuilder putBuilder = new HttpPutBuilder();
@@ -266,14 +261,16 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException{
                 }
                 retry++;
                 // get available backend retry
-                refreshLoadUrl();
+                refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
                 putBuilder.setUrl(loadUrl);
             }
+            buffer.clear();
+            buffer = null;
         }
 
-        private void refreshLoadUrl(){
+        private void refreshLoadUrl(String database, String table){
             hostPort = backendUtil.getAvailableBackend();
-            loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+            loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
         }
     }
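
DorisBatchStreamLoad now keys buffers by database.table: writeRecord() creates a BatchRecordBuffer per key on demand, flush(bufferKey, ...) drains one table, and flush(null, ...) drains all of them. A toy standalone sketch of that pattern, not connector code:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class PerTableBuffers {
        private final Map<String, StringBuilder> bufferMap = new ConcurrentHashMap<>();

        public void write(String database, String table, String record) {
            // one buffer per "database.table", created on first write
            bufferMap.computeIfAbsent(database + "." + table, k -> new StringBuilder())
                    .append(record).append('\n');
        }

        public void flush(String bufferKey) {
            if (bufferKey == null) { // null key: flush every table
                bufferMap.keySet().forEach(this::flush);
            } else {
                StringBuilder buffer = bufferMap.remove(bufferKey);
                if (buffer != null) {
                    System.out.println("flushing " + bufferKey + ", " + buffer.length() + " bytes");
                }
            }
        }

        public static void main(String[] args) {
            PerTableBuffers buffers = new PerTableBuffers();
            buffers.write("test", "test_flink_tmp", "a,1");
            buffers.write("test", "test_flink_tmp1", "b,2");
            buffers.flush(null); // drains both tables, like flush(null, waitUtilDone)
        }
    }
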
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index d4621c765..6b2ce0223 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -24,11 +24,14 @@
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -46,12 +49,20 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
     private final DorisRecordSerializer<IN> serializer;
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient volatile Exception flushException = null;
+    private String database;
+    private String table;
 
     public DorisBatchWriter(Sink.InitContext initContext,
                             DorisRecordSerializer<IN> serializer,
                             DorisOptions dorisOptions,
                             DorisReadOptions dorisReadOptions,
                             DorisExecutionOptions executionOptions) {
+        if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+            String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+            Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table");
+            this.database = tableInfo[0];
+            this.table = tableInfo[1];
+        }
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
         this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
         this.labelGenerator = new LabelGenerator(labelPrefix, false);
@@ -72,7 +83,7 @@ public void initializeLoad() throws IOException {
     private void intervalFlush() {
         try {
             LOG.info("interval flush triggered.");
-            batchStreamLoad.flush(false);
+            batchStreamLoad.flush(null, false);
         } catch (InterruptedException e) {
             flushException = e;
         }
@@ -81,18 +92,30 @@
     @Override
     public void write(IN in, Context context) throws IOException, InterruptedException {
         checkFlushException();
+        if(in instanceof RecordWithMeta){
+            RecordWithMeta row = (RecordWithMeta) in;
+            if(StringUtils.isNullOrWhitespaceOnly(row.getTable())
+                    || StringUtils.isNullOrWhitespaceOnly(row.getDatabase())
+                    || row.getRecord() == null){
+                LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
+                return;
+            }
+            batchStreamLoad.writeRecord(row.getDatabase(), row.getTable(), row.getRecord().getBytes(StandardCharsets.UTF_8));
+            return;
+        }
+
         byte[] serialize = serializer.serialize(in);
         if(Objects.isNull(serialize)){
             //ddl record
             return;
         }
-        batchStreamLoad.writeRecord(serialize);
+        batchStreamLoad.writeRecord(database, table, serialize);
     }
 
     @Override
     public void flush(boolean flush) throws IOException, InterruptedException {
         checkFlushException();
         LOG.info("checkpoint flush triggered.");
-        batchStreamLoad.flush(true);
+        batchStreamLoad.flush(null, true);
     }
 
     @Override
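
With this writer change, a RecordWithMeta element routes itself to its own database and table, while any other element still goes through the configured serializer and the sink-level tableIdentifier. A hedged sketch of producing routed records upstream; the "test" database and the "table|payload" input format are assumptions for illustration:

    import org.apache.doris.flink.sink.batch.RecordWithMeta;
    import org.apache.flink.api.common.functions.MapFunction;

    public class RouteByPrefix implements MapFunction<String, RecordWithMeta> {
        @Override
        public RecordWithMeta map(String line) {
            // assumed input "table|payload", e.g. "test_flink_tmp|wangwu,1"
            String[] parts = line.split("\\|", 2);
            // a malformed line yields a null payload, which DorisBatchWriter
            // drops with a warning instead of failing the job
            return new RecordWithMeta("test", parts[0], parts.length > 1 ? parts[1] : null);
        }
    }
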
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
new file mode 100644
index 000000000..7f4d26914
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+public class RecordWithMeta {
+    private String database;
+    private String table;
+    private String record;
+
+    public RecordWithMeta() {
+    }
+
+    public RecordWithMeta(String database, String table, String record) {
+        this.database = database;
+        this.table = table;
+        this.record = record;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getRecord() {
+        return record;
+    }
+
+    public void setRecord(String record) {
+        this.record = record;
+    }
+
+    public String getTableIdentifier() {
+        return this.database + "." + this.table;
+    }
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 235b553d0..55f781154 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -38,4 +38,8 @@ public String generateLabel(long chkId) {
     public String generateBatchLabel() {
         return labelPrefix + "_" + UUID.randomUUID();
     }
+
+    public String generateBatchLabel(String table) {
+        return String.format("%s_%s_%s", labelPrefix, table, UUID.randomUUID());
+    }
 }
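
The new overload bakes the table name into the batch label, so labels stay unique per table even when one writer loads several tables under a shared prefix. For illustration (the UUID varies per call):

    import org.apache.doris.flink.sink.writer.LabelGenerator;

    public class LabelDemo {
        public static void main(String[] args) {
            LabelGenerator generator = new LabelGenerator("label_0", false);
            // prints e.g. label_0_test_flink_tmp_3f1c2e4a-9b7d-4c21-8f5e-0a6b1d2c3e4f
            System.out.println(generator.generateBatchLabel("test_flink_tmp"));
        }
    }
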
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
new file mode 100644
index 000000000..eade29261
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkMultiTableExample {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DorisBatchSink.Builder<RecordWithMeta> builder = DorisBatchSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+
+        readOptionBuilder.setDeserializeArrowAsync(false)
+                .setDeserializeQueueSize(64)
+                .setExecMemLimit(2147483648L)
+                .setRequestQueryTimeoutS(3600)
+                .setRequestBatchSize(1000)
+                .setRequestConnectTimeoutMs(10000)
+                .setRequestReadTimeoutMs(10000)
+                .setRequestRetries(3)
+                .setRequestTabletSize(1024 * 1024);
+
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.test_flink_tmp")
+                .setUsername("root")
+                .setPassword("");
+
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+
+        executionBuilder.setLabelPrefix("label")
+                .setStreamLoadProp(properties)
+                .setDeletable(false)
+                .setBufferFlushMaxBytes(8 * 1024)
+                .setBufferFlushMaxRows(10)
+                .setBufferFlushIntervalMs(1000 * 10);
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setDorisOptions(dorisBuilder.build());
+
+//        RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
+//        RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
+//        DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
+//                Arrays.asList(record, record1));
+//        stringDataStreamSource.sinkTo(builder.build());
+
+        env.addSource(new SourceFunction<RecordWithMeta>() {
+            private Long id = 1000000L;
+            @Override
+            public void run(SourceContext<RecordWithMeta> out) throws Exception {
+                while (true) {
+                    id = id + 1;
+                    RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", UUID.randomUUID() + ",1");
+                    out.collect(record);
+                    record = new RecordWithMeta("test", "test_flink_tmp", UUID.randomUUID() + ",1");
+                    out.collect(record);
+                    Thread.sleep(3000);
+                }
+            }
+
+            @Override
+            public void cancel() {
+
+            }
+        }).sinkTo(builder.build());
+
+        env.execute("doris multi table test");
+    }
+
+}
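
The example above streams unbounded RecordWithMeta records into two tables through one sink. The payload is raw CSV, so it has to line up with the configured column_separator; a minimal check of what one routed record carries:

    import org.apache.doris.flink.sink.batch.RecordWithMeta;
    import java.util.UUID;

    public class PayloadDemo {
        public static void main(String[] args) {
            // "<uuid>,1" -> two CSV fields, matching column_separator ","
            RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp", UUID.randomUUID() + ",1");
            System.out.println(record.getTableIdentifier() + " <- " + record.getRecord());
        }
    }
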