From 46d4567baeeb525acbb5590e26727b646880ab20 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 11 Sep 2024 10:37:28 +0800 Subject: [PATCH] [Improve](test) improve some ut and itcase (#486) --- flink-doris-connector/pom.xml | 6 + .../flink/catalog/DorisCatalogITCase.java | 2 +- .../container/AbstractContainerTestBase.java | 23 ++ .../flink/container/AbstractE2EService.java | 1 + .../container/AbstractITCaseService.java | 21 +- .../container/e2e/Doris2DorisE2ECase.java | 2 +- .../doris/flink/sink/DorisSinkITCase.java | 35 ++- .../doris/flink/source/DorisSourceITCase.java | 240 ++++++++++++++++-- .../enumerator/DorisSourceEnumeratorTest.java | 111 ++++++++ 9 files changed, 388 insertions(+), 53 deletions(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 64ca73920..d773339b3 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -423,6 +423,12 @@ under the License. ${flink.version} test + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + com.github.jsqlparser jsqlparser diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java index b3a3ce04f..099f6ebd6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java @@ -146,7 +146,7 @@ public void setup() props.put("sink.enable-2pc", "false"); catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props); this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); - tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM); // Use doris catalog. tEnv.registerCatalog(TEST_CATALOG_NAME, catalog); tEnv.useCatalog(TEST_CATALOG_NAME); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java index 967e6f363..61e0faac8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java @@ -24,11 +24,18 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public abstract class AbstractContainerTestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class); private static ContainerService dorisContainerService; + public static final int DEFAULT_PARALLELISM = 2; @BeforeClass public static void initContainers() { @@ -88,4 +95,20 @@ private static void closeDorisContainer() { dorisContainerService.close(); LOG.info("Doris container was closed."); } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0])); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java index 527f82cc5..ec536ee68 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java @@ -113,6 +113,7 @@ protected void cancelE2EJob(String jobName) { private StreamExecutionEnvironment configFlinkEnvironment() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); Map flinkMap = new HashMap<>(); flinkMap.put("execution.checkpointing.interval", "10s"); flinkMap.put("pipeline.operator-chaining", "false"); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java index 956b8be65..6628933c5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java @@ -23,12 +23,8 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.RpcServiceSharing; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.SupplierWithException; -import org.junit.Rule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,16 +81,6 @@ protected static void waitUntilCondition( } } - @Rule - public final MiniClusterWithClientResource miniClusterResource = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(2) - .setRpcServiceSharing(RpcServiceSharing.DEDICATED) - .withHaLeadershipControl() - .build()); - /** The type of failover. */ protected enum FailoverType { TM, @@ -138,4 +124,11 @@ protected static void triggerJobManagerFailover( LOG.info("flink cluster will grant job master leadership. jobId={}", jobId); haLeadershipControl.grantJobMasterLeadership(jobId).get(); } + + protected void sleepMs(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index fcb4858a8..4b4e3b26a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -55,7 +55,7 @@ public void testDoris2Doris() throws Exception { LOG.info("Start executing the test case of doris to doris."); initializeDorisTable(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(2); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 50bcf6be1..96562fa40 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -22,8 +22,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -35,6 +38,7 @@ import org.apache.doris.flink.sink.batch.DorisBatchSink; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; import org.apache.doris.flink.utils.MockSource; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +69,16 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(2) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); + @Test public void testSinkCsvFormat() throws Exception { initializeTable(TABLE_CSV); @@ -131,6 +145,7 @@ private void submitJob( throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(DEFAULT_PARALLELISM); Builder builder = DorisSink.builder(); final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); @@ -147,7 +162,7 @@ private void submitJob( public void testTableSinkJsonFormat() throws Exception { initializeTable(TABLE_JSON_TBL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -196,7 +211,7 @@ public void testTableSinkJsonFormat() throws Exception { public void testTableBatch() throws Exception { initializeTable(TABLE_CSV_BATCH_TBL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -244,6 +259,7 @@ public void testDataStreamBatch() throws Exception { initializeTable(TABLE_CSV_BATCH_DS); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(DEFAULT_PARALLELISM); DorisBatchSink.Builder builder = DorisBatchSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); @@ -283,7 +299,7 @@ public void testDataStreamBatch() throws Exception { public void testTableGroupCommit() throws Exception { initializeTable(TABLE_GROUP_COMMIT); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -332,7 +348,7 @@ public void testTableGroupCommit() throws Exception { public void testTableGzFormat() throws Exception { initializeTable(TABLE_GZ_FORMAT); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -374,7 +390,7 @@ public void testJobManagerFailoverSink() throws Exception { LOG.info("start to test JobManagerFailoverSink."); initializeFailoverTable(TABLE_CSV_JM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); + env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(10000); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); @@ -434,7 +450,7 @@ public void testTaskManagerFailoverSink() throws Exception { LOG.info("start to test TaskManagerFailoverSink."); initializeFailoverTable(TABLE_CSV_TM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); + env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(10000); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); @@ -486,13 +502,6 @@ public void testTaskManagerFailoverSink() throws Exception { ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); } - private void sleepMs(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException ignored) { - } - } - private void initializeTable(String table) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 783e6bda2..96a08d1cd 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -17,11 +17,16 @@ package org.apache.doris.flink.source; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; @@ -31,18 +36,17 @@ import org.apache.doris.flink.container.ContainerUtils; import org.apache.doris.flink.datastream.DorisSourceFunction; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; -import org.apache.doris.flink.exception.DorisRuntimeException; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Properties; -import java.util.Set; /** DorisSource ITCase. */ public class DorisSourceITCase extends AbstractITCaseService { @@ -56,13 +60,25 @@ public class DorisSourceITCase extends AbstractITCaseService { private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down"; private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL = "tbl_read_tbl_push_down_with_union_all"; + static final String TABLE_CSV_JM = "tbl_csv_jm_source"; + static final String TABLE_CSV_TM = "tbl_csv_tm_source"; + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(2) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); @Test public void testSource() throws Exception { initializeTable(TABLE_READ); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); - + env.setParallelism(DEFAULT_PARALLELISM); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder .setFenodes(getFenodes()) @@ -84,13 +100,14 @@ public void testSource() throws Exception { } } List expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]"); - checkResult("testSource", expected.toArray(), actual.toArray()); + checkResultInAnyOrder("testSource", expected.toArray(), actual.toArray()); } @Test public void testOldSourceApi() throws Exception { initializeTable(TABLE_READ_OLD_API); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); Properties properties = new Properties(); properties.put("fenodes", getFenodes()); properties.put("username", getDorisUsername()); @@ -109,14 +126,14 @@ options, new SimpleListDeserializationSchema())) } } List expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]"); - checkResult("testOldSourceApi", expected.toArray(), actual.toArray()); + checkResultInAnyOrder("testOldSourceApi", expected.toArray(), actual.toArray()); } @Test public void testTableSource() throws Exception { initializeTable(TABLE_READ_TBL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -146,7 +163,7 @@ public void testTableSource() throws Exception { } } String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"}; - Assert.assertArrayEquals(expected, actual.toArray()); + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray())); // fitler query List actualFilter = new ArrayList<>(); @@ -158,14 +175,14 @@ public void testTableSource() throws Exception { } } String[] expectedFilter = new String[] {"+I[doris, 18]"}; - checkResult("testTableSource", expectedFilter, actualFilter.toArray()); + checkResultInAnyOrder("testTableSource", expectedFilter, actualFilter.toArray()); } @Test public void testTableSourceOldApi() throws Exception { initializeTable(TABLE_READ_TBL_OLD_API); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -195,14 +212,14 @@ public void testTableSourceOldApi() throws Exception { } } String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"}; - checkResult("testTableSourceOldApi", expected, actual.toArray()); + checkResultInAnyOrder("testTableSourceOldApi", expected, actual.toArray()); } @Test public void testTableSourceAllOptions() throws Exception { initializeTable(TABLE_READ_TBL_ALL_OPTIONS); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -241,14 +258,14 @@ public void testTableSourceAllOptions() throws Exception { } } String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"}; - checkResult("testTableSourceAllOptions", expected, actual.toArray()); + checkResultInAnyOrder("testTableSourceAllOptions", expected, actual.toArray()); } @Test public void testTableSourceFilterAndProjectionPushDown() throws Exception { initializeTable(TABLE_READ_TBL_PUSH_DOWN); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -279,15 +296,16 @@ public void testTableSourceFilterAndProjectionPushDown() throws Exception { } } String[] expected = new String[] {"+I[18]"}; - checkResult("testTableSourceFilterAndProjectionPushDown", expected, actual.toArray()); + checkResultInAnyOrder( + "testTableSourceFilterAndProjectionPushDown", expected, actual.toArray()); } @Test - public void testTableSourceFilterWithUnionAll() { + public void testTableSourceFilterWithUnionAll() throws Exception { LOG.info("starting to execute testTableSourceFilterWithUnionAll case."); initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -318,14 +336,134 @@ public void testTableSourceFilterWithUnionAll() { while (iterator.hasNext()) { actual.add(iterator.next().toString()); } - } catch (Exception e) { - LOG.error("Failed to execute sql. sql={}", querySql, e); - throw new DorisRuntimeException(e); } - Set expected = new HashSet<>(Arrays.asList("+I[flink, 10]", "+I[doris, 18]")); - for (String a : actual) { - Assert.assertTrue(expected.contains(a)); + + String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"}; + checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray()); + } + + @Test + public void testJobManagerFailoverSource() throws Exception { + LOG.info("start to test JobManagerFailoverSource."); + initializeTableWithData(TABLE_CSV_JM); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + String sourceDDL = + String.format( + "CREATE TABLE doris_source_jm (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_CSV_JM, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from doris_source_jm"); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + + List expectedData = getExpectedData(); + if (iterator.hasNext()) { + LOG.info("trigger jobmanager failover..."); + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + List actual = fetchRows(iterator); + LOG.info("actual data: {}, expected: {}", actual, expectedData); + Assert.assertTrue(actual.size() >= expectedData.size()); + Assert.assertTrue(actual.containsAll(expectedData)); + } + + private static List getExpectedData() { + String[] expected = + new String[] { + "+I[101, 1]", + "+I[102, 1]", + "+I[103, 1]", + "+I[201, 2]", + "+I[202, 2]", + "+I[203, 2]", + "+I[301, 3]", + "+I[302, 3]", + "+I[303, 3]", + "+I[401, 4]", + "+I[402, 4]", + "+I[403, 4]", + "+I[501, 5]", + "+I[502, 5]", + "+I[503, 5]", + "+I[601, 6]", + "+I[602, 6]", + "+I[603, 6]", + "+I[701, 7]", + "+I[702, 7]", + "+I[703, 7]", + "+I[801, 8]", + "+I[802, 8]", + "+I[803, 8]", + "+I[901, 9]", + "+I[902, 9]", + "+I[903, 9]" + }; + return Arrays.asList(expected); + } + + @Test + public void testTaskManagerFailoverSource() throws Exception { + LOG.info("start to test TaskManagerFailoverSource."); + initializeTableWithData(TABLE_CSV_TM); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + String sourceDDL = + String.format( + "CREATE TABLE doris_source_tm (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_CSV_TM, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from doris_source_tm"); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + List expectedData = getExpectedData(); + if (iterator.hasNext()) { + LOG.info("trigger taskmanager failover..."); + triggerFailover( + FailoverType.TM, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); } + + List actual = fetchRows(iterator); + LOG.info("actual data: {}, expected: {}", actual, expectedData); + Assert.assertTrue(actual.size() >= expectedData.size()); + Assert.assertTrue(actual.containsAll(expectedData)); } private void checkResult(String testName, Object[] expected, Object[] actual) { @@ -337,6 +475,15 @@ private void checkResult(String testName, Object[] expected, Object[] actual) { Assert.assertArrayEquals(expected, actual); } + private void checkResultInAnyOrder(String testName, Object[] expected, Object[] actual) { + LOG.info( + "Checking DorisSourceITCase result. testName={}, actual={}, expected={}", + testName, + actual, + expected); + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual)); + } + private void initializeTable(String table) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), @@ -347,7 +494,7 @@ private void initializeTable(String table) { "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" - + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + ")\n", @@ -356,4 +503,49 @@ private void initializeTable(String table) { String.format("insert into %s.%s values ('flink',10)", DATABASE, table), String.format("insert into %s.%s values ('apache',12)", DATABASE, table)); } + + private void initializeTableWithData(String table) { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, table), + String.format( + "insert into %s.%s values ('101',1),('102',1),('103',1)", DATABASE, table), + String.format( + "insert into %s.%s values ('201',2),('202',2),('203',2)", DATABASE, table), + String.format( + "insert into %s.%s values ('301',3),('302',3),('303',3)", DATABASE, table), + String.format( + "insert into %s.%s values ('401',4),('402',4),('403',4)", DATABASE, table), + String.format( + "insert into %s.%s values ('501',5),('502',5),('503',5)", DATABASE, table), + String.format( + "insert into %s.%s values ('601',6),('602',6),('603',6)", DATABASE, table), + String.format( + "insert into %s.%s values ('701',7),('702',7),('703',7)", DATABASE, table), + String.format( + "insert into %s.%s values ('801',8),('802',8),('803',8)", DATABASE, table), + String.format( + "insert into %s.%s values ('901',9),('902',9),('903',9)", + DATABASE, table)); + } + + private static List fetchRows(Iterator iter) { + List rows = new ArrayList<>(); + while (iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + } + return rows; + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java new file mode 100644 index 000000000..687890154 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; + +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.source.DorisSource; +import org.apache.doris.flink.source.assigners.SimpleSplitAssigner; +import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the {@link DorisSourceEnumerator}. */ +public class DorisSourceEnumeratorTest { + private static long splitId = 1L; + private TestingSplitEnumeratorContext context; + private DorisSourceSplit split; + private DorisSourceEnumerator enumerator; + + @BeforeEach + void setup() { + this.context = new TestingSplitEnumeratorContext<>(2); + this.split = createRandomSplit(); + this.enumerator = createEnumerator(context, split); + } + + @Test + void testCheckpointNoSplitRequested() throws Exception { + PendingSplitsCheckpoint state = enumerator.snapshotState(1L); + assertThat(state.getSplits()).contains(split); + } + + @Test + void testRestoreEnumerator() throws Exception { + PendingSplitsCheckpoint state = enumerator.snapshotState(1L); + + DorisSource source = DorisSource.builder().build(); + SplitEnumerator restoreEnumerator = + source.restoreEnumerator(context, state); + PendingSplitsCheckpoint pendingSplitsCheckpoint = restoreEnumerator.snapshotState(1L); + assertThat(pendingSplitsCheckpoint.getSplits()).contains(split); + } + + @Test + void testSplitRequestForRegisteredReader() throws Exception { + context.registerReader(1, "somehost"); + enumerator.addReader(1); + enumerator.handleSplitRequest(1, "somehost"); + assertThat(enumerator.snapshotState(1L).getSplits()).isEmpty(); + assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split); + } + + @Test + void testSplitRequestForNonRegisteredReader() throws Exception { + enumerator.handleSplitRequest(1, "somehost"); + assertThat(context.getSplitAssignments()).doesNotContainKey(1); + assertThat(enumerator.snapshotState(1L).getSplits()).contains(split); + } + + @Test + void testNoMoreSplits() { + // first split assignment + context.registerReader(1, "somehost"); + enumerator.addReader(1); + enumerator.handleSplitRequest(1, "somehost"); + + // second request has no more split + enumerator.handleSplitRequest(1, "somehost"); + + assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split); + assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue(); + } + + private static DorisSourceSplit createRandomSplit() { + Set tabletIds = new HashSet<>(); + tabletIds.add(1001L); + return new DorisSourceSplit( + String.valueOf(splitId), + new PartitionDefinition("db", "tbl", "127.0.0.1", tabletIds, "queryPlan")); + } + + private static DorisSourceEnumerator createEnumerator( + final SplitEnumeratorContext context, + final DorisSourceSplit... splits) { + return new DorisSourceEnumerator(context, new SimpleSplitAssigner(Arrays.asList(splits))); + } +}