Skip to content

Commit

Permalink
[Improve](case) add customer doris container cluster (apache#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Sep 20, 2024
1 parent c4ae051 commit c61342f
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.flink.container.instance.ContainerService;
import org.apache.doris.flink.container.instance.DorisContainer;
import org.apache.doris.flink.container.instance.DorisCustomerContainer;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -48,7 +49,8 @@ private static void initDorisContainer() {
LOG.info("The doris container has been started and is running status.");
return;
}
dorisContainerService = new DorisContainer();
Boolean customerEnv = Boolean.valueOf(System.getProperty("customer_env", "false"));
dorisContainerService = customerEnv ? new DorisCustomerContainer() : new DorisContainer();
dorisContainerService.startContainer();
LOG.info("Doris container was started.");
}
Expand All @@ -74,9 +76,7 @@ protected String getDorisPassword() {
}

protected String getDorisQueryUrl() {
return String.format(
"jdbc:mysql://%s:%s",
getDorisInstanceHost(), dorisContainerService.getMappedPort(9030));
return dorisContainerService.getJdbcUrl();
}

protected String getDorisInstanceHost() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface ContainerService {

Connection getQueryConnection();

String getJdbcUrl();

String getInstanceHost();

Integer getMappedPort(int originalPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public Connection getQueryConnection() {
}
}

@Override
public String getJdbcUrl() {
return String.format(JDBC_URL, dorisContainer.getHost());
}

@Override
public String getInstanceHost() {
return dorisContainer.getHost();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.container.instance;

import org.apache.flink.util.Preconditions;

import org.apache.doris.flink.exception.DorisRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Using a custom Doris environment */
public class DorisCustomerContainer implements ContainerService {
private static final Logger LOG = LoggerFactory.getLogger(DorisCustomerContainer.class);
private static final String JDBC_URL = "jdbc:mysql://%s:%s";

@Override
public void startContainer() {
LOG.info("Using doris customer containers env.");
checkParams();
if (!isRunning()) {
throw new DorisRuntimeException(
"Backend is not alive. Please check the doris cluster.");
}
}

private void checkParams() {
Preconditions.checkArgument(
System.getProperty("doris_host") != null, "doris_host is required.");
Preconditions.checkArgument(
System.getProperty("doris_query_port") != null, "doris_query_port is required.");
Preconditions.checkArgument(
System.getProperty("doris_http_port") != null, "doris_http_port is required.");
Preconditions.checkArgument(
System.getProperty("doris_user") != null, "doris_user is required.");
Preconditions.checkArgument(
System.getProperty("doris_passwd") != null, "doris_passwd is required.");
}

@Override
public boolean isRunning() {
try (Connection conn = getQueryConnection();
Statement stmt = conn.createStatement()) {
ResultSet showBackends = stmt.executeQuery("show backends");
while (showBackends.next()) {
String isAlive = showBackends.getString("Alive").trim();
if (Boolean.toString(true).equalsIgnoreCase(isAlive)) {
return true;
}
}
} catch (SQLException e) {
LOG.error("Failed to connect doris cluster.", e);
return false;
}
return false;
}

@Override
public Connection getQueryConnection() {
LOG.info("Try to get query connection from doris.");
String jdbcUrl =
String.format(
JDBC_URL,
System.getProperty("doris_host"),
System.getProperty("doris_query_port"));
try {
return DriverManager.getConnection(jdbcUrl, getUsername(), getPassword());
} catch (SQLException e) {
LOG.info("Failed to get doris query connection. jdbcUrl={}", jdbcUrl, e);
throw new DorisRuntimeException(e);
}
}

@Override
public String getJdbcUrl() {
return String.format(
JDBC_URL, System.getProperty("doris_host"), System.getProperty("doris_query_port"));
}

@Override
public String getInstanceHost() {
return System.getProperty("doris_host");
}

@Override
public Integer getMappedPort(int originalPort) {
return originalPort;
}

@Override
public String getUsername() {
return System.getProperty("doris_user");
}

@Override
public String getPassword() {
return System.getProperty("doris_passwd");
}

@Override
public String getFenodes() {
return System.getProperty("doris_host") + ":" + System.getProperty("doris_http_port");
}

@Override
public String getBenodes() {
return null;
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public Connection getQueryConnection() {
}
}

@Override
public String getJdbcUrl() {
return mysqlcontainer.getJdbcUrl();
}

@Override
public void close() {
LOG.info("Stopping MySQL container.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
static final String TABLE_JSON_TBL = "tbl_json_tbl";
static final String TABLE_TBL_AUTO_REDIRECT = "tbl_tbl_auto_redirect";
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
Expand Down Expand Up @@ -177,8 +179,6 @@ public void testTableSinkJsonFormat() throws Exception {
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'benodes' = '%s',"
+ " 'auto-redirect' = 'false',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
Expand All @@ -196,7 +196,6 @@ public void testTableSinkJsonFormat() throws Exception {
+ "'"
+ ")",
getFenodes(),
getBenodes(),
DATABASE + "." + TABLE_JSON_TBL,
getDorisUsername(),
getDorisPassword());
Expand All @@ -210,6 +209,52 @@ public void testTableSinkJsonFormat() throws Exception {
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}

@Test
public void testTableSinkAutoRedirectFalse() throws Exception {
if (StringUtils.isNullOrWhitespaceOnly(getBenodes())) {
LOG.info("benodes is empty, skip the test.");
return;
}
initializeTable(TABLE_TBL_AUTO_REDIRECT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sinkDDL =
String.format(
"CREATE TABLE doris_sink ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'benodes' = '%s',"
+ " 'auto-redirect' = 'false',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'sink.label-prefix' = 'doris_sink"
+ UUID.randomUUID()
+ "'"
+ ")",
getFenodes(),
getBenodes(),
DATABASE + "." + TABLE_TBL_AUTO_REDIRECT,
getDorisUsername(),
getDorisPassword());
tEnv.executeSql(sinkDDL);
tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");

Thread.sleep(10000);
List<String> expected = Arrays.asList("doris,1", "flink,2");
String query =
String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL);
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}

@Test
public void testTableBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_TBL);
Expand Down

0 comments on commit c61342f

Please sign in to comment.