Skip to content

Commit

Permalink
[Improve] fix connection close and add dorise2ecase (apache#329)
Browse files Browse the repository at this point in the history
 add dorise2ecase and improvement test case
  • Loading branch information
JNSimba authored Mar 6, 2024
1 parent 43e0e5c commit f8e61ba
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# under the License.
#
---
name: Build Extensions
name: Build Connector
on:
pull_request:
push:

jobs:
build-extension:
name: "Build Extensions"
name: "Build Connector"
runs-on: ubuntu-latest
defaults:
run:
Expand Down
44 changes: 0 additions & 44 deletions .github/workflows/run-e2ecase-12.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# under the License.
#
---
name: Run E2ECases 2.0
name: Run E2ECases
on:
pull_request:
push:

jobs:
build-extension:
name: "Run E2ECases 2.0"
name: "Run E2ECases"
runs-on: ubuntu-latest
defaults:
run:
Expand Down
44 changes: 0 additions & 44 deletions .github/workflows/run-itcase-12.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# under the License.
#
---
name: Run ITCases 2.0
name: Run ITCases
on:
pull_request:
push:

jobs:
build-extension:
name: "Run ITCases 2.0"
name: "Run ITCases"
runs-on: ubuntu-latest
defaults:
run:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLClassLoader;
Expand All @@ -51,15 +54,18 @@

public abstract class DorisTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image");
private static final String DEFAULT_DOCKER_IMAGE = "adamlee489/doris:2.0.3";
protected static final String DORIS_DOCKER_IMAGE =
System.getProperty("image") == null
? DEFAULT_DOCKER_IMAGE
: System.getProperty("image");
private static final String DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
protected static final String URL = "jdbc:mysql://%s:9030";
protected static final String USERNAME = "root";
protected static final String PASSWORD = "";
protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
protected static Connection connection;
protected static final int DEFAULT_PARALLELISM = 4;

protected static String getFenodes() {
Expand All @@ -68,39 +74,33 @@ protected static String getFenodes() {

@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
LOG.info("Starting doris containers...");
Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
given().ignoreExceptions()
.await()
.atMost(300, TimeUnit.SECONDS)
.pollInterval(ONE_SECOND)
.untilAsserted(DorisTestBase::initializeJdbcConnection);
LOG.info("Containers are started.");
LOG.info("Containers doris are started.");
}

@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
LOG.info("Stopping doris containers...");
DORIS_CONTAINER.stop();
LOG.info("Containers are stopped.");
LOG.info("Containers doris are stopped.");
}

public static GenericContainer createDorisContainer() {
GenericContainer container =
new GenericContainer<>(DORIS_DOCKER_IMAGE)
.withNetwork(Network.newNetwork())
.withNetworkAliases("DorisContainer")
.withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
.withEnv("FE_ID", "1")
.withEnv("CURRENT_BE_IP", "127.0.0.1")
.withEnv("CURRENT_BE_PORT", "9050")
.withCommand("ulimit -n 65536")
.withCreateContainerCmdModifier(
cmd -> cmd.getHostConfig().withMemorySwap(0L))
.withPrivilegedMode(true)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
.withReuse(true);

container.setPortBindings(
Lists.newArrayList(
Expand All @@ -118,10 +118,10 @@ protected static void initializeJdbcConnection() throws Exception {
new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
LOG.info("Try to connect to Doris...");
Thread.currentThread().setContextClassLoader(urlClassLoader);
connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
try (Statement statement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet resultSet;
do {
LOG.info("Wait for the Backend to start successfully...");
Expand All @@ -144,14 +144,37 @@ private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLExce

protected static void printClusterStatus() throws Exception {
LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
try (Statement statement = connection.createStatement()) {
echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
echo("sh", "-c", "free -h");
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet showFrontends = statement.executeQuery("show frontends");
LOG.info("Frontends status: {}", convertList(showFrontends));
ResultSet showBackends = statement.executeQuery("show backends");
LOG.info("Backends status: {}", convertList(showBackends));
}
}

static void echo(String... cmd) {
try {
Process p = Runtime.getRuntime().exec(cmd);
InputStream is = p.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
p.waitFor();
is.close();
reader.close();
p.destroy();
} catch (Exception e) {
e.printStackTrace();
}
}

private static List<Map> convertList(ResultSet rs) throws SQLException {
List<Map> list = new ArrayList<>();
ResultSetMetaData metaData = rs.getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
Expand All @@ -45,7 +47,7 @@

/** DorisSink ITCase with csv and arrow format. */
public class DorisSinkITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String DATABASE = "test_sink";
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
static final String TABLE_JSON_TBL = "tbl_json_tbl";
Expand All @@ -61,9 +63,13 @@ public void testSinkCsvFormat() throws Exception {

Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = connection.createStatement()) {

try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
sinkStatement.executeQuery(
statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_CSV));
while (sinkResultSet.next()) {
Expand Down Expand Up @@ -102,9 +108,12 @@ public void testSinkJsonFormat() throws Exception {

Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
sinkStatement.executeQuery(
statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_JSON));
while (sinkResultSet.next()) {
Expand Down Expand Up @@ -172,9 +181,12 @@ public void testTableSinkJsonFormat() throws Exception {

Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
sinkStatement.executeQuery(
statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1",
DATABASE, TABLE_JSON_TBL));
Expand All @@ -191,7 +203,10 @@ public void testTableSinkJsonFormat() throws Exception {
}

private void initializeTable(String table) throws Exception {
try (Statement statement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
statement.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** DorisSource ITCase. */
public class DorisSourceITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String DATABASE = "test_source";
static final String TABLE_READ = "tbl_read";
static final String TABLE_READ_TBL = "tbl_read_tbl";

Expand Down Expand Up @@ -111,7 +113,10 @@ public void testTableSource() throws Exception {
}

private void initializeTable(String table) throws Exception {
try (Statement statement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
statement.execute(
Expand Down
Loading

0 comments on commit f8e61ba

Please sign in to comment.