
Commit

improve test
JNSimba committed Sep 9, 2024
1 parent ca699b0 commit e9234f3
Showing 9 changed files with 273 additions and 22 deletions.
6 changes: 6 additions & 0 deletions flink-doris-connector/pom.xml
@@ -423,6 +423,12 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
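The new test-scoped dependency brings in Flink's connector testing utilities, which the failover tests below build on. As a sketch of the kind of MiniCluster fixture these ITs rely on (an illustration under assumptions; the actual wiring lives in the test base classes, and MiniClusterWithClientResource ships with Flink's flink-test-utils):

    // Sketch only: a typical MiniCluster fixture for connector ITs.
    // MiniClusterWithClientResource and MiniClusterResourceConfiguration
    // are Flink test utilities, not classes added by this commit.
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.ClassRule;

    public class ExampleTestBase {
        @ClassRule
        public static final MiniClusterWithClientResource miniClusterResource =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(2) // covers DEFAULT_PARALLELISM = 2
                                .withHaLeadershipControl() // required for JM failover control
                                .build());
    }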
----------------------------------------------------------------------
@@ -146,7 +146,7 @@ public void setup()
props.put("sink.enable-2pc", "false");
catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props);
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
- tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM);
// Use doris catalog.
tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
tEnv.useCatalog(TEST_CATALOG_NAME);
----------------------------------------------------------------------
@@ -24,11 +24,18 @@
import org.slf4j.LoggerFactory;

import java.sql.Connection;
+ import java.util.List;
+ import java.util.Objects;
+ import java.util.stream.Collectors;

+ import static org.junit.Assert.assertArrayEquals;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;

public abstract class AbstractContainerTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
private static ContainerService dorisContainerService;
+ public static final int DEFAULT_PARALLELISM = 2;

@BeforeClass
public static void initContainers() {
@@ -88,4 +95,20 @@ private static void closeDorisContainer() {
dorisContainerService.close();
LOG.info("Doris container was closed.");
}

+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+ public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+ assertTrue(expected != null && actual != null);
+ assertEqualsInOrder(
+ expected.stream().sorted().collect(Collectors.toList()),
+ actual.stream().sorted().collect(Collectors.toList()));
+ }
+
+ public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+ }
}
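A quick usage sketch for the new helpers (hypothetical row strings, not part of the commit):

    // assertEqualsInAnyOrder sorts both sides first; assertEqualsInOrder compares as-is.
    List<String> expected = Arrays.asList("+I[doris, 18]", "+I[flink, 10]");
    List<String> shuffled = Arrays.asList("+I[flink, 10]", "+I[doris, 18]");
    assertEqualsInAnyOrder(expected, shuffled); // passes: same elements, any order
    assertEqualsInOrder(expected, shuffled);    // fails: sequence differs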
----------------------------------------------------------------------
@@ -113,6 +113,7 @@ protected void cancelE2EJob(String jobName) {

private StreamExecutionEnvironment configFlinkEnvironment() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
Map<String, String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining", "false");
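The hunk ends before showing how flinkMap is applied; a common pattern (an assumption here, not visible in this diff) converts the map into a Configuration and hands it to the environment:

    // Assumed continuation of configFlinkEnvironment, using
    // org.apache.flink.configuration.Configuration.
    Configuration configuration = Configuration.fromMap(flinkMap);
    env.configure(configuration, Thread.currentThread().getContextClassLoader());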
----------------------------------------------------------------------
@@ -138,4 +138,11 @@ protected static void triggerJobManagerFailover(
LOG.info("flink cluster will grant job master leadership. jobId={}", jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}

+ protected void sleepMs(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException ignored) {
+ }
+ }
}
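sleepMs moves into the shared base class so both the sink ITs and the new source failover tests can call it; the private copy in the sink test class is deleted further down. Swallowing InterruptedException keeps call sites terse; a variant that preserves the interrupt flag (a suggestion, not what this commit does) would look like:

    protected void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the thread's interrupt status
        }
    }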
----------------------------------------------------------------------
@@ -55,7 +55,7 @@ public void testDoris2Doris() throws Exception {
LOG.info("Start executing the test case of doris to doris.");
initializeDorisTable();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(2);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

----------------------------------------------------------------------
@@ -131,6 +131,7 @@ private void submitJob(
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(DEFAULT_PARALLELISM);
Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

@@ -147,7 +148,7 @@
public void testTableSinkJsonFormat() throws Exception {
initializeTable(TABLE_JSON_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

@@ -196,7 +197,7 @@ public void testTableSinkJsonFormat() throws Exception {
public void testTableBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

@@ -244,6 +245,7 @@ public void testDataStreamBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_DS);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(DEFAULT_PARALLELISM);
DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();

DorisOptions.Builder dorisBuilder = DorisOptions.builder();
@@ -283,7 +285,7 @@ public void testDataStreamBatch() throws Exception {
public void testTableGroupCommit() throws Exception {
initializeTable(TABLE_GROUP_COMMIT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

@@ -332,7 +334,7 @@ public void testTableGroupCommit() throws Exception {
public void testTableGzFormat() throws Exception {
initializeTable(TABLE_GZ_FORMAT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

@@ -374,7 +376,7 @@ public void testJobManagerFailoverSink() throws Exception {
LOG.info("start to test JobManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_JM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));

@@ -434,7 +436,7 @@ public void testTaskManagerFailoverSink() throws Exception {
LOG.info("start to test TaskManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_TM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));

@@ -486,13 +488,6 @@ public void testTaskManagerFailoverSink() throws Exception {
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}

- private void sleepMs(long millis) {
- try {
- Thread.sleep(millis);
- } catch (InterruptedException ignored) {
- }
- }

private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
Expand Down
----------------------------------------------------------------------
@@ -17,8 +17,10 @@

package org.apache.doris.flink.source;

+ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -40,6 +42,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -56,13 +59,15 @@ public class DorisSourceITCase extends AbstractITCaseService {
private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
"tbl_read_tbl_push_down_with_union_all";
+ static final String TABLE_CSV_JM = "tbl_csv_jm_source";
+ static final String TABLE_CSV_TM = "tbl_csv_tm_source";

@Test
public void testSource() throws Exception {
initializeTable(TABLE_READ);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

+ env.setParallelism(DEFAULT_PARALLELISM);
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes(getFenodes())
@@ -91,6 +96,7 @@ public void testSource() throws Exception {
public void testOldSourceApi() throws Exception {
initializeTable(TABLE_READ_OLD_API);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
Properties properties = new Properties();
properties.put("fenodes", getFenodes());
properties.put("username", getDorisUsername());
@@ -116,7 +122,7 @@ options, new SimpleListDeserializationSchema()))
public void testTableSource() throws Exception {
initializeTable(TABLE_READ_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand Down Expand Up @@ -165,7 +171,7 @@ public void testTableSource() throws Exception {
public void testTableSourceOldApi() throws Exception {
initializeTable(TABLE_READ_TBL_OLD_API);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
Expand Down Expand Up @@ -202,7 +208,7 @@ public void testTableSourceOldApi() throws Exception {
public void testTableSourceAllOptions() throws Exception {
initializeTable(TABLE_READ_TBL_ALL_OPTIONS);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
Expand Down Expand Up @@ -248,7 +254,7 @@ public void testTableSourceAllOptions() throws Exception {
public void testTableSourceFilterAndProjectionPushDown() throws Exception {
initializeTable(TABLE_READ_TBL_PUSH_DOWN);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
Expand Down Expand Up @@ -287,7 +293,7 @@ public void testTableSourceFilterWithUnionAll() {
LOG.info("starting to execute testTableSourceFilterWithUnionAll case.");
initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
Expand Down Expand Up @@ -328,6 +334,98 @@ public void testTableSourceFilterWithUnionAll() {
}
}

+ @Test
+ public void testJobManagerFailoverSource() throws Exception {
+ LOG.info("start to test JobManagerFailoverSource.");
+ initializeTable(TABLE_CSV_JM);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(10000);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_jm ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_CSV_JM,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from doris_source_jm");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+ List<String> expectedSnapshotData = new ArrayList<>();
+ expectedSnapshotData.addAll(
+ Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"));
+
+ if (iterator.hasNext()) {
+ LOG.info("trigger jobmanager failover...");
+ triggerFailover(
+ FailoverType.JM,
+ jobId,
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
+ }
+
+ assertEqualsInAnyOrder(
+ expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+ }
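triggerFailover and FailoverType come from the shared test base; only triggerJobManagerFailover's tail is visible in this diff. A minimal sketch of such a helper, assuming Flink's MiniCluster controls (getHaLeadershipControl, terminateTaskManager, and startTaskManager on org.apache.flink.runtime.minicluster.MiniCluster, plus HaLeadershipControl from org.apache.flink.runtime.highavailability.nonha.embedded):

    // Sketch under assumptions; the real helper lives in the test base class.
    public static void triggerFailover(
            FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
            throws Exception {
        switch (type) {
            case JM:
                HaLeadershipControl control = miniCluster.getHaLeadershipControl().get();
                control.revokeJobMasterLeadership(jobId).get();
                afterFailAction.run();
                control.grantJobMasterLeadership(jobId).get();
                break;
            case TM:
                miniCluster.terminateTaskManager(0).get();
                afterFailAction.run();
                miniCluster.startTaskManager(); // bring up a replacement TaskManager
                break;
        }
    }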

+ @Test
+ public void testTaskManagerFailoverSource() throws Exception {
+ LOG.info("start to test TaskManagerFailoverSource.");
+ initializeTable(TABLE_CSV_TM);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(10000);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_tm ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_CSV_TM,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from doris_source_tm");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+ List<String> expectedSnapshotData = new ArrayList<>();
+ expectedSnapshotData.addAll(
+ Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"));
+
+ if (iterator.hasNext()) {
+ LOG.info("trigger taskmanager failover...");
+ triggerFailover(
+ FailoverType.TM,
+ jobId,
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
+ }
+
+ assertEqualsInAnyOrder(
+ expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+ }

private void checkResult(String testName, Object[] expected, Object[] actual) {
LOG.info(
"Checking DorisSourceITCase result. testName={}, actual={}, expected={}",
@@ -347,7 +445,7 @@ private void initializeTable(String table) {
"CREATE TABLE %s.%s ( \n"
+ "`name` varchar(256),\n"
+ "`age` int\n"
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
@@ -356,4 +454,14 @@ private void initializeTable(String table) {
String.format("insert into %s.%s values ('flink',10)", DATABASE, table),
String.format("insert into %s.%s values ('apache',12)", DATABASE, table));
}

+ private static List<String> fetchRows(Iterator<Row> iter, int size) {
+ List<String> rows = new ArrayList<>(size);
+ while (size > 0 && iter.hasNext()) {
+ Row row = iter.next();
+ rows.add(row.toString());
+ size--;
+ }
+ return rows;
+ }
}
