From f15d14592e4ee8e5eae8291a0ad71cb3e4afc46e Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 26 Jan 2024 16:02:06 +0800 Subject: [PATCH] add DorisAbstractWriter, Unify DorisSink and DorisBatchSink --- .../flink/cfg/DorisExecutionOptions.java | 20 ++++++++- .../apache/doris/flink/sink/DorisSink.java | 45 ++++++++++--------- .../flink/sink/batch/DorisBatchSink.java | 1 - .../flink/sink/batch/DorisBatchWriter.java | 26 +++++++++-- .../sink/writer/DorisAbstractWriter.java | 26 +++++++++++ .../doris/flink/sink/writer/DorisWriter.java | 9 ++-- .../doris/flink/sink/writer/WriteMode.java | 24 ++++++++++ .../doris/flink/table/DorisConfigOptions.java | 8 ++++ .../flink/table/DorisDynamicTableFactory.java | 4 ++ .../flink/table/DorisDynamicTableSink.java | 25 +++-------- 10 files changed, 138 insertions(+), 50 deletions(-) create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index b86bcd421..a890d3497 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -19,6 +19,8 @@ import org.apache.flink.util.Preconditions; +import org.apache.doris.flink.sink.writer.WriteMode; + import java.io.Serializable; import java.util.Properties; @@ -60,6 +62,7 @@ public class DorisExecutionOptions implements Serializable { private final long bufferFlushIntervalMs; private final boolean enableBatchMode; private final boolean ignoreUpdateBefore; + private final WriteMode writeMode; public DorisExecutionOptions( int checkInterval, @@ -77,7 +80,8 @@ public DorisExecutionOptions( int bufferFlushMaxBytes, long bufferFlushIntervalMs, boolean ignoreUpdateBefore, - boolean force2PC) { + boolean force2PC, + WriteMode writeMode) { Preconditions.checkArgument(maxRetries >= 0); this.checkInterval = checkInterval; this.maxRetries = maxRetries; @@ -97,6 +101,7 @@ public DorisExecutionOptions( this.bufferFlushIntervalMs = bufferFlushIntervalMs; this.ignoreUpdateBefore = ignoreUpdateBefore; + this.writeMode = writeMode; } public static Builder builder() { @@ -196,6 +201,10 @@ public boolean force2PC() { return force2PC; } + public WriteMode getWriteMode() { + return writeMode; + } + /** Builder of {@link DorisExecutionOptions}. */ public static class Builder { private int checkInterval = DEFAULT_CHECK_INTERVAL; @@ -219,6 +228,7 @@ public static class Builder { private boolean enableBatchMode = false; private boolean ignoreUpdateBefore = true; + private WriteMode writeMode = WriteMode.STREAM_LOAD; public Builder setCheckInterval(Integer checkInterval) { this.checkInterval = checkInterval; @@ -305,6 +315,11 @@ public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) { return this; } + public Builder setWriteMode(WriteMode writeMode) { + this.writeMode = writeMode; + return this; + } + public DorisExecutionOptions build() { // If format=json is set but read_json_by_line is not set, record may not be written. if (streamLoadProp != null @@ -328,7 +343,8 @@ public DorisExecutionOptions build() { bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, - force2PC); + force2PC, + writeMode); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index 3da50b22d..6a3477651 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -27,10 +27,13 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.sink.batch.DorisBatchWriter; import org.apache.doris.flink.sink.committer.DorisCommitter; +import org.apache.doris.flink.sink.writer.DorisAbstractWriter; import org.apache.doris.flink.sink.writer.DorisWriter; import org.apache.doris.flink.sink.writer.DorisWriterState; import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer; +import org.apache.doris.flink.sink.writer.WriteMode; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,16 +79,8 @@ private void checkKeyType() { } @Override - public DorisWriter createWriter(InitContext initContext) throws IOException { - DorisWriter dorisWriter = - new DorisWriter<>( - initContext, - Collections.emptyList(), - serializer, - dorisOptions, - dorisReadOptions, - dorisExecutionOptions); - return dorisWriter; + public DorisAbstractWriter createWriter(InitContext initContext) throws IOException { + return getDorisAbstractWriter(initContext, Collections.emptyList()); } @Override @@ -95,18 +90,28 @@ public Committer createCommitter() throws IOException { } @Override - public DorisWriter restoreWriter( + public DorisAbstractWriter restoreWriter( InitContext initContext, Collection recoveredState) throws IOException { - DorisWriter dorisWriter = - new DorisWriter<>( - initContext, - recoveredState, - serializer, - dorisOptions, - dorisReadOptions, - dorisExecutionOptions); - return dorisWriter; + return getDorisAbstractWriter(initContext, recoveredState); + } + + public DorisAbstractWriter getDorisAbstractWriter( + InitContext initContext, Collection states) { + if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())) { + return new DorisWriter<>( + initContext, + states, + serializer, + dorisOptions, + dorisReadOptions, + dorisExecutionOptions); + } else if (WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) { + return new DorisBatchWriter<>( + initContext, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions); + } + throw new IllegalArgumentException( + "Unsupported write mode " + dorisExecutionOptions.getWriteMode()); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java index d9a424b9b..2a9482057 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java @@ -54,7 +54,6 @@ public SinkWriter createWriter(InitContext initContext) throws IOException { dorisOptions, dorisReadOptions, dorisExecutionOptions); - dorisBatchWriter.initializeLoad(); return dorisBatchWriter; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java index 6a6576c44..4b48436ce 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java @@ -18,7 +18,6 @@ package org.apache.doris.flink.sink.batch; import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -26,6 +25,9 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.DorisCommittable; +import org.apache.doris.flink.sink.writer.DorisAbstractWriter; +import org.apache.doris.flink.sink.writer.DorisWriterState; import org.apache.doris.flink.sink.writer.LabelGenerator; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; @@ -33,11 +35,17 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class DorisBatchWriter implements SinkWriter { +/** Doris Batch StreamLoad. */ +public class DorisBatchWriter + implements DorisAbstractWriter { private static final Logger LOG = LoggerFactory.getLogger(DorisBatchWriter.class); private DorisBatchStreamLoad batchStreamLoad; private final DorisOptions dorisOptions; @@ -77,10 +85,11 @@ public DorisBatchWriter( this.dorisReadOptions = dorisReadOptions; this.executionOptions = executionOptions; this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs(); + initializeLoad(); serializer.initial(); } - public void initializeLoad() throws IOException { + public void initializeLoad() { this.batchStreamLoad = new DorisBatchStreamLoad( dorisOptions, dorisReadOptions, executionOptions, labelGenerator); @@ -113,6 +122,17 @@ public void flush(boolean flush) throws IOException, InterruptedException { batchStreamLoad.flush(null, true); } + @Override + public Collection prepareCommit() throws IOException, InterruptedException { + // nothing to commit + return Collections.emptyList(); + } + + @Override + public List snapshotState(long checkpointId) throws IOException { + return new ArrayList<>(); + } + public void writeOneDorisRecord(DorisRecord record) throws InterruptedException { if (record == null || record.getRow() == null) { // ddl or value is null diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java new file mode 100644 index 000000000..2e307d611 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; + +/** Abstract for different Doris Writer: Stream Load, Copy ... */ +public interface DorisAbstractWriter + extends StatefulSink.StatefulSinkWriter, + TwoPhaseCommittingSink.PrecommittingSinkWriter {} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 5b8b4fc2f..c6f81245e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -19,8 +19,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -61,8 +59,7 @@ * @param */ public class DorisWriter - implements StatefulSink.StatefulSinkWriter, - TwoPhaseCommittingSink.PrecommittingSinkWriter { + implements DorisAbstractWriter { private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); @@ -169,7 +166,7 @@ private void abortLingeringTransactions(Collection recoveredSt } @Override - public void write(IN in, Context context) throws IOException { + public void write(IN in, Context context) throws IOException, InterruptedException { checkLoadException(); writeOneDorisRecord(serializer.serialize(in)); } @@ -179,7 +176,7 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException { writeOneDorisRecord(serializer.flush()); } - public void writeOneDorisRecord(DorisRecord record) throws IOException { + public void writeOneDorisRecord(DorisRecord record) throws IOException, InterruptedException { if (record == null || record.getRow() == null) { // ddl or value is null diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java new file mode 100644 index 000000000..92eedefde --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +public enum WriteMode { + STREAM_LOAD, + STREAM_LOAD_BATCH, + COPY +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index e368e3126..2c7c753c6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -21,6 +21,8 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.doris.flink.sink.writer.WriteMode; + import java.time.Duration; import java.util.Map; import java.util.Properties; @@ -227,6 +229,12 @@ public class DorisConfigOptions { .defaultValue(true) .withDescription("whether to enable the delete function"); + public static final ConfigOption SINK_WRITE_MODE = + ConfigOptions.key("sink.write-mode") + .stringType() + .defaultValue(WriteMode.STREAM_LOAD.name()) + .withDescription("Write mode, supports stream_load, stream_load_batch"); + public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; public static final ConfigOption SINK_ENABLE_BATCH_MODE = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 49ec868b2..09d4f3a85 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -32,6 +32,7 @@ import org.apache.doris.flink.cfg.DorisLookupOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.writer.WriteMode; import java.util.HashSet; import java.util.Map; @@ -77,6 +78,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_WRITE_MODE; import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; @@ -153,6 +155,7 @@ public Set> optionalOptions() { options.add(SINK_USE_CACHE); options.add(SOURCE_USE_OLD_API); + options.add(SINK_WRITE_MODE); return options; } @@ -238,6 +241,7 @@ private DorisExecutionOptions getDorisExecutionOptions( builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); builder.setUseCache(readableConfig.get(SINK_USE_CACHE)); + builder.setWriteMode(WriteMode.valueOf(readableConfig.get(SINK_WRITE_MODE).toUpperCase())); return builder.build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index 83c10a90b..de5b32f42 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -29,7 +29,6 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.DorisSink; -import org.apache.doris.flink.sink.batch.DorisBatchSink; import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,23 +108,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setFieldDelimiter( loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT)); - if (!executionOptions.enableBatchMode()) { - DorisSink.Builder dorisSinkBuilder = DorisSink.builder(); - dorisSinkBuilder - .setDorisOptions(options) - .setDorisReadOptions(readOptions) - .setDorisExecutionOptions(executionOptions) - .setSerializer(serializerBuilder.build()); - return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism); - } else { - DorisBatchSink.Builder dorisBatchSinkBuilder = DorisBatchSink.builder(); - dorisBatchSinkBuilder - .setDorisOptions(options) - .setDorisReadOptions(readOptions) - .setDorisExecutionOptions(executionOptions) - .setSerializer(serializerBuilder.build()); - return SinkV2Provider.of(dorisBatchSinkBuilder.build(), sinkParallelism); - } + DorisSink.Builder dorisSinkBuilder = DorisSink.builder(); + dorisSinkBuilder + .setDorisOptions(options) + .setDorisReadOptions(readOptions) + .setDorisExecutionOptions(executionOptions) + .setSerializer(serializerBuilder.build()); + return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism); } @Override