builder = DorisSink.builder();
- builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setDorisOptions(dorisOptions)
- .setSerializer(
- JsonDebeziumSchemaSerializer.builder()
- .setDorisOptions(dorisOptions)
- .setNewSchemaChange(true)
- .build());
-
- env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // .print();
- .sinkTo(builder.build());
-
- env.execute("Print MySQL Snapshot + Binlog");
- }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/CatalogExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/CatalogExample.java
deleted file mode 100644
index 3cdf30ec9..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/CatalogExample.java
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.example;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-public class CatalogExample {
-
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- tEnv.executeSql(
- "CREATE CATALOG doris_catalog WITH(\n"
- + "'type' = 'doris',\n"
- + "'default-database' = 'test',\n"
- + "'username' = 'root',\n"
- + "'password' = '',\n"
- + "'fenodes' = '1127.0.0.1:8030',\n"
- + "'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n"
- + "'sink.label-prefix' = 'label'\n"
- + ")");
- // define a dynamic aggregating query
- final Table result = tEnv.sqlQuery("SELECT * from doris_catalog.test.type_test");
-
- // print the result to the console
- tEnv.toRetractStream(result, Row.class).print();
- env.execute();
- }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisDateAndTimestampSqlTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisDateAndTimestampSqlTest.java
deleted file mode 100644
index f904ef44e..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisDateAndTimestampSqlTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.example;
-
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-
-import java.util.UUID;
-
-public class DorisDateAndTimestampSqlTest {
-
- public static void main(String[] args) {
- TableEnvironment tEnv =
- TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
- tEnv.executeSql(
- "create table test_source ( "
- + " id INT, "
- + " score DECIMAL(10, 9), "
- + " submit_time TIMESTAMP "
- + " ) with ( "
- + " 'password'='', "
- + " 'connector'='doris', "
- + " 'fenodes'='FE_HOST:FE_PORT', "
- + " 'table.identifier'='db.source_table', "
- + " 'username'='root' "
- + ")");
-
- tEnv.executeSql(
- "create table test_sink ( "
- + " id INT, "
- + " score DECIMAL(10, 9), "
- + " submit_time DATE "
- + " ) with ( "
- + " 'password'='', "
- + " 'connector'='doris', "
- + " 'fenodes'='FE_HOST:FE_PORT', "
- + " 'sink.label-prefix' = 'label_"
- + UUID.randomUUID()
- + "' , "
- + " 'table.identifier'='db.sink_table', "
- + " 'username'='root' "
- + ")");
- tEnv.executeSql(
- "insert into "
- + " test_sink "
- + "select "
- + " id, "
- + " score,"
- + " to_date(DATE_FORMAT(submit_time, 'yyyy-MM-dd')) as submit_time "
- + "from "
- + " test_source "
- + "where "
- + " submit_time>='2022-05-31 00:00:00'")
- .print();
- }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisIntranetAccessSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisIntranetAccessSinkExample.java
deleted file mode 100644
index debc81406..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisIntranetAccessSinkExample.java
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.example;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * When the flink connector accesses doris, it parses out all surviving BE nodes according to the FE
- * address filled in.
- *
- * However, when the BE node is deployed, most of the internal network IP is filled in, so the BE
- * node parsed by FE is the internal network IP. When flink is deployed on a non-intranet segment,
- * the BE node will be inaccessible on the network.
- *
- *
In this case, you can access the BE node on the intranet by directly configuring {@link new
- * DorisOptions.builder().setBenodes().build()}, after you configure this parameter, Flink Connector
- * will not parse all BE nodes through FE nodes.
- */
-public class DorisIntranetAccessSinkExample {
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.enableCheckpointing(10000);
- env.getCheckpointConfig()
- .enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
-
- DorisSink.Builder builder = DorisSink.builder();
- final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
-
- Properties properties = new Properties();
- properties.setProperty("column_separator", ",");
- properties.setProperty("line_delimiter", "\n");
- properties.setProperty("format", "csv");
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes("10.20.30.1:8030")
- .setBenodes("10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040")
- .setTableIdentifier("test.test_sink")
- .setUsername("root")
- .setPassword("");
-
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder
- .disable2PC()
- .setLabelPrefix("label-doris")
- .setStreamLoadProp(properties)
- .setBufferSize(8 * 1024)
- .setBufferCount(3);
-
- builder.setDorisReadOptions(readOptionBuilder.build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setSerializer(new SimpleStringSerializer())
- .setDorisOptions(dorisBuilder.build());
-
- List> data = new ArrayList<>();
- data.add(new Tuple2<>(1, "zhangsan"));
- data.add(new Tuple2<>(2, "lisi"));
- data.add(new Tuple2<>(3, "wangwu"));
- DataStreamSource> source = env.fromCollection(data);
- source.map((MapFunction, String>) t -> t.f0 + "," + t.f1)
- .sinkTo(builder.build());
- env.execute("doris test");
- }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkArraySQLExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkArraySQLExample.java
deleted file mode 100644
index 99514d328..000000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkArraySQLExample.java
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.example;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-import java.util.UUID;
-
-public class DorisSinkArraySQLExample {
-
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- tEnv.executeSql(
- "CREATE TABLE source (\n"
- + " `id` int,\n"
- + " `c_1` array ,\n"
- + " `c_2` array ,\n"
- + " `c_3` array ,\n"
- + " `c_4` array ,\n"
- + " `c_5` array ,\n"
- + " `c_6` array ,\n"
- + " `c_7` array,\n"
- + " `c_8` array ,\n"
- + " `c_9` array ,\n"
- + " `c_10` array ,\n"
- + " `c_11` array ,\n"
- + " `c_12` array ,\n"
- + " `c_13` array ,\n"
- + " `c_14` array ,\n"
- + " `c_15` array ,\n"
- + " `c_16` array \n"
- + ") WITH (\n"
- + " 'connector' = 'datagen', \n"
- + " 'fields.c_7.element.min' = '1', \n"
- + " 'fields.c_7.element.max' = '10', \n"
- + " 'fields.c_8.element.min' = '1', \n"
- + " 'fields.c_8.element.max' = '10', \n"
- + " 'fields.c_14.element.length' = '10', \n"
- + " 'fields.c_15.element.length' = '10', \n"
- + " 'fields.c_16.element.length' = '10', \n"
- + " 'number-of-rows' = '5' \n"
- + ");");
-
- tEnv.executeSql(
- "CREATE TABLE source_doris ("
- + " `id` int,\n"
- + " `c_1` array ,\n"
- + " `c_2` array ,\n"
- + " `c_3` array ,\n"
- + " `c_4` array ,\n"
- + " `c_5` array ,\n"
- + " `c_6` array ,\n"
- + " `c_7` array ,\n"
- + " `c_8` array ,\n"
- + " `c_9` array ,\n"
- + " `c_10` array ,\n"
- + // ARRAY
- " `c_11` array ,\n"
- + // ARRAY
- " `c_12` array ,\n"
- + // ARRAY
- " `c_13` array ,\n"
- + // ARRAY
- " `c_14` array ,\n"
- + " `c_15` array ,\n"
- + " `c_16` array \n"
- + ") WITH ("
- + " 'connector' = 'doris',\n"
- + " 'fenodes' = '127.0.0.1:8030',\n"
- + " 'table.identifier' = 'test.array_test_type',\n"
- + " 'username' = 'root',\n"
- + " 'password' = ''\n"
- + ")");
-
- // define a dynamic aggregating query
- // final Table result = tEnv.sqlQuery("SELECT * from source_doris ");
-
- // print the result to the console
- // tEnv.toRetractStream(result, Row.class).print();
- // env.execute();
-
- tEnv.executeSql(
- "CREATE TABLE sink ("
- + " `id` int,\n"
- + " `c_1` array ,\n"
- + " `c_2` array ,\n"
- + " `c_3` array ,\n"
- + " `c_4` array ,\n"
- + " `c_5` array ,\n"
- + " `c_6` array ,\n"
- + " `c_7` array ,\n"
- + " `c_8` array ,\n"
- + " `c_9` array ,\n"
- + " `c_10` array ,\n"
- + // ARRAY
- " `c_11` array ,\n"
- + // ARRAY
- " `c_12` array ,\n"
- + // ARRAY
- " `c_13` array