diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-connector.yml similarity index 97% rename from .github/workflows/build-extension.yml rename to .github/workflows/build-connector.yml index 7259bb43a..298894f7d 100644 --- a/.github/workflows/build-extension.yml +++ b/.github/workflows/build-connector.yml @@ -16,14 +16,14 @@ # under the License. # --- -name: Build Extensions +name: Build Connector on: pull_request: push: jobs: build-extension: - name: "Build Extensions" + name: "Build Connector" runs-on: ubuntu-latest defaults: run: diff --git a/.github/workflows/run-e2ecase-12.yml b/.github/workflows/run-e2ecase-12.yml deleted file mode 100644 index fd89d20d8..000000000 --- a/.github/workflows/run-e2ecase-12.yml +++ /dev/null @@ -1,44 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# ---- -name: Run E2ECases 1.2 -on: - pull_request: - push: - -jobs: - build-extension: - name: "Run E2ECases 1.2" - runs-on: ubuntu-latest - defaults: - run: - shell: bash - steps: - - name: Checkout - uses: actions/checkout@master - - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' - - - name: Run E2ECases - run: | - cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="adamlee489/doris:1.2.7.1_x86" - diff --git a/.github/workflows/run-e2ecase-20.yml b/.github/workflows/run-e2ecase.yml similarity index 95% rename from .github/workflows/run-e2ecase-20.yml rename to .github/workflows/run-e2ecase.yml index ebf7ae62d..77492ae07 100644 --- a/.github/workflows/run-e2ecase-20.yml +++ b/.github/workflows/run-e2ecase.yml @@ -16,14 +16,14 @@ # under the License. # --- -name: Run E2ECases 2.0 +name: Run E2ECases on: pull_request: push: jobs: build-extension: - name: "Run E2ECases 2.0" + name: "Run E2ECases" runs-on: ubuntu-latest defaults: run: diff --git a/.github/workflows/run-itcase-12.yml b/.github/workflows/run-itcase-12.yml deleted file mode 100644 index cd31c3ad5..000000000 --- a/.github/workflows/run-itcase-12.yml +++ /dev/null @@ -1,44 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# ---- -name: Run ITCases 1.2 -on: - pull_request: - push: - -jobs: - build-extension: - name: "Run ITCases 1.2" - runs-on: ubuntu-latest - defaults: - run: - shell: bash - steps: - - name: Checkout - uses: actions/checkout@master - - - name: Setup java - uses: actions/setup-java@v2 - with: - distribution: adopt - java-version: '8' - - - name: Run ITCases - run: | - cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86" - diff --git a/.github/workflows/run-itcase-20.yml b/.github/workflows/run-itcase.yml similarity index 96% rename from .github/workflows/run-itcase-20.yml rename to .github/workflows/run-itcase.yml index ad9ef5a46..d03ee646f 100644 --- a/.github/workflows/run-itcase-20.yml +++ b/.github/workflows/run-itcase.yml @@ -16,14 +16,14 @@ # under the License. # --- -name: Run ITCases 2.0 +name: Run ITCases on: pull_request: push: jobs: build-extension: - name: "Run ITCases 2.0" + name: "Run ITCases" runs-on: ubuntu-latest defaults: run: diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java index 278be8ca8..09ae4bddb 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java @@ -28,6 +28,9 @@ import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerLoggerFactory; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; import java.net.InetAddress; import java.net.URL; import java.net.URLClassLoader; @@ -51,7 +54,11 @@ public abstract class DorisTestBase { protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class); - protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image"); + private static final String DEFAULT_DOCKER_IMAGE = "adamlee489/doris:2.0.3"; + protected static final String DORIS_DOCKER_IMAGE = + System.getProperty("image") == null + ? DEFAULT_DOCKER_IMAGE + : System.getProperty("image"); private static final String DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; @@ -59,7 +66,6 @@ public abstract class DorisTestBase { protected static final String USERNAME = "root"; protected static final String PASSWORD = ""; protected static final GenericContainer DORIS_CONTAINER = createDorisContainer(); - protected static Connection connection; protected static final int DEFAULT_PARALLELISM = 4; protected static String getFenodes() { @@ -68,21 +74,21 @@ protected static String getFenodes() { @BeforeClass public static void startContainers() { - LOG.info("Starting containers..."); + LOG.info("Starting doris containers..."); Startables.deepStart(Stream.of(DORIS_CONTAINER)).join(); given().ignoreExceptions() .await() .atMost(300, TimeUnit.SECONDS) .pollInterval(ONE_SECOND) .untilAsserted(DorisTestBase::initializeJdbcConnection); - LOG.info("Containers are started."); + LOG.info("Containers doris are started."); } @AfterClass public static void stopContainers() { - LOG.info("Stopping containers..."); + LOG.info("Stopping doris containers..."); DORIS_CONTAINER.stop(); - LOG.info("Containers are stopped."); + LOG.info("Containers doris are stopped."); } public static GenericContainer createDorisContainer() { @@ -90,17 +96,11 @@ public static GenericContainer createDorisContainer() { new GenericContainer<>(DORIS_DOCKER_IMAGE) .withNetwork(Network.newNetwork()) .withNetworkAliases("DorisContainer") - .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010") - .withEnv("FE_ID", "1") - .withEnv("CURRENT_BE_IP", "127.0.0.1") - .withEnv("CURRENT_BE_PORT", "9050") - .withCommand("ulimit -n 65536") - .withCreateContainerCmdModifier( - cmd -> cmd.getHostConfig().withMemorySwap(0L)) .withPrivilegedMode(true) .withLogConsumer( new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))); + DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) + .withReuse(true); container.setPortBindings( Lists.newArrayList( @@ -118,10 +118,10 @@ protected static void initializeJdbcConnection() throws Exception { new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader()); LOG.info("Try to connect to Doris..."); Thread.currentThread().setContextClassLoader(urlClassLoader); - connection = - DriverManager.getConnection( - String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); - try (Statement statement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet resultSet; do { LOG.info("Wait for the Backend to start successfully..."); @@ -144,7 +144,12 @@ private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLExce protected static void printClusterStatus() throws Exception { LOG.info("Current machine IP: {}", InetAddress.getLocalHost()); - try (Statement statement = connection.createStatement()) { + echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq"); + echo("sh", "-c", "free -h"); + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet showFrontends = statement.executeQuery("show frontends"); LOG.info("Frontends status: {}", convertList(showFrontends)); ResultSet showBackends = statement.executeQuery("show backends"); @@ -152,6 +157,24 @@ protected static void printClusterStatus() throws Exception { } } + static void echo(String... cmd) { + try { + Process p = Runtime.getRuntime().exec(cmd); + InputStream is = p.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + p.waitFor(); + is.close(); + reader.close(); + p.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } + } + private static List convertList(ResultSet rs) throws SQLException { List list = new ArrayList<>(); ResultSetMetaData metaData = rs.getMetaData(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 26cbc2c4a..c9501d339 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -30,6 +30,8 @@ import org.junit.Test; import org.junit.jupiter.api.Assertions; +import java.sql.Connection; +import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.Arrays; @@ -45,7 +47,7 @@ /** DorisSink ITCase with csv and arrow format. */ public class DorisSinkITCase extends DorisTestBase { - static final String DATABASE = "test"; + static final String DATABASE = "test_sink"; static final String TABLE_CSV = "tbl_csv"; static final String TABLE_JSON = "tbl_json"; static final String TABLE_JSON_TBL = "tbl_json_tbl"; @@ -61,9 +63,13 @@ public void testSinkCsvFormat() throws Exception { Thread.sleep(10000); Set> actual = new HashSet<>(); - try (Statement sinkStatement = connection.createStatement()) { + + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet sinkResultSet = - sinkStatement.executeQuery( + statement.executeQuery( String.format( "select name,age from %s.%s order by 1", DATABASE, TABLE_CSV)); while (sinkResultSet.next()) { @@ -102,9 +108,12 @@ public void testSinkJsonFormat() throws Exception { Thread.sleep(10000); Set> actual = new HashSet<>(); - try (Statement sinkStatement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet sinkResultSet = - sinkStatement.executeQuery( + statement.executeQuery( String.format( "select name,age from %s.%s order by 1", DATABASE, TABLE_JSON)); while (sinkResultSet.next()) { @@ -172,9 +181,12 @@ public void testTableSinkJsonFormat() throws Exception { Thread.sleep(10000); Set> actual = new HashSet<>(); - try (Statement sinkStatement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet sinkResultSet = - sinkStatement.executeQuery( + statement.executeQuery( String.format( "select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL)); @@ -191,7 +203,10 @@ public void testTableSinkJsonFormat() throws Exception { } private void initializeTable(String table) throws Exception { - try (Statement statement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)); statement.execute( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index a5a3b534a..f88b756fd 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -32,6 +32,8 @@ import org.junit.Test; import org.junit.jupiter.api.Assertions; +import java.sql.Connection; +import java.sql.DriverManager; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; @@ -39,7 +41,7 @@ /** DorisSource ITCase. */ public class DorisSourceITCase extends DorisTestBase { - static final String DATABASE = "test"; + static final String DATABASE = "test_source"; static final String TABLE_READ = "tbl_read"; static final String TABLE_READ_TBL = "tbl_read_tbl"; @@ -111,7 +113,10 @@ public void testTableSource() throws Exception { } private void initializeTable(String table) throws Exception { - try (Statement statement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)); statement.execute( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java new file mode 100644 index 000000000..ad40255bc --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.apache.doris.flink.DorisTestBase; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** DorisDorisE2ECase. */ +public class DorisDorisE2ECase extends DorisTestBase { + private static final String DATABASE_SOURCE = "test_e2e_source"; + private static final String DATABASE_SINK = "test_e2e_sink"; + private static final String TABLE = "test_tbl"; + + @Test + public void testDoris2Doris() throws Exception { + initializeDorisTable(TABLE); + printClusterStatus(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sourceDDL = + String.format( + "CREATE TABLE doris_source (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), DATABASE_SOURCE + "." + TABLE, USERNAME, PASSWORD); + tEnv.executeSql(sourceDDL); + + String sinkDDL = + String.format( + "CREATE TABLE doris_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), DATABASE_SINK + "." + TABLE, USERNAME, PASSWORD); + tEnv.executeSql(sinkDDL); + + tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await(); + + TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink"); + List actual = new ArrayList<>(); + try (CloseableIterator iterator = tableResult.collect()) { + while (iterator.hasNext()) { + actual.add(iterator.next().toString()); + } + } + String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"}; + Assertions.assertIterableEquals(Arrays.asList(expected), actual); + } + + private void initializeDorisTable(String table) throws Exception { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { + statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SOURCE)); + statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SINK)); + statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SOURCE, table)); + statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SINK, table)); + statement.execute( + String.format( + "CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE_SOURCE, table)); + statement.execute( + String.format( + "CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE_SINK, table)); + statement.execute( + String.format( + "insert into %s.%s values ('doris',18)", DATABASE_SOURCE, table)); + statement.execute( + String.format( + "insert into %s.%s values ('flink',10)", DATABASE_SOURCE, table)); + } + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java index 3390f7553..99e7a13ec 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -63,7 +63,7 @@ */ public class MySQLDorisE2ECase extends DorisTestBase { protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class); - private static final String DATABASE = "test"; + private static final String DATABASE = "test_e2e_mysql"; private static final String MYSQL_USER = "root"; private static final String MYSQL_PASSWD = "123456"; private static final String TABLE_1 = "tbl1"; @@ -276,7 +276,10 @@ public void testAutoAddTable() throws Exception { } private void initializeDorisTable() throws Exception { - try (Statement statement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3)); @@ -287,8 +290,11 @@ private void initializeDorisTable() throws Exception { public void checkResult(Set> expected, String query, int columnSize) throws Exception { Set> actual = new HashSet<>(); - try (Statement sinkStatement = connection.createStatement()) { - ResultSet sinkResultSet = sinkStatement.executeQuery(query); + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { + ResultSet sinkResultSet = statement.executeQuery(query); while (sinkResultSet.next()) { List row = new ArrayList<>(); for (int i = 1; i <= columnSize; i++) {