diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 98b5d459b..3af5dad0e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -345,10 +345,10 @@ public void testTableSourceFilterWithUnionAll() throws Exception { @Test public void testJobManagerFailoverSource() throws Exception { LOG.info("start to test JobManagerFailoverSource."); - initializeTable(TABLE_CSV_JM); + initializeTableWithData(TABLE_CSV_JM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(200L); + env.enableCheckpointing(100L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -391,12 +391,12 @@ public void testJobManagerFailoverSource() throws Exception { } @Test - public void testTaskManagerFailoverSounce() throws Exception { + public void testTaskManagerFailoverSource() throws Exception { LOG.info("start to test TaskManagerFailoverSource."); - initializeTable(TABLE_CSV_TM); + initializeTableWithData(TABLE_CSV_TM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(200L); + env.enableCheckpointing(100L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -476,6 +476,43 @@ private void initializeTable(String table) { String.format("insert into %s.%s values ('apache',12)", DATABASE, table)); } + private void initializeTableWithData(String table) { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, table), + String.format("insert into %s.%s values ('1',1),('2',2),('3',3)", DATABASE, table), + String.format( + "insert into %s.%s values ('101',1),('102',1),('103',1)", DATABASE, table), + String.format( + "insert into %s.%s values ('201',2),('202',2),('203',2)", DATABASE, table), + String.format( + "insert into %s.%s values ('301',3),('302',3),('303',3)", DATABASE, table), + String.format( + "insert into %s.%s values ('401',4),('402',4),('403',4)", DATABASE, table), + String.format( + "insert into %s.%s values ('501',5),('502',5),('503',5)", DATABASE, table), + String.format( + "insert into %s.%s values ('601',6),('602',6),('603',6)", DATABASE, table), + String.format( + "insert into %s.%s values ('701',7),('702',7),('703',7)", DATABASE, table), + String.format( + "insert into %s.%s values ('801',8),('802',8),('803',8)", DATABASE, table), + String.format( + "insert into %s.%s values ('901',9),('902',9),('903',9)", + DATABASE, table)); + } + private static List fetchRows(Iterator iter, int size) { List rows = new ArrayList<>(size); while (size > 0 && iter.hasNext()) {