builder = DorisSink.builder();
- builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(
- .setDorisOptions(dorisOptions)
- .setSerializer(
- JsonDebeziumSchemaSerializer.builder()
- .setDorisOptions(dorisOptions)
- .setNewSchemaChange(true)
- .build());
- env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // .print();
- .sinkTo(;
- env.execute("Print MySQL Snapshot + Binlog");
- }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/ b/flink-doris-connector/src/test/java/org/apache/doris/flink/
deleted file mode 100644
index 883e96b44..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import java.util.UUID;
-public class DorisDateAndTimestampSqlTest {
- public static void main(String[] args) {
- TableEnvironment tEnv =
- TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
- tEnv.executeSql(
- "create table test_source ( "
- + " id INT, "
- + " score DECIMAL(10, 9), "
- + " submit_time TIMESTAMP "
- + " ) with ( "
- + " 'password'='', "
- + " 'connector'='doris', "
- + " 'fenodes'='FE_HOST:FE_PORT', "
- + " 'table.identifier'='db.source_table', "
- + " 'username'='root' "
- + ")");
- tEnv.executeSql(
- "create table test_sink ( "
- + " id INT, "
- + " score DECIMAL(10, 9), "
- + " submit_time DATE "
- + " ) with ( "
- + " 'password'='', "
- + " 'connector'='doris', "
- + " 'fenodes'='FE_HOST:FE_PORT', "
- + " 'sink.label-prefix' = 'label_"
- + UUID.randomUUID()
- + "' , "
- + " 'table.identifier'='db.sink_table', "
- + " 'username'='root' "
- + ")");
- tEnv.executeSql(
- "insert into "
- + " test_sink "
- + "select "
- + " id, "
- + " score,"
- + " to_date(DATE_FORMAT(submit_time, 'yyyy-MM-dd')) as submit_time "
- + "from "
- + " test_source "
- + "where "
- + " submit_time>='2022-05-31 00:00:00'")
- .print();
- }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/ b/flink-doris-connector/src/test/java/org/apache/doris/flink/
deleted file mode 100644
index cda0ae46f..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
- * When the flink connector accesses doris, it parses out all surviving BE nodes according to the FE
- * address filled in.
- *
- * However, when the BE node is deployed, most of the internal network IP is filled in, so the BE
- * node parsed by FE is the internal network IP. When flink is deployed on a non-intranet segment,
- * the BE node will be inaccessible on the network.
- *
- *
In this case, you can access the BE node on the intranet by directly configuring {@link new
- * DorisOptions.builder().setBenodes().build()}, after you configure this parameter, Flink Connector
- * will not parse all BE nodes through FE nodes.
- */
-public class DorisIntranetAccessSinkExample {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.enableCheckpointing(10000);
- env.getCheckpointConfig()
- .enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
- DorisSink.Builder builder = DorisSink.builder();
- final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
- Properties properties = new Properties();
- properties.setProperty("column_separator", ",");
- properties.setProperty("line_delimiter", "\n");
- properties.setProperty("format", "csv");
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes("")
- .setBenodes(",,")
- .setTableIdentifier("test.test_sink")
- .setUsername("root")
- .setPassword("");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder
- .disable2PC()
- .setLabelPrefix("label-doris")
- .setStreamLoadProp(properties)
- .setBufferSize(8 * 1024)
- .setBufferCount(3);
- builder.setDorisReadOptions(
- .setDorisExecutionOptions(
- .setSerializer(new SimpleStringSerializer())
- .setDorisOptions(;
- List> data = new ArrayList<>();
- data.add(new Tuple2<>(1, "zhangsan"));
- data.add(new Tuple2<>(2, "lisi"));
- data.add(new Tuple2<>(3, "wangwu"));
- DataStreamSource> source = env.fromCollection(data);
-, String>) t -> t.f0 + "," + t.f1)
- .sinkTo(;
- env.execute("doris test");
- }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/ b/flink-doris-connector/src/test/java/org/apache/doris/flink/
deleted file mode 100644
index 307a08e71..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import java.util.UUID;
-public class DorisSinkArraySQLExample {
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- tEnv.executeSql(
- "CREATE TABLE source (\n"
- + " `id` int,\n"
- + " `c_1` array ,\n"
- + " `c_2` array ,\n"
- + " `c_3` array ,\n"
- + " `c_4` array ,\n"
- + " `c_5` array ,\n"
- + " `c_6` array ,\n"
- + " `c_7` array,\n"
- + " `c_8` array ,\n"
- + " `c_9` array ,\n"
- + " `c_10` array ,\n"
- + " `c_11` array ,\n"
- + " `c_12` array ,\n"
- + " `c_13` array ,\n"
- + " `c_14` array ,\n"
- + " `c_15` array ,\n"
- + " `c_16` array \n"
- + ") WITH (\n"
- + " 'connector' = 'datagen', \n"
- + " 'fields.c_7.element.min' = '1', \n"
- + " 'fields.c_7.element.max' = '10', \n"
- + " 'fields.c_8.element.min' = '1', \n"
- + " 'fields.c_8.element.max' = '10', \n"
- + " 'fields.c_14.element.length' = '10', \n"
- + " 'fields.c_15.element.length' = '10', \n"
- + " 'fields.c_16.element.length' = '10', \n"
- + " 'number-of-rows' = '5' \n"
- + ");");
- tEnv.executeSql(
- "CREATE TABLE source_doris ("
- + " `id` int,\n"
- + " `c_1` array ,\n"
- + " `c_2` array ,\n"
- + " `c_3` array ,\n"
- + " `c_4` array ,\n"
- + " `c_5` array ,\n"
- + " `c_6` array ,\n"
- + " `c_7` array ,\n"
- + " `c_8` array ,\n"
- + " `c_9` array ,\n"
- + " `c_10` array ,\n"
- + // ARRAY
- " `c_11` array ,\n"
- + // ARRAY
- " `c_12` array ,\n"
- + // ARRAY
- " `c_13` array ,\n"
- + // ARRAY
- " `c_14` array ,\n"
- + " `c_15` array ,\n"
- + " `c_16` array \n"
- + ") WITH ("
- + " 'connector' = 'doris',\n"
- + " 'fenodes' = '',\n"
- + " 'table.identifier' = 'test.array_test_type',\n"
- + " 'username' = 'root',\n"
- + " 'password' = ''\n"
- + ")");
- // define a dynamic aggregating query
- // final Table result = tEnv.sqlQuery("SELECT * from source_doris ");
- // print the result to the console
- // tEnv.toRetractStream(result, Row.class).print();
- // env.execute();
- tEnv.executeSql(
- "CREATE TABLE sink ("
- + " `id` int,\n"
- + " `c_1` array ,\n"
- + " `c_2` array ,\n"
- + " `c_3` array ,\n"
- + " `c_4` array ,\n"
- + " `c_5` array ,\n"
- + " `c_6` array ,\n"
- + " `c_7` array ,\n"
- + " `c_8` array ,\n"
- + " `c_9` array ,\n"
- + " `c_10` array ,\n"
- + // ARRAY
- " `c_11` array ,\n"
- + // ARRAY
- " `c_12` array ,\n"
- + // ARRAY
- " `c_13` array ,\n"
- + // ARRAY
- " `c_14` array ,\n"
- + " `c_15` array ,\n"
- + " `c_16` array \n"
- + ") "
- + "WITH (\n"
- + " 'connector' = 'doris',\n"
- + " 'fenodes' = '',\n"
- + " 'table.identifier' = 'test.array_test_type_sink',\n"
- + " 'username' = 'root',\n"
- + " 'password' = '',\n"
- + " 'sink.label-prefix' = 'doris_label4"
- + UUID.randomUUID()
- + "'"
- + ")");
- tEnv.executeSql("INSERT INTO sink select * from source_doris");
- }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/ b/flink-doris-connector/src/test/java/org/apache/doris/flink/
deleted file mode 100644
index 47add1211..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/
+++ /dev/null
@@ -1,161 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.batch.DorisBatchSink;
-import org.apache.doris.flink.sink.writer.WriteMode;
-import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.UUID;
-public class DorisSinkBatchExample {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(5000);
- // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
- // Time.milliseconds(30000)));
- DorisSink.Builder builder = DorisSink.builder();
- final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
- Properties properties = new Properties();
- properties.setProperty("column_separator", ",");
- properties.setProperty("line_delimiter", "\n");
- properties.setProperty("format", "csv");
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes("")
- .setTableIdentifier("test.test_flink")
- .setUsername("root")
- .setPassword("");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder
- .setLabelPrefix("label")
- .setStreamLoadProp(properties)
- .setDeletable(false)
- .setBufferFlushMaxBytes(8 * 1024)
- .setBufferFlushMaxRows(900)
- .setBufferFlushIntervalMs(1000 * 10)
- // .setBatchMode(true);
- .setWriteMode(WriteMode.STREAM_LOAD_BATCH);
- builder.setDorisReadOptions(
- .setDorisExecutionOptions(
- .setSerializer(new SimpleStringSerializer())
- .setDorisOptions(;
- env.addSource(
- new SourceFunction() {
- private Long id = 0L;
- @Override
- public void run(SourceContext out) throws Exception {
- while (true) {
- id = id + 1;
- String record = id + "," + UUID.randomUUID() + "," + id + "";
- out.collect(record);
- Thread.sleep(500);
- }
- }
- @Override
- public void cancel() {}
- })
- .sinkTo(;
- env.execute("doris batch test");
- }
- public void testBatchFlush() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DorisBatchSink.Builder builder = DorisBatchSink.builder();
- final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
- Properties properties = new Properties();
- properties.setProperty("column_separator", ",");
- properties.setProperty("line_delimiter", "\n");
- properties.setProperty("format", "csv");
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes("")
- .setTableIdentifier("test.testd")
- .setUsername("root")
- .setPassword("");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder
- .setLabelPrefix("label")
- .setStreamLoadProp(properties)
- .setDeletable(false)
- .setBufferFlushMaxBytes(8 * 1024)
- .setBufferFlushMaxRows(1)
- .setBufferFlushIntervalMs(1000 * 10);
- builder.setDorisReadOptions(
- .setDorisExecutionOptions(
- .setSerializer(new SimpleStringSerializer())
- .setDorisOptions(;
- DataStreamSource stringDataStreamSource =
- env.fromCollection(
- Arrays.asList(
- "1,-74159.9193252453",
- "2,-74159.9193252453",
- "3,-19.7004480979",
- "4,43385.2170333507",
- "5,-16.2602598554"));
- stringDataStreamSource.sinkTo(;
- env.execute("doris batch test");
- }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/ b/flink-doris-connector/src/test/java/org/apache/doris/flink/
deleted file mode 100644
index 3e908f7d6..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-public class DorisSinkExample {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.enableCheckpointing(10000);
- env.getCheckpointConfig()
- .enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
- DorisSink.Builder builder = DorisSink.builder();
- final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
- Properties properties = new Properties();
- properties.setProperty("column_separator", ",");
- properties.setProperty("line_delimiter", "\n");
- properties.setProperty("format", "csv");
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes("")
- .setTableIdentifier("db.table")
- .setUsername("test")
- .setPassword("test");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder
- .setLabelPrefix("label-doris")
- .setStreamLoadProp(properties)
- .setBufferSize(8 * 1024)
- .setBufferCount(3);
- builder.setDorisReadOptions(
- .setDorisExecutionOptions(
- .setSerializer(new SimpleStringSerializer())
- .setDorisOptions(;
- List> data = new ArrayList<>();
- data.add(new Tuple2<>("doris", 1));
- DataStreamSource> source = env.fromCollection(data);
-, String>) t -> t.f0 + "," + t.f1)
- .sinkTo(;
- env.execute("doris test");
- }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/ b/flink-doris-connector/src/test/java/org/apache/doris/flink/
deleted file mode 100644
index e19148016..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.util.Collector;
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.LoadConstants;
-import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
-import java.util.Properties;
-import java.util.UUID;
-public class DorisSinkExampleRowData {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.enableCheckpointing(10000);
- env.setParallelism(1);
- env.getCheckpointConfig()
- .enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
- DorisSink.Builder builder = DorisSink.builder();
- Properties properties = new Properties();
- properties.setProperty("column_separator", ",");
- properties.setProperty("line_delimiter", "\n");
- // properties.setProperty("read_json_by_line", "true");
- // properties.setProperty("format", "json");
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes("")
- .setTableIdentifier("db.tbl")
- .setUsername("root")
- .setPassword("");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
- // flink rowdata‘s schema
- String[] fields = {"name", "age"};
- DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()};
- builder.setDorisExecutionOptions(
- .setSerializer(
- RowDataSerializer.builder() // serialize according to rowdata
- .setType(LoadConstants.CSV) // .setType(LoadConstants.CSV)
- .setFieldDelimiter(",")
- .setFieldNames(fields) // .setFieldDelimiter(",")
- .setFieldType(types)
- .build())
- .setDorisOptions(;
- // mock rowdata source
- DataStream source =
- env.fromElements("")
- .flatMap(
- new FlatMapFunction() {
- @Override
- public void flatMap(String s, Collector out)
- throws Exception {
- GenericRowData genericRowData = new GenericRowData(2);
- genericRowData.setField(
- 0, StringData.fromString("beijing"));
- genericRowData.setField(1, 123);
- out.collect(genericRowData);
- GenericRowData genericRowData2 = new GenericRowData(2);
- genericRowData2.setField(
- 0, StringData.fromString("shanghai"));
- genericRowData2.setField(1, 1234);
- out.collect(genericRowData2);
- }
- });
- source.sinkTo(;
- env.execute("doris test");
- }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/ b/flink-doris-connector/src/test/java/org/apache/doris/flink/
deleted file mode 100644
index f78390ef8..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/
+++ /dev/null
@@ -1,115 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.batch.DorisBatchSink;
-import org.apache.doris.flink.sink.batch.RecordWithMeta;
-import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;
-import java.util.Properties;
-import java.util.UUID;
-public class DorisSinkMultiTableExample {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DorisBatchSink.Builder builder = DorisBatchSink.builder();
- final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
- Properties properties = new Properties();
- properties.setProperty("column_separator", ",");
- properties.setProperty("line_delimiter", "\n");
- properties.setProperty("format", "csv");
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes("")
- .setTableIdentifier("test.test_flink_tmp")
- .setUsername("root")
- .setPassword("");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder
- .setLabelPrefix("label")
- .setStreamLoadProp(properties)
- .setDeletable(false)
- .setBufferFlushMaxBytes(8 * 1024)
- .setBufferFlushMaxRows(10)
- .setBufferFlushIntervalMs(1000 * 10);
- builder.setDorisReadOptions(
- .setDorisExecutionOptions(
- .setDorisOptions(
- .setSerializer(new RecordWithMetaSerializer());
- // RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
- // RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
- // DataStreamSource stringDataStreamSource = env.fromCollection(
- // Arrays.asList(record, record1));
- // stringDataStreamSource.sinkTo(;
- env.addSource(
- new SourceFunction() {
- private Long id = 1000000L;
- @Override
- public void run(SourceContext