diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml index 77492ae07..e6318f8f9 100644 --- a/.github/workflows/run-e2ecase.yml +++ b/.github/workflows/run-e2ecase.yml @@ -40,5 +40,5 @@ jobs: - name: Run E2ECases run: | - cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="adamlee489/doris:2.0.3" + cd flink-doris-connector && mvn test -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0" diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml index d03ee646f..0b6706560 100644 --- a/.github/workflows/run-itcase.yml +++ b/.github/workflows/run-itcase.yml @@ -40,5 +40,5 @@ jobs: - name: Run ITCases run: | - cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3" + cd flink-doris-connector && mvn test -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java index 09ae4bddb..e0f75f08e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java @@ -18,14 +18,11 @@ package org.apache.doris.flink; import com.google.common.collect.Lists; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerLoggerFactory; import java.io.BufferedReader; @@ -45,16 +42,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; -import java.util.stream.Stream; - -import static org.awaitility.Awaitility.given; -import static org.awaitility.Durations.ONE_SECOND; public abstract class DorisTestBase { protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class); - private static final String DEFAULT_DOCKER_IMAGE = "adamlee489/doris:2.0.3"; + private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0"; protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image") == null ? DEFAULT_DOCKER_IMAGE @@ -66,32 +58,29 @@ public abstract class DorisTestBase { protected static final String USERNAME = "root"; protected static final String PASSWORD = ""; protected static final GenericContainer DORIS_CONTAINER = createDorisContainer(); - protected static final int DEFAULT_PARALLELISM = 4; protected static String getFenodes() { return DORIS_CONTAINER.getHost() + ":8030"; } - @BeforeClass - public static void startContainers() { - LOG.info("Starting doris containers..."); - Startables.deepStart(Stream.of(DORIS_CONTAINER)).join(); - given().ignoreExceptions() - .await() - .atMost(300, TimeUnit.SECONDS) - .pollInterval(ONE_SECOND) - .untilAsserted(DorisTestBase::initializeJdbcConnection); - LOG.info("Containers doris are started."); + static { + startContainers(); } - @AfterClass - public static void stopContainers() { - LOG.info("Stopping doris containers..."); - DORIS_CONTAINER.stop(); - LOG.info("Containers doris are stopped."); + public static void startContainers() { + try { + LOG.info("Starting doris containers..."); + // singleton doris container + DORIS_CONTAINER.start(); + initializeJdbcConnection(); + } catch (Exception ex) { + LOG.error("Failed to start containers doris, ", ex); + } + LOG.info("Containers doris are started."); } public static GenericContainer createDorisContainer() { + LOG.info("Create doris containers..."); GenericContainer container = new GenericContainer<>(DORIS_DOCKER_IMAGE) .withNetwork(Network.newNetwork()) @@ -100,7 +89,7 @@ public static GenericContainer createDorisContainer() { .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) - .withReuse(true); + .withExposedPorts(8030, 9030, 8040, 9060); container.setPortBindings( Lists.newArrayList( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java index 99e7a13ec..2aecaaf5e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -280,6 +280,7 @@ private void initializeDorisTable() throws Exception { DriverManager.getConnection( String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); Statement statement = connection.createStatement()) { + statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));