diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 9caddd2ba..77d9d847e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -20,6 +20,7 @@ import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; @@ -42,9 +43,9 @@ public class CdcTools { private static final List EMPTY_KEYS = Collections.singletonList("password"); public static void main(String[] args) throws Exception { + System.out.println("Input args: " + Arrays.asList(args) + ".\n"); String operation = args[0].toLowerCase(); String[] opArgs = Arrays.copyOfRange(args, 1, args.length); - System.out.println(); switch (operation) { case MYSQL_SYNC_DATABASE: createMySQLSyncDatabase(opArgs); @@ -66,6 +67,7 @@ public static void main(String[] args) throws Exception { private static void createMySQLSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("mysql-conf")); Map mysqlMap = getConfigMap(params, "mysql-conf"); Configuration mysqlConfig = Configuration.fromMap(mysqlMap); DatabaseSync databaseSync = new MysqlDatabaseSync(); @@ -74,6 +76,7 @@ private static void createMySQLSyncDatabase(String[] opArgs) throws Exception { private static void createOracleSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("oracle-conf")); Map oracleMap = getConfigMap(params, "oracle-conf"); Configuration oracleConfig = Configuration.fromMap(oracleMap); DatabaseSync databaseSync = new OracleDatabaseSync(); @@ -82,6 +85,7 @@ private static void createOracleSyncDatabase(String[] opArgs) throws Exception { private static void createPostgresSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("postgres-conf")); Map postgresMap = getConfigMap(params, "postgres-conf"); Configuration postgresConfig = Configuration.fromMap(postgresMap); DatabaseSync databaseSync = new PostgresDatabaseSync(); @@ -90,6 +94,7 @@ private static void createPostgresSyncDatabase(String[] opArgs) throws Exception private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("sqlserver-conf")); Map postgresMap = getConfigMap(params, "sqlserver-conf"); Configuration postgresConfig = Configuration.fromMap(postgresMap); DatabaseSync databaseSync = new SqlServerDatabaseSync(); @@ -115,6 +120,7 @@ private static void syncDatabase( boolean useNewSchemaChange = params.has("use-new-schema-change"); boolean singleSink = params.has("single-sink"); + Preconditions.checkArgument(params.has("sink-conf")); Map sinkMap = getConfigMap(params, "sink-conf"); Map tableMap = getConfigMap(params, "table-conf"); Configuration sinkConfig = Configuration.fromMap(sinkMap); @@ -149,7 +155,8 @@ private static void syncDatabase( private static Map getConfigMap(MultipleParameterTool params, String key) { if (!params.has(key)) { - return new HashMap<>(); + System.out.println("Can not find key [" + key + "] from args: " + params.toMap().toString() + ".\n"); + return null; } Map map = new HashMap<>(); @@ -163,7 +170,8 @@ private static Map getConfigMap(MultipleParameterTool params, St continue; } - System.err.println("Invalid " + key + " " + param + ".\n"); + System.out.println("Invalid " + key + " " + param + ".\n"); + return null; } return map; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index f1e172b11..cd31282e6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -441,7 +442,9 @@ public DatabaseSync setMultiToOneTarget(String multiToOneTarget) { } public DatabaseSync setTableConfig(Map tableConfig) { - this.tableConfig = tableConfig; + if(!CollectionUtil.isNullOrEmpty(tableConfig)){ + this.tableConfig = tableConfig; + } return this; }