From 0b2a14a93d3b65115c25fe59d84d6b3375da517d Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Mon, 6 Nov 2023 16:56:04 +0800
Subject: [PATCH] [improve] add multi table sink to DorisSink (#224)

DorisSink now supports loading into multiple tables from a single sink. Example:

```java
Configuration config = new Configuration();
// config.setString("execution.savepoint.path","/tmp/checkpoint/chk-6");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000)));
env.enableCheckpointing(10000);

DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");

DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("127.0.0.1:8030")
        .setTableIdentifier("")
        .setUsername("root")
        .setPassword("");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("xxx12")
        .setStreamLoadProp(properties)
        .setDeletable(false).enable2PC();

builder.setDorisReadOptions(readOptionBuilder.build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setDorisOptions(dorisBuilder.build());

RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
        Arrays.asList(record, record1));
stringDataStreamSource.sinkTo(builder.build());
```

For details, see `org.apache.doris.flink.DorisSinkStreamMultiTableExample` in the test sources.
---
 .../apache/doris/flink/cfg/DorisOptions.java  |   4 +
 .../apache/doris/flink/sink/BackendUtil.java  |  12 +
 .../apache/doris/flink/sink/DorisSink.java    |   1 -
 .../flink/sink/writer/DorisStreamLoad.java    |  12 +-
 .../doris/flink/sink/writer/DorisWriter.java  | 211 +++++++++++++-----
 .../flink/sink/writer/DorisWriterState.java   |  32 ++-
 .../writer/DorisWriterStateSerializer.java    |  17 +-
 .../flink/sink/writer/LabelGenerator.java     |  19 +-
 .../DorisSinkStreamMultiTableExample.java     | 100 +++++++++
 .../apache/doris/flink/sink/HttpTestUtil.java |  41 ++++
 .../sink/writer/TestDorisStreamLoad.java      |  12 +-
 .../flink/sink/writer/TestDorisWriter.java    |  12 +-
 .../TestDorisWriterStateSerializer.java       |   4 +-
 13 files changed, 394 insertions(+), 83 deletions(-)
 create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index f560eae3e..6391e9142 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -52,6 +52,10 @@ public String getTableIdentifier() { return tableIdentifier; } + public void setTableIdentifier(String tableIdentifier) { + this.tableIdentifier = tableIdentifier; + } + public String save() throws IllegalArgumentException { Properties copy = new
Properties(); return IOUtils.propsToString(copy); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 0d45e2f28..954bdd07b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -17,7 +17,11 @@ package org.apache.doris.flink.sink; +import org.apache.commons.lang3.StringUtils; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; +import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2; @@ -62,6 +66,14 @@ private List initBackends(String beNodes) { return backends; } + public static BackendUtil getInstance(DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger){ + if(StringUtils.isNotEmpty(dorisOptions.getBenodes())){ + return new BackendUtil(dorisOptions.getBenodes()); + } else { + return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger)); + } + } + public String getAvailableBackend() { long tmp = pos + backends.size(); while (pos < tmp) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index bc2d45c86..3adcd9ee1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -140,7 +140,6 @@ public Builder setSerializer(DorisRecordSerializer serializer) { public DorisSink build() { Preconditions.checkNotNull(dorisOptions); Preconditions.checkNotNull(dorisExecutionOptions); - Preconditions.checkNotNull(serializer); if(dorisReadOptions == null) { dorisReadOptions = DorisReadOptions.builder().build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 4fd9abfa9..fb904ef72 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -114,6 +114,10 @@ public String getDb() { return db; } + public String getTable() { + return table; + } + public String getHostPort() { return hostPort; } @@ -141,7 +145,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { try { // TODO: According to label abort txn. Currently, it can only be aborted based on txnid, // so we must first request a streamload based on the label to get the txnid. 
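As the change above shows, `abortPreCommit` (and the rest of the writer) now works with table-scoped labels produced by the new `LabelGenerator.generateTableLabel` introduced later in this patch. With 2PC enabled they take the shape `{labelPrefix}_{db_table}_{subtaskId}_{checkpointId}`; a random UUID suffix is appended when 2PC is disabled. A small illustrative sketch (the wrapper class is only for demonstration):

```java
import org.apache.doris.flink.sink.writer.LabelGenerator;

public class TableLabelSketch {
    public static void main(String[] args) {
        // Dots in the table identifier are replaced with '_' because
        // stream load labels must not contain '.'.
        LabelGenerator generator = new LabelGenerator("test001", true, "db.table", 0);
        // With 2PC enabled this prints "test001_db_table_0_1".
        System.out.println(generator.generateTableLabel(1));
    }
}
```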
- String label = labelGenerator.generateLabel(startChkID); + String label = labelGenerator.generateTableLabel(startChkID); HttpPutBuilder builder = new HttpPutBuilder(); builder.setUrl(loadUrlStr) .baseAuth(user, passwd) @@ -215,7 +219,7 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw public RespContent stopLoad(String label) throws IOException{ recordStream.endInput(); - LOG.info("stream load stopped for {} on host {}", label, hostPort); + LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort); Preconditions.checkState(pendingLoadFuture != null); try { return handlePreCommitResponse(pendingLoadFuture.get()); @@ -233,7 +237,7 @@ public void startLoad(String label, boolean isResume) throws IOException { loadBatchFirstRecord = !isResume; HttpPutBuilder putBuilder = new HttpPutBuilder(); recordStream.startInput(isResume); - LOG.info("stream load started for {} on host {}", label, hostPort); + LOG.info("table {} stream load started for {} on host {}", table, label, hostPort); try { InputStreamEntity entity = new InputStreamEntity(recordStream); putBuilder.setUrl(loadUrlStr) @@ -247,7 +251,7 @@ public void startLoad(String label, boolean isResume) throws IOException { putBuilder.enable2PC(); } pendingLoadFuture = executorService.submit(() -> { - LOG.info("start execute load"); + LOG.info("table {} start execute load", table); return httpClient.execute(putBuilder.build()); }); } catch (Exception e) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index b61d174bf..8550a214e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -23,28 +23,31 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.exception.StreamLoadException; -import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpUtil; +import org.apache.doris.flink.sink.batch.RecordWithMeta; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -62,21 +65,21 @@ public class DorisWriter implements StatefulSink.StatefulSinkWriter DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final long 
lastCheckpointId; private long curCheckpointId; - private DorisStreamLoad dorisStreamLoad; - volatile boolean loading; + private Map dorisStreamLoadMap = new ConcurrentHashMap<>(); + private Map labelGeneratorMap = new ConcurrentHashMap<>();; + volatile boolean globalLoading; + private Map loadingMap = new ConcurrentHashMap<>(); private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; private final DorisExecutionOptions executionOptions; private final String labelPrefix; - private final LabelGenerator labelGenerator; + private final int subtaskId; private final int intervalTime; - private final DorisWriterState dorisWriterState; private final DorisRecordSerializer serializer; private final transient ScheduledExecutorService scheduledExecutorService; private transient Thread executorThread; private transient volatile Exception loadException = null; private BackendUtil backendUtil; - private String currentLabel; public DorisWriter(Sink.InitContext initContext, Collection state, @@ -91,106 +94,188 @@ public DorisWriter(Sink.InitContext initContext, this.curCheckpointId = lastCheckpointId + 1; LOG.info("restore checkpointId {}", lastCheckpointId); LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); - this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix()); - this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId(); - this.labelGenerator = new LabelGenerator(labelPrefix, executionOptions.enabled2PC()); + this.labelPrefix = executionOptions.getLabelPrefix(); + this.subtaskId = initContext.getSubtaskId(); this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check")); this.serializer = serializer; this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; this.executionOptions = executionOptions; this.intervalTime = executionOptions.checkInterval(); - this.loading = false; + this.globalLoading = false; initializeLoad(state); } public void initializeLoad(Collection state) { - this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil( - dorisOptions.getBenodes()) - : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); + this.backendUtil = BackendUtil.getInstance(dorisOptions, dorisReadOptions, LOG); try { - this.dorisStreamLoad = new DorisStreamLoad( - backendUtil.getAvailableBackend(), - dorisOptions, - executionOptions, - labelGenerator, new HttpUtil().getHttpClient()); - // TODO: we need check and abort all pending transaction. - // Discard transactions that may cause the job to fail. if(executionOptions.enabled2PC()) { - dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); + abortLingeringTransactions(state); } } catch (Exception e) { + LOG.error("Failed to abort transaction.", e); throw new DorisRuntimeException(e); } // get main work thread. executorThread = Thread.currentThread(); - this.currentLabel = labelGenerator.generateLabel(curCheckpointId); // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. 
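With one `DorisStreamLoad` and one `LabelGenerator` cached per destination table, the writer accepts two kinds of input: ordinary records serialized by the configured `DorisRecordSerializer` against the single table in `DorisOptions`, and `RecordWithMeta` rows that carry their own database and table, as the `write()`/`serializeRecord()` methods below show. A hedged illustration of the multi-table path (the wrapper class is illustrative; the CSV payload matches the `column_separator` property from the example above):

```java
import org.apache.doris.flink.sink.batch.RecordWithMeta;

public class MultiTableRecordSketch {
    public static void main(String[] args) {
        // A CSV row routed to database "test", table "test_flink_a".
        RecordWithMeta row = new RecordWithMeta("test", "test_flink_a", "wangwu,1");
        // In DorisWriter.serializeRecord() this becomes the table key "test.test_flink_a"
        // plus the record's UTF-8 bytes; the key selects (or lazily creates) the
        // DorisStreamLoad instance that the bytes are written to.
        System.out.println(row.getDatabase() + "." + row.getTable() + " -> " + row.getRecord());
    }
}
```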
scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); } + private void abortLingeringTransactions(Collection recoveredStates) throws Exception { + List alreadyAborts = new ArrayList<>(); + //abort label in state + for(DorisWriterState state : recoveredStates){ + // Todo: When the sink parallelism is reduced, + // the txn of the redundant task before aborting is also needed. + if(!state.getLabelPrefix().equals(labelPrefix)){ + LOG.warn("Label prefix from previous execution {} has changed to {}.", state.getLabelPrefix(), executionOptions.getLabelPrefix()); + } + String key = state.getDatabase() + "." + state.getTable(); + DorisStreamLoad streamLoader = getStreamLoader(key); + streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); + alreadyAborts.add(state.getLabelPrefix()); + } + + // TODO: In a multi-table scenario, if do not restore from checkpoint, + // when modify labelPrefix at startup, we cannot abort the previous label. + if(!alreadyAborts.contains(labelPrefix) + && StringUtils.isNotEmpty(dorisOptions.getTableIdentifier()) + && StringUtils.isNotEmpty(labelPrefix)){ + //abort current labelPrefix + DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier()); + streamLoader.abortPreCommit(labelPrefix, curCheckpointId); + } + } + @Override public void write(IN in, Context context) throws IOException { checkLoadException(); - byte[] serialize = serializer.serialize(in); - if(Objects.isNull(serialize)){ - //ddl record + Tuple2 rowTuple = serializeRecord(in); + String tableKey = rowTuple.f0; + byte[] serializeRow = rowTuple.f1; + if(serializeRow == null){ return; } - if(!loading) { + + DorisStreamLoad streamLoader = getStreamLoader(tableKey); + if(!loadingMap.containsKey(tableKey)) { // start stream load only when there has data - dorisStreamLoad.startLoad(currentLabel, false); - loading = true; + LabelGenerator labelGenerator = getLabelGenerator(tableKey); + String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); + streamLoader.startLoad(currentLabel, false); + loadingMap.put(tableKey, true); + globalLoading = true; + } + streamLoader.writeRecord(serializeRow); + } + + private Tuple2 serializeRecord(IN in) throws IOException { + String tableKey = dorisOptions.getTableIdentifier(); + byte[] serializeRow = null; + if(serializer != null) { + serializeRow = serializer.serialize(in); + if(Objects.isNull(serializeRow)){ + //ddl record by JsonDebeziumSchemaSerializer + return Tuple2.of(tableKey, null); + } } - dorisStreamLoad.writeRecord(serialize); + //multi table load + if(in instanceof RecordWithMeta){ + RecordWithMeta row = (RecordWithMeta) in; + if(StringUtils.isBlank(row.getTable()) + || StringUtils.isBlank(row.getDatabase()) + || row.getRecord() == null){ + LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord()); + return Tuple2.of(tableKey, null); + } + tableKey = row.getDatabase() + "." 
+ row.getTable(); + serializeRow = row.getRecord().getBytes(StandardCharsets.UTF_8); + } + return Tuple2.of(tableKey, serializeRow); } @Override public void flush(boolean flush) throws IOException, InterruptedException { - + // Nothing to do here; everything happens in prepareCommit() } - @Override public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException { - if(!loading){ - //There is no data during the entire checkpoint period + // Verify whether any data was written during this checkpoint + if(!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)){ return Collections.emptyList(); } // disable exception checker before stop load. - loading = false; - Preconditions.checkState(dorisStreamLoad != null); - RespContent respContent = dorisStreamLoad.stopLoad(currentLabel); - if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()); - throw new DorisRuntimeException(errMsg); - } - if (!executionOptions.enabled2PC()) { - return Collections.emptyList(); + globalLoading = false; + // clear the per-table loading flags + loadingMap.clear(); + + // submit the stream load http request for each table + List<DorisCommittable> committableList = new ArrayList<>(); + for(Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()){ + String tableIdentifier = streamLoader.getKey(); + DorisStreamLoad dorisStreamLoad = streamLoader.getValue(); + LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); + String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); + RespContent respContent = dorisStreamLoad.stopLoad(currentLabel); + if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { + String errMsg = String.format("table %s stream load error: %s, see more in %s", tableIdentifier, respContent.getMessage(), respContent.getErrorURL()); + throw new DorisRuntimeException(errMsg); + } + if(executionOptions.enabled2PC()){ + long txnId = respContent.getTxnId(); + committableList.add(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); + } } - long txnId = respContent.getTxnId(); - return Collections.singletonList(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); + return committableList; } @Override public List<DorisWriterState> snapshotState(long checkpointId) throws IOException { - Preconditions.checkState(dorisStreamLoad != null); - // dynamic refresh BE node - this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + List<DorisWriterState> writerStates = new ArrayList<>(); + for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){ + // dynamically refresh the backend node + dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + DorisWriterState writerState = new DorisWriterState(labelPrefix, dorisStreamLoad.getDb(), dorisStreamLoad.getTable(), subtaskId); + writerStates.add(writerState); + } this.curCheckpointId = checkpointId + 1; - this.currentLabel = labelGenerator.generateLabel(curCheckpointId); - return Collections.singletonList(dorisWriterState); + return writerStates; + } + + private LabelGenerator getLabelGenerator(String tableKey){ + return labelGeneratorMap.computeIfAbsent(tableKey, v -> new LabelGenerator(labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId)); + } + + private DorisStreamLoad getStreamLoader(String tableKey){ + LabelGenerator labelGenerator = getLabelGenerator(tableKey); + dorisOptions.setTableIdentifier(tableKey); + return dorisStreamLoadMap.computeIfAbsent(tableKey, v -> new
DorisStreamLoad(backendUtil.getAvailableBackend(), + dorisOptions, + executionOptions, + labelGenerator, + new HttpUtil().getHttpClient())); } + /** + * Check the streamload http request regularly + */ private void checkDone() { + for(Map.Entry streamLoadMap : dorisStreamLoadMap.entrySet()){ + checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue()); + } + } + + private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad){ // the load future is done and checked in prepareCommit(). // this will check error while loading. LOG.debug("start timer checker, interval {} ms", intervalTime); if (dorisStreamLoad.getPendingLoadFuture() != null && dorisStreamLoad.getPendingLoadFuture().isDone()) { - if (!loading) { - LOG.debug("not loading, skip timer checker"); + if (!globalLoading || !loadingMap.get(tableIdentifier)) { + LOG.debug("not loading, skip timer checker for table {}", tableIdentifier); return; } @@ -202,13 +287,14 @@ private void checkDone() { // use send cached data to new txn, then notify to restart the stream if (executionOptions.isUseCache()) { try { - this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); if (executionOptions.enabled2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); } // start a new txn(stream load) - LOG.info("getting exception, breakpoint resume for checkpoint ID: {}", curCheckpointId); - dorisStreamLoad.startLoad(labelGenerator.generateLabel(curCheckpointId), true); + LOG.info("getting exception, breakpoint resume for checkpoint ID: {}, table {}", curCheckpointId, tableIdentifier); + LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); + dorisStreamLoad.startLoad(labelGenerator.generateTableLabel(curCheckpointId), true); } catch (Exception e) { throw new DorisRuntimeException(e); } @@ -222,7 +308,7 @@ private void checkDone() { } loadException = new StreamLoadException(errorMsg); - LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg); + LOG.error("table {} stream load finished unexpectedly, interrupt worker thread! {}", tableIdentifier, errorMsg); // set the executor thread interrupted in case blocking in write data. 
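The `backendUtil` used in the retry path above comes from the new `BackendUtil.getInstance` factory added earlier in this patch: it uses the BE nodes configured in `DorisOptions` when present and otherwise queries the FE through `RestService`. A minimal usage sketch against a reachable Doris cluster (the wrapper class, logger, and table identifier are illustrative):

```java
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.BackendUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackendUtilSketch {
    private static final Logger LOG = LoggerFactory.getLogger(BackendUtilSketch.class);

    public static void main(String[] args) {
        DorisOptions options = DorisOptions.builder()
                .setFenodes("127.0.0.1:8030")
                .setTableIdentifier("test.test_flink_a")
                .setUsername("root")
                .setPassword("")
                .build();
        DorisReadOptions readOptions = DorisReadOptions.builder().build();
        // Uses options.getBenodes() when set; otherwise falls back to RestService.getBackendsV2().
        BackendUtil backendUtil = BackendUtil.getInstance(options, readOptions, LOG);
        LOG.info("selected backend {}", backendUtil.getAvailableBackend());
    }
}
```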
executorThread.interrupt(); } @@ -238,21 +324,24 @@ private void checkLoadException() { @VisibleForTesting public boolean isLoading() { - return this.loading; + return this.globalLoading; } @VisibleForTesting - public void setDorisStreamLoad(DorisStreamLoad streamLoad) { - this.dorisStreamLoad = streamLoad; + public void setDorisStreamLoadMap(Map streamLoadMap) { + this.dorisStreamLoadMap = streamLoadMap; } @Override public void close() throws Exception { + LOG.info("Close DorisWriter."); if (scheduledExecutorService != null) { scheduledExecutorService.shutdownNow(); } - if (dorisStreamLoad != null) { - dorisStreamLoad.close(); + if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) { + for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){ + dorisStreamLoad.close(); + } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java index 1ec9727ec..a08e58ddb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java @@ -24,14 +24,36 @@ */ public class DorisWriterState { String labelPrefix; + String database; + String table; + int subtaskId; public DorisWriterState(String labelPrefix) { this.labelPrefix = labelPrefix; } + public DorisWriterState(String labelPrefix, String database, String table, int subtaskId) { + this.labelPrefix = labelPrefix; + this.database = database; + this.table = table; + this.subtaskId = subtaskId; + } + public String getLabelPrefix() { return labelPrefix; } + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public int getSubtaskId() { + return subtaskId; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -41,18 +63,24 @@ public boolean equals(Object o) { return false; } DorisWriterState that = (DorisWriterState) o; - return Objects.equals(labelPrefix, that.labelPrefix); + return Objects.equals(labelPrefix, that.labelPrefix) + && Objects.equals(database, that.database) + && Objects.equals(table, that.table) + && Objects.equals(subtaskId, that.subtaskId); } @Override public int hashCode() { - return Objects.hash(labelPrefix); + return Objects.hash(labelPrefix, database, table, subtaskId); } @Override public String toString() { return "DorisWriterState{" + "labelPrefix='" + labelPrefix + '\'' + + ", database='" + database + '\'' + + ", table='" + table + '\'' + + ", subtaskId=" + subtaskId + '}'; } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java index aed97b259..97f5cc131 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java @@ -30,9 +30,10 @@ */ public class DorisWriterStateSerializer implements SimpleVersionedSerializer { + private static final int VERSION = 2; @Override public int getVersion() { - return 1; + return VERSION; } @Override @@ -40,6 +41,9 @@ public byte[] serialize(DorisWriterState dorisWriterState) throws IOException { try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new 
DataOutputStream(baos)) { out.writeUTF(dorisWriterState.getLabelPrefix()); + out.writeUTF(dorisWriterState.getDatabase()); + out.writeUTF(dorisWriterState.getTable()); + out.writeInt(dorisWriterState.getSubtaskId()); out.flush(); return baos.toByteArray(); } @@ -49,8 +53,15 @@ public byte[] serialize(DorisWriterState dorisWriterState) throws IOException { public DorisWriterState deserialize(int version, byte[] serialized) throws IOException { try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); final DataInputStream in = new DataInputStream(bais)) { - final String labelPrefix = in.readUTF(); - return new DorisWriterState(labelPrefix); + String labelPrefix = in.readUTF(); + if(version == 1){ + return new DorisWriterState(labelPrefix); + }else { + final String database = in.readUTF(); + final String table = in.readUTF(); + final int subtaskId = in.readInt(); + return new DorisWriterState(labelPrefix, database, table, subtaskId); + } } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java index 55f781154..60f5595e6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java @@ -16,6 +16,8 @@ // under the License. package org.apache.doris.flink.sink.writer; +import org.apache.flink.util.Preconditions; + import java.util.UUID; /** @@ -24,14 +26,29 @@ public class LabelGenerator { private String labelPrefix; private boolean enable2PC; + private String tableIdentifier; + private int subtaskId; public LabelGenerator(String labelPrefix, boolean enable2PC) { this.labelPrefix = labelPrefix; this.enable2PC = enable2PC; } + public LabelGenerator(String labelPrefix, boolean enable2PC, String tableIdentifier, int subtaskId) { + this(labelPrefix, enable2PC); + // The label of stream load can not contain `.` + this.tableIdentifier = tableIdentifier.replace(".", "_"); + this.subtaskId = subtaskId; + } + public String generateLabel(long chkId) { - String label = labelPrefix + "_" + chkId; + String label = String.format("%s_%s_%s", labelPrefix, subtaskId, chkId); + return enable2PC ? label : label + "_" + UUID.randomUUID(); + } + + public String generateTableLabel(long chkId) { + Preconditions.checkState(tableIdentifier != null); + String label = String.format("%s_%s_%s_%s", labelPrefix, tableIdentifier, subtaskId, chkId); return enable2PC ? label : label + "_" + UUID.randomUUID(); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java new file mode 100644 index 000000000..a884ea213 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink; + +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.batch.RecordWithMeta; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; + +import java.util.Properties; +import java.util.UUID; + + +public class DorisSinkStreamMultiTableExample { + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); +// config.setString("execution.savepoint.path","/tmp/checkpoint/chk-6"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + env.setParallelism(1); + env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/"); + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000))); + env.enableCheckpointing(10000); + DorisSink.Builder builder = DorisSink.builder(); + final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); + Properties properties = new Properties(); + properties.setProperty("column_separator", ","); + properties.setProperty("line_delimiter", "\n"); + properties.setProperty("format", "csv"); + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder.setFenodes("127.0.0.1:8030") + .setTableIdentifier("") + .setUsername("root") + .setPassword(""); + + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); + executionBuilder.setLabelPrefix("xxx12") + .setStreamLoadProp(properties) + .setDeletable(false).enable2PC(); + + builder.setDorisReadOptions(readOptionBuilder.build()) + .setDorisExecutionOptions(executionBuilder.build()) + .setDorisOptions(dorisBuilder.build()); + +// RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1"); +// RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1"); +// DataStreamSource stringDataStreamSource = env.fromCollection( +// Arrays.asList(record, record1)); +// stringDataStreamSource.sinkTo(builder.build()); + + env.addSource(new ParallelSourceFunction() { + private Long id = 1000000L; + @Override + public void run(SourceContext out) throws Exception { + while (true) { + id = id + 1; + RecordWithMeta record = new RecordWithMeta("test", "test_flink_a", UUID.randomUUID() + ",1"); + out.collect(record); + record = new RecordWithMeta("test", "test_flink_b", UUID.randomUUID() + ",2"); + out.collect(record); + if(id > 100){ + //mock dynamic add table + RecordWithMeta record3 = new RecordWithMeta("test", 
"test_flink_c", UUID.randomUUID() + ",1"); + out.collect(record3); + } + Thread.sleep(3000); + } + } + + @Override + public void cancel() { + + } + }).sinkTo(builder.build()); + + env.execute("doris stream multi table test"); + } + +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java index 67e8ba38d..99e984698 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java @@ -50,6 +50,27 @@ public class HttpTestUtil { "}\n" + "\n"; + public static final String LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE = "{\n" + + "\"TxnId\": 1,\n" + + "\"Label\": \"test001_db_table_0_1\",\n" + + "\"TwoPhaseCommit\": \"true\",\n" + + "\"Status\": \"Label Already Exists\",\n" + + "\"ExistingJobStatus\": \"PRECOMMITTED\",\n" + + "\"Message\": \"errCode = 2, detailMessage = Label [test001_db_table_0_1] has already been used, relate to txn [1]\",\n" + + "\"NumberTotalRows\": 0,\n" + + "\"NumberLoadedRows\": 0,\n" + + "\"NumberFilteredRows\": 0,\n" + + "\"NumberUnselectedRows\": 0,\n" + + "\"LoadBytes\": 0,\n" + + "\"LoadTimeMs\": 0,\n" + + "\"BeginTxnTimeMs\": 0,\n" + + "\"StreamLoadPutTimeMs\": 0,\n" + + "\"ReadDataTimeMs\": 0,\n" + + "\"WriteDataTimeMs\": 0,\n" + + "\"CommitAndPublishTimeMs\": 0\n" + + "}\n" + + "\n"; + public static final String PRE_COMMIT_RESPONSE = "{\n" + "\"TxnId\": 2,\n" + "\"Label\": \"test001_0_2\",\n" + @@ -70,6 +91,26 @@ public class HttpTestUtil { "}\n" + "\n"; + public static final String PRE_COMMIT_TABLE_RESPONSE = "{\n" + + "\"TxnId\": 2,\n" + + "\"Label\": \"test001_db_table_0_2\",\n" + + "\"TwoPhaseCommit\": \"true\",\n" + + "\"Status\": \"Success\",\n" + + "\"Message\": \"OK\",\n" + + "\"NumberTotalRows\": 0,\n" + + "\"NumberLoadedRows\": 0,\n" + + "\"NumberFilteredRows\": 0,\n" + + "\"NumberUnselectedRows\": 0,\n" + + "\"LoadBytes\": 0,\n" + + "\"LoadTimeMs\": 0,\n" + + "\"BeginTxnTimeMs\": 0,\n" + + "\"StreamLoadPutTimeMs\": 0,\n" + + "\"ReadDataTimeMs\": 0,\n" + + "\"WriteDataTimeMs\": 0,\n" + + "\"CommitAndPublishTimeMs\": 0\n" + + "}\n" + + "\n"; + public static final String ABORT_SUCCESS_RESPONSE = "{\n" + "\"status\": \"Success\",\n" + "\"msg\": \"transaction [1] abort successfully.\"\n" + diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index e295de11e..8f292e262 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -56,13 +56,13 @@ public void setUp() throws Exception{ @Test public void testAbortPreCommit() throws Exception { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); - CloseableHttpResponse existLabelResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_RESPONSE, true); - CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); + CloseableHttpResponse existLabelResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE, true); + CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true); 
when(httpClient.execute(any())).thenReturn(existLabelResponse, preCommitResponse); - DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient)); + DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true, "db.table", 0), httpClient)); doNothing().when(dorisStreamLoad).abortTransaction(anyLong()); - dorisStreamLoad.abortPreCommit("test001_0", 1); + dorisStreamLoad.abortPreCommit("test001", 1); } @Test @@ -70,7 +70,7 @@ public void testAbortTransaction() throws Exception{ CloseableHttpClient httpClient = mock(CloseableHttpClient.class); CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true); when(httpClient.execute(any())).thenReturn(abortSuccessResponse); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true), httpClient); dorisStreamLoad.abortTransaction(anyLong()); } @@ -79,7 +79,7 @@ public void testAbortTransactionFailed() throws Exception{ CloseableHttpClient httpClient = mock(CloseableHttpClient.class); CloseableHttpResponse abortFailedResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_FAILED_RESPONSE, true); when(httpClient.execute(any())).thenReturn(abortFailedResponse); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true), httpClient); dorisStreamLoad.abortTransaction(anyLong()); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index 01e1559ad..36f98c89a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -34,7 +34,9 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -61,13 +63,14 @@ public void testPrepareCommit() throws Exception { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(preCommitResponse); - + Map dorisStreamLoadMap = new ConcurrentHashMap<>(); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); + dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad); dorisStreamLoad.startLoad("", false); Sink.InitContext initContext = mock(Sink.InitContext.class); when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisWriter dorisWriter = new DorisWriter(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions); - dorisWriter.setDorisStreamLoad(dorisStreamLoad); + 
dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap); dorisWriter.write("doris,1",null); Collection committableList = dorisWriter.prepareCommit(); Assert.assertEquals(1, committableList.size()); @@ -84,11 +87,14 @@ public void testSnapshot() throws Exception { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(preCommitResponse); + Map dorisStreamLoadMap = new ConcurrentHashMap<>(); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); + dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad); + Sink.InitContext initContext = mock(Sink.InitContext.class); when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisWriter dorisWriter = new DorisWriter(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions); - dorisWriter.setDorisStreamLoad(dorisStreamLoad); + dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap); List writerStates = dorisWriter.snapshotState(1); Assert.assertEquals(1, writerStates.size()); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java index 84695d7ec..fce2a268b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java @@ -26,9 +26,9 @@ public class TestDorisWriterStateSerializer { @Test public void testSerialize() throws Exception { - DorisWriterState expectDorisWriterState = new DorisWriterState("doris"); + DorisWriterState expectDorisWriterState = new DorisWriterState("doris", "db", "table", 0); DorisWriterStateSerializer serializer = new DorisWriterStateSerializer(); - DorisWriterState dorisWriterState = serializer.deserialize(1, serializer.serialize(expectDorisWriterState)); + DorisWriterState dorisWriterState = serializer.deserialize(2, serializer.serialize(expectDorisWriterState)); Assert.assertEquals(expectDorisWriterState, dorisWriterState); } }
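Finally, the version bump in `DorisWriterStateSerializer` keeps previously checkpointed state readable: version 1 payloads carry only the label prefix, while version 2 adds database, table, and subtask id. A round-trip sketch along the lines of the test above (the wrapper class is illustrative):

```java
import org.apache.doris.flink.sink.writer.DorisWriterState;
import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;

public class WriterStateRoundTrip {
    public static void main(String[] args) throws Exception {
        DorisWriterStateSerializer serializer = new DorisWriterStateSerializer();
        DorisWriterState state = new DorisWriterState("doris", "db", "table", 0);
        byte[] bytes = serializer.serialize(state);
        // getVersion() now returns 2, so all four fields are restored; a version 1
        // payload would only yield the label prefix.
        DorisWriterState restored = serializer.deserialize(serializer.getVersion(), bytes);
        System.out.println(restored);
    }
}
```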