Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Sep 9, 2024
1 parent 19786a9 commit e7a679b
Showing 1 changed file with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ public void testTableSourceFilterWithUnionAll() throws Exception {
@Test
public void testJobManagerFailoverSource() throws Exception {
LOG.info("start to test JobManagerFailoverSource.");
initializeTable(TABLE_CSV_JM);
initializeTableWithData(TABLE_CSV_JM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(200L);
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
Expand Down Expand Up @@ -391,12 +391,12 @@ public void testJobManagerFailoverSource() throws Exception {
}

@Test
public void testTaskManagerFailoverSounce() throws Exception {
public void testTaskManagerFailoverSource() throws Exception {
LOG.info("start to test TaskManagerFailoverSource.");
initializeTable(TABLE_CSV_TM);
initializeTableWithData(TABLE_CSV_TM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(200L);
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
Expand Down Expand Up @@ -476,6 +476,43 @@ private void initializeTable(String table) {
String.format("insert into %s.%s values ('apache',12)", DATABASE, table));
}

private void initializeTableWithData(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
String.format(
"CREATE TABLE %s.%s ( \n"
+ "`name` varchar(256),\n"
+ "`age` int\n"
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
DATABASE, table),
String.format("insert into %s.%s values ('1',1),('2',2),('3',3)", DATABASE, table),
String.format(
"insert into %s.%s values ('101',1),('102',1),('103',1)", DATABASE, table),
String.format(
"insert into %s.%s values ('201',2),('202',2),('203',2)", DATABASE, table),
String.format(
"insert into %s.%s values ('301',3),('302',3),('303',3)", DATABASE, table),
String.format(
"insert into %s.%s values ('401',4),('402',4),('403',4)", DATABASE, table),
String.format(
"insert into %s.%s values ('501',5),('502',5),('503',5)", DATABASE, table),
String.format(
"insert into %s.%s values ('601',6),('602',6),('603',6)", DATABASE, table),
String.format(
"insert into %s.%s values ('701',7),('702',7),('703',7)", DATABASE, table),
String.format(
"insert into %s.%s values ('801',8),('802',8),('803',8)", DATABASE, table),
String.format(
"insert into %s.%s values ('901',9),('902',9),('903',9)",
DATABASE, table));
}

private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Expand Down

0 comments on commit e7a679b

Please sign in to comment.