Skip to content

Commit

Permalink
Add printing parameters and move the verification forward
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 15, 2024
1 parent d0c1d7c commit b8d5e1d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
Expand All @@ -42,9 +43,9 @@ public class CdcTools {
private static final List<String> EMPTY_KEYS = Collections.singletonList("password");

public static void main(String[] args) throws Exception {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
String operation = args[0].toLowerCase();
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
System.out.println();
switch (operation) {
case MYSQL_SYNC_DATABASE:
createMySQLSyncDatabase(opArgs);
Expand All @@ -66,6 +67,7 @@ public static void main(String[] args) throws Exception {

private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("mysql-conf"));
Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
DatabaseSync databaseSync = new MysqlDatabaseSync();
Expand All @@ -74,6 +76,7 @@ private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {

private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("oracle-conf"));
Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
Configuration oracleConfig = Configuration.fromMap(oracleMap);
DatabaseSync databaseSync = new OracleDatabaseSync();
Expand All @@ -82,6 +85,7 @@ private static void createOracleSyncDatabase(String[] opArgs) throws Exception {

private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("postgres-conf"));
Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new PostgresDatabaseSync();
Expand All @@ -90,6 +94,7 @@ private static void createPostgresSyncDatabase(String[] opArgs) throws Exception

private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("sqlserver-conf"));
Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new SqlServerDatabaseSync();
Expand All @@ -115,6 +120,7 @@ private static void syncDatabase(
boolean useNewSchemaChange = params.has("use-new-schema-change");
boolean singleSink = params.has("single-sink");

Preconditions.checkArgument(params.has("sink-conf"));
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
Expand Down Expand Up @@ -149,7 +155,8 @@ private static void syncDatabase(

private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
if (!params.has(key)) {
return new HashMap<>();
System.out.println("Can not find key [" + key + "] from args: " + params.toMap().toString() + ".\n");
return null;
}

Map<String, String> map = new HashMap<>();
Expand All @@ -163,7 +170,8 @@ private static Map<String, String> getConfigMap(MultipleParameterTool params, St
continue;
}

System.err.println("Invalid " + key + " " + param + ".\n");
System.out.println("Invalid " + key + " " + param + ".\n");
return null;
}
return map;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -441,7 +442,9 @@ public DatabaseSync setMultiToOneTarget(String multiToOneTarget) {
}

public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
this.tableConfig = tableConfig;
if(!CollectionUtil.isNullOrEmpty(tableConfig)){
this.tableConfig = tableConfig;
}
return this;
}

Expand Down

0 comments on commit b8d5e1d

Please sign in to comment.