From 41929d31dad1f11a96daec31c7f55839594e261e Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 5 Mar 2024 15:20:39 +0800 Subject: [PATCH] fix --- .../org/apache/doris/flink/DorisTestBase.java | 22 ++++++-------- .../doris/flink/sink/DorisSinkITCase.java | 29 ++++++++++++++----- .../doris/flink/source/DorisSourceITCase.java | 7 ++++- 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java index 31ff2b124..dca9a1c66 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java @@ -59,7 +59,6 @@ public abstract class DorisTestBase { protected static final String USERNAME = "root"; protected static final String PASSWORD = ""; protected static final GenericContainer DORIS_CONTAINER = createDorisContainer(); - protected static Connection connection; protected static final int DEFAULT_PARALLELISM = 4; protected static String getFenodes() { @@ -118,10 +117,10 @@ protected static void initializeJdbcConnection() throws Exception { new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader()); LOG.info("Try to connect to Doris..."); Thread.currentThread().setContextClassLoader(urlClassLoader); - connection = - DriverManager.getConnection( - String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); - try (Statement statement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet resultSet; do { LOG.info("Wait for the Backend to start successfully..."); @@ -143,14 +142,11 @@ private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLExce } protected static void printClusterStatus() throws Exception { - LOG.info( - "{} {} Current machine IP: {} {} {}", - Thread.currentThread().getId(), - Thread.currentThread().getName(), - InetAddress.getLocalHost(), - connection.isClosed(), - connection.isValid(1000)); - try (Statement statement = connection.createStatement()) { + LOG.info("Current machine IP: {}", InetAddress.getLocalHost()); + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet showFrontends = statement.executeQuery("show frontends"); LOG.info("Frontends status: {}", convertList(showFrontends)); ResultSet showBackends = statement.executeQuery("show backends"); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 26cbc2c4a..646b5b20b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -30,6 +30,8 @@ import org.junit.Test; import org.junit.jupiter.api.Assertions; +import java.sql.Connection; +import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.Arrays; @@ -61,9 +63,13 @@ public void testSinkCsvFormat() throws Exception { Thread.sleep(10000); Set> actual = new HashSet<>(); - try (Statement sinkStatement = connection.createStatement()) { + + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet sinkResultSet = - sinkStatement.executeQuery( + statement.executeQuery( String.format( "select name,age from %s.%s order by 1", DATABASE, TABLE_CSV)); while (sinkResultSet.next()) { @@ -102,9 +108,12 @@ public void testSinkJsonFormat() throws Exception { Thread.sleep(10000); Set> actual = new HashSet<>(); - try (Statement sinkStatement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet sinkResultSet = - sinkStatement.executeQuery( + statement.executeQuery( String.format( "select name,age from %s.%s order by 1", DATABASE, TABLE_JSON)); while (sinkResultSet.next()) { @@ -172,9 +181,12 @@ public void testTableSinkJsonFormat() throws Exception { Thread.sleep(10000); Set> actual = new HashSet<>(); - try (Statement sinkStatement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { ResultSet sinkResultSet = - sinkStatement.executeQuery( + statement.executeQuery( String.format( "select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL)); @@ -191,7 +203,10 @@ public void testTableSinkJsonFormat() throws Exception { } private void initializeTable(String table) throws Exception { - try (Statement statement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)); statement.execute( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index a5a3b534a..3c4735502 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -32,6 +32,8 @@ import org.junit.Test; import org.junit.jupiter.api.Assertions; +import java.sql.Connection; +import java.sql.DriverManager; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; @@ -111,7 +113,10 @@ public void testTableSource() throws Exception { } private void initializeTable(String table) throws Exception { - try (Statement statement = connection.createStatement()) { + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD); + Statement statement = connection.createStatement()) { statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)); statement.execute(