Skip to content

Commit

Permalink
[log](improve)Add printing parameters and move the verification forwa…
Browse files Browse the repository at this point in the history
…rd (apache#295)
  • Loading branch information
JNSimba authored Jan 17, 2024
1 parent d0c1d7c commit 800755e
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
Expand All @@ -42,9 +43,9 @@ public class CdcTools {
private static final List<String> EMPTY_KEYS = Collections.singletonList("password");

public static void main(String[] args) throws Exception {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
String operation = args[0].toLowerCase();
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
System.out.println();
switch (operation) {
case MYSQL_SYNC_DATABASE:
createMySQLSyncDatabase(opArgs);
Expand All @@ -66,6 +67,7 @@ public static void main(String[] args) throws Exception {

private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("mysql-conf"));
Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
DatabaseSync databaseSync = new MysqlDatabaseSync();
Expand All @@ -74,6 +76,7 @@ private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {

private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("oracle-conf"));
Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
Configuration oracleConfig = Configuration.fromMap(oracleMap);
DatabaseSync databaseSync = new OracleDatabaseSync();
Expand All @@ -82,6 +85,7 @@ private static void createOracleSyncDatabase(String[] opArgs) throws Exception {

private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("postgres-conf"));
Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new PostgresDatabaseSync();
Expand All @@ -90,6 +94,7 @@ private static void createPostgresSyncDatabase(String[] opArgs) throws Exception

private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("sqlserver-conf"));
Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new SqlServerDatabaseSync();
Expand All @@ -115,6 +120,7 @@ private static void syncDatabase(
boolean useNewSchemaChange = params.has("use-new-schema-change");
boolean singleSink = params.has("single-sink");

Preconditions.checkArgument(params.has("sink-conf"));
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
Expand Down Expand Up @@ -149,7 +155,13 @@ private static void syncDatabase(

private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
if (!params.has(key)) {
return new HashMap<>();
System.out.println(
"Can not find key ["
+ key
+ "] from args: "
+ params.toMap().toString()
+ ".\n");
return null;
}

Map<String, String> map = new HashMap<>();
Expand All @@ -163,7 +175,8 @@ private static Map<String, String> getConfigMap(MultipleParameterTool params, St
continue;
}

System.err.println("Invalid " + key + " " + param + ".\n");
System.out.println("Invalid " + key + " " + param + ".\n");
return null;
}
return map;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -441,7 +442,9 @@ public DatabaseSync setMultiToOneTarget(String multiToOneTarget) {
}

public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
this.tableConfig = tableConfig;
if (!CollectionUtil.isNullOrEmpty(tableConfig)) {
this.tableConfig = tableConfig;
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,20 @@
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import java.net.MalformedURLException;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
Expand Down Expand Up @@ -107,7 +112,7 @@ public static GenericContainer createDorisContainer() {
return container;
}

protected static void initializeJdbcConnection() throws SQLException, MalformedURLException {
protected static void initializeJdbcConnection() throws Exception {
URLClassLoader urlClassLoader =
new URLClassLoader(
new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
Expand All @@ -124,6 +129,7 @@ protected static void initializeJdbcConnection() throws SQLException, MalformedU
} while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
}
LOG.info("Connected to Doris successfully...");
printClusterStatus();
}

private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
Expand All @@ -135,4 +141,28 @@ private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLExce
}
return false;
}

protected static void printClusterStatus() throws Exception {
LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
try (Statement statement = connection.createStatement()) {
ResultSet showFrontends = statement.executeQuery("show frontends");
LOG.info("Frontends status: {}", convertList(showFrontends));
ResultSet showBackends = statement.executeQuery("show backends");
LOG.info("Backends status: {}", convertList(showBackends));
}
}

private static List<Map> convertList(ResultSet rs) throws SQLException {
List<Map> list = new ArrayList<>();
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
while (rs.next()) {
Map<String, Object> rowData = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
rowData.put(metaData.getColumnName(i), rs.getObject(i));
}
list.add(rowData);
}
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static void stopMySQLContainers() {

@Test
public void testMySQL2Doris() throws Exception {
printClusterStatus();
initializeMySQLTable();
JobClient jobClient = submitJob();
// wait 2 times checkpoint
Expand Down Expand Up @@ -173,6 +174,7 @@ public void testMySQL2Doris() throws Exception {

@Test
public void testAutoAddTable() throws Exception {
printClusterStatus();
initializeMySQLTable();
initializeDorisTable();
JobClient jobClient = submitJob();
Expand Down

0 comments on commit 800755e

Please sign in to comment.