Skip to content

Commit

Permalink
[improve] add multi table sink to DorisSink (apache#224)
Browse files Browse the repository at this point in the history
DorisSink supports multi-table import.

Example:
```java
 Configuration config = new Configuration();
//        config.setString("execution.savepoint.path","/tmp/checkpoint/chk-6");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/");
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000)));
        env.enableCheckpointing(10000);
        DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
        Properties properties = new Properties();
        properties.setProperty("column_separator", ",");
        properties.setProperty("line_delimiter", "\n");
        properties.setProperty("format", "csv");
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes("127.0.0.1:8030")
                .setTableIdentifier("")
                .setUsername("root")
                .setPassword("");

        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("xxx12")
                .setStreamLoadProp(properties)
                .setDeletable(false).enable2PC();

        builder.setDorisReadOptions(readOptionBuilder.build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setDorisOptions(dorisBuilder.build());

        RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
       RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
        DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
                Arrays.asList(record, record1));
        stringDataStreamSource.sinkTo(builder.build());
```
For details, please refer to `org.apache.doris.flink.DorisSinkStreamMultiTableExample.java`
  • Loading branch information
JNSimba authored Nov 6, 2023
1 parent a4b4bdf commit 0b2a14a
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public String getTableIdentifier() {
return tableIdentifier;
}

public void setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
}

public String save() throws IllegalArgumentException {
Properties copy = new Properties();
return IOUtils.propsToString(copy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.doris.flink.sink;

import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;

Expand Down Expand Up @@ -62,6 +66,14 @@ private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
return backends;
}

public static BackendUtil getInstance(DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger){
if(StringUtils.isNotEmpty(dorisOptions.getBenodes())){
return new BackendUtil(dorisOptions.getBenodes());
} else {
return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger));
}
}

public String getAvailableBackend() {
long tmp = pos + backends.size();
while (pos < tmp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ public Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {
public DorisSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
Preconditions.checkNotNull(serializer);
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public String getDb() {
return db;
}

public String getTable() {
return table;
}

public String getHostPort() {
return hostPort;
}
Expand Down Expand Up @@ -141,7 +145,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
try {
// TODO: According to label abort txn. Currently, it can only be aborted based on txnid,
// so we must first request a streamload based on the label to get the txnid.
String label = labelGenerator.generateLabel(startChkID);
String label = labelGenerator.generateTableLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
Expand Down Expand Up @@ -215,7 +219,7 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw

public RespContent stopLoad(String label) throws IOException{
recordStream.endInput();
LOG.info("stream load stopped for {} on host {}", label, hostPort);
LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort);
Preconditions.checkState(pendingLoadFuture != null);
try {
return handlePreCommitResponse(pendingLoadFuture.get());
Expand All @@ -233,7 +237,7 @@ public void startLoad(String label, boolean isResume) throws IOException {
loadBatchFirstRecord = !isResume;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput(isResume);
LOG.info("stream load started for {} on host {}", label, hostPort);
LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
putBuilder.setUrl(loadUrlStr)
Expand All @@ -247,7 +251,7 @@ public void startLoad(String label, boolean isResume) throws IOException {
putBuilder.enable2PC();
}
pendingLoadFuture = executorService.submit(() -> {
LOG.info("start execute load");
LOG.info("table {} start execute load", table);
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 0b2a14a

Please sign in to comment.