diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index e565d0a72..2f672adb2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -59,6 +59,8 @@ import java.util.Set; import java.util.regex.Pattern; +import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; + public abstract class DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); private static final String TABLE_NAME_OPTIONS = "table-name"; @@ -483,6 +485,25 @@ private void handleTableCreationFailure(Exception ex) throws DorisSystemExceptio } } + protected Properties getJdbcProperties() { + Properties jdbcProps = new Properties(); + for (Map.Entry entry : config.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(PROPERTIES_PREFIX)) { + jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value); + } + } + return jdbcProps; + } + + protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + jdbcProperties.forEach( + (key, value) -> jdbcUrlBuilder.append("&").append(key).append("=").append(value)); + return jdbcUrlBuilder.toString(); + } + public DatabaseSync setEnv(StreamExecutionEnvironment env) { this.env = env; return this; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java index 2dcd21a9b..3947c1e16 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java @@ -89,9 +89,11 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { + Properties jdbcProperties = getJdbcProperties(); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - JDBC_URL, + jdbcUrlTemplate, config.get(JdbcSourceOptions.HOSTNAME), config.get(PORT), config.get(JdbcSourceOptions.DATABASE_NAME)); @@ -224,4 +226,21 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { public String getTableListPrefix() { return config.get(JdbcSourceOptions.SCHEMA_NAME); } + + @Override + protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + boolean firstParam = true; + for (Map.Entry entry : jdbcProperties.entrySet()) { + Object key = entry.getKey(); + Object value = entry.getValue(); + if (firstParam) { + jdbcUrlBuilder.append(":").append(key).append("=").append(value).append(";"); + firstParam = false; + } else { + jdbcUrlBuilder.append(key).append("=").append(value).append(";"); + } + } + return jdbcUrlBuilder.toString(); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index a58008cdd..9fdfdcaed 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -55,10 +55,11 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; + public class MysqlDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); private static final String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; - private static final String PROPERTIES_PREFIX = "jdbc.properties."; public MysqlDatabaseSync() throws SQLException { super(); @@ -83,12 +84,10 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { Properties jdbcProperties = getJdbcProperties(); - StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL); - jdbcProperties.forEach( - (key, value) -> jdbcUrlSb.append("&").append(key).append("=").append(value)); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - jdbcUrlSb.toString(), + jdbcUrlTemplate, config.get(MySqlSourceOptions.HOSTNAME), config.get(MySqlSourceOptions.PORT)); @@ -269,16 +268,4 @@ private Map getChunkColumnMap() { } return chunkMap; } - - private Properties getJdbcProperties() { - Properties jdbcProps = new Properties(); - for (Map.Entry entry : config.toMap().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key.startsWith(PROPERTIES_PREFIX)) { - jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value); - } - } - return jdbcProps; - } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java index 66e26d152..15fc632b4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -66,7 +66,7 @@ public class PostgresDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class); - private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s"; + private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s?"; public PostgresDatabaseSync() throws SQLException { super(); @@ -84,9 +84,11 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { + Properties jdbcProperties = getJdbcProperties(); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - JDBC_URL, + jdbcUrlTemplate, config.get(PostgresSourceOptions.HOSTNAME), config.get(PostgresSourceOptions.PG_PORT), config.get(PostgresSourceOptions.DATABASE_NAME)); @@ -227,7 +229,24 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { @Override public String getTableListPrefix() { - String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME); - return schemaName; + return config.get(PostgresSourceOptions.SCHEMA_NAME); + } + + @Override + protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + + if (!initialJdbcUrl.startsWith("?")) { + return super.getJdbcUrlTemplate(initialJdbcUrl, jdbcProperties); + } + StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + int recordIndex = 0; + for (Map.Entry entry : jdbcProperties.entrySet()) { + jdbcUrlBuilder.append(entry.getKey()).append("=").append(entry.getValue()); + if (recordIndex < jdbcProperties.size() - 1) { + jdbcUrlBuilder.append("&"); + recordIndex++; + } + } + return jdbcUrlBuilder.toString(); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index 08c54dd38..cb6b66829 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -63,7 +63,7 @@ public class SqlServerDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class); - private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s"; + private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s;"; private static final String PORT = "port"; public SqlServerDatabaseSync() throws SQLException { @@ -82,9 +82,11 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { + Properties jdbcProperties = getJdbcProperties(); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - JDBC_URL, + jdbcUrlTemplate, config.get(JdbcSourceOptions.HOSTNAME), config.getInteger(PORT, 1433), config.get(JdbcSourceOptions.DATABASE_NAME)); @@ -216,4 +218,12 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { public String getTableListPrefix() { return config.get(JdbcSourceOptions.SCHEMA_NAME); } + + @Override + public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + jdbcProperties.forEach( + (key, value) -> jdbcUrlBuilder.append(key).append("=").append(value).append(";")); + return jdbcUrlBuilder.toString(); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java index 0327079a0..0666cb9d4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java @@ -49,6 +49,10 @@ public static void main(String[] args) throws Exception { sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1"); sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456"); sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true"); + // add jdbc properties configuration + sourceConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1"); + sourceConfig.put("jdbc.properties.resultSetHoldability", "1"); + sourceConfig.put("jdbc.properties.SSL", "false"); Configuration config = Configuration.fromMap(sourceConfig); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index e0c0b828e..c430ea87b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -53,6 +53,8 @@ public static void main(String[] args) throws Exception { mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306"); mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "root"); mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "12345678"); + // add jdbc properties for MySQL + mysqlConfig.put("jdbc.properties.use_ssl", "false"); Configuration config = Configuration.fromMap(mysqlConfig); Map sinkConfig = new HashMap<>(); @@ -61,6 +63,7 @@ public static void main(String[] args) throws Exception { sinkConfig.put(DorisConfigOptions.PASSWORD.key(), ""); sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://10.20.30.1:9030"); sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString()); + sinkConfig.put("sink.enable-delete", "false"); Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java index f1e61e72e..99892e022 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java @@ -49,6 +49,8 @@ public static void main(String[] args) throws Exception { sourceConfig.put(PostgresSourceOptions.PG_PORT.key(), "5432"); sourceConfig.put(PostgresSourceOptions.USERNAME.key(), "postgres"); sourceConfig.put(PostgresSourceOptions.PASSWORD.key(), "123456"); + // add jdbc properties configuration + sourceConfig.put("jdbc.properties.ssl", "false"); // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); // sourceConfig.put("scan.incremental.snapshot.enabled","true"); // sourceConfig.put("debezium.include.schema.changes","false"); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java index 4e343239a..ca6a3121b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -47,6 +47,9 @@ public static void main(String[] args) throws Exception { sourceConfig.put(DatabaseSyncConfig.PORT, "1433"); sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "sa"); sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "Passw@rd"); + // add jdbc properties configuration + sourceConfig.put("jdbc.properties.encrypt", "false"); + sourceConfig.put("jdbc.properties.integratedSecurity", "false"); // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); // sourceConfig.put("scan.incremental.snapshot.enabled","true"); // sourceConfig.put("debezium.include.schema.changes","false"); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java index f0cd0a51f..859a87208 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java @@ -20,16 +20,22 @@ import org.apache.flink.configuration.Configuration; import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; +import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync; +import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync; import org.jetbrains.annotations.NotNull; +import org.junit.Assert; import org.junit.Test; import java.sql.SQLException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -169,4 +175,95 @@ public void singleSinkTablePatternTest() throws SQLException { assertFalse("ssb_test.dates".matches(syncTableListPattern)); assertFalse("ssb_test.lineorder".matches(syncTableListPattern)); } + + @Test + public void getJdbcPropertiesTest() throws Exception { + DatabaseSync databaseSync = new MysqlDatabaseSync(); + Map mysqlConfig = new HashMap<>(); + mysqlConfig.put("jdbc.properties.use_ssl", "false"); + + Configuration config = Configuration.fromMap(mysqlConfig); + databaseSync.setConfig(config); + Properties jdbcProperties = databaseSync.getJdbcProperties(); + Assert.assertEquals(1, jdbcProperties.size()); + Assert.assertEquals("false", jdbcProperties.getProperty("use_ssl")); + } + + @Test + public void getJdbcUrlTemplateTest() throws SQLException { + String mysqlJdbcTemplate = "jdbc:mysql://%s:%d?useInformationSchema=true"; + String postgresJdbcTemplate = "jdbc:postgresql://%s:%d/%s?"; + String sqlServerJdbcTemplate = "jdbc:sqlserver://%s:%d;database=%s;"; + String db2JdbcTemplate = "jdbc:db2://%s:%d/%s"; + + // mysql jdbc properties configuration + DatabaseSync mysqlDatabaseSync = new MysqlDatabaseSync(); + Map mysqlJdbcConfig = new LinkedHashMap<>(); + mysqlJdbcConfig.put("jdbc.properties.use_ssl", "false"); + + DatabaseSync postgresDatabaseSync = new PostgresDatabaseSync(); + Map postgresJdbcConfig = new LinkedHashMap<>(); + postgresJdbcConfig.put("jdbc.properties.ssl", "false"); + + DatabaseSync sqlServerDatabaseSync = new SqlServerDatabaseSync(); + Map sqlServerJdbcConfig = new LinkedHashMap<>(); + sqlServerJdbcConfig.put("jdbc.properties.encrypt", "false"); + sqlServerJdbcConfig.put("jdbc.properties.integratedSecurity", "false"); + + DatabaseSync db2DatabaseSync = new Db2DatabaseSync(); + Map db2JdbcConfig = new LinkedHashMap<>(); + db2JdbcConfig.put("jdbc.properties.ssl", "false"); + db2JdbcConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1"); + db2JdbcConfig.put("jdbc.properties.resultSetHoldability", "1"); + + Configuration mysqlConfig = Configuration.fromMap(mysqlJdbcConfig); + mysqlDatabaseSync.setConfig(mysqlConfig); + + Configuration postgresConfig = Configuration.fromMap(postgresJdbcConfig); + postgresDatabaseSync.setConfig(postgresConfig); + + Configuration sqlServerConfig = Configuration.fromMap(sqlServerJdbcConfig); + sqlServerDatabaseSync.setConfig(sqlServerConfig); + + Configuration db2Config = Configuration.fromMap(db2JdbcConfig); + db2DatabaseSync.setConfig(db2Config); + + Properties mysqlJdbcProperties = mysqlDatabaseSync.getJdbcProperties(); + Assert.assertEquals(1, mysqlJdbcProperties.size()); + Assert.assertEquals("false", mysqlJdbcProperties.getProperty("use_ssl")); + String mysqlJdbcUrlTemplate = + mysqlDatabaseSync.getJdbcUrlTemplate(mysqlJdbcTemplate, mysqlJdbcProperties); + Assert.assertEquals(mysqlJdbcTemplate + "&use_ssl=false", mysqlJdbcUrlTemplate); + + Properties postgresJdbcProperties = postgresDatabaseSync.getJdbcProperties(); + Assert.assertEquals(1, postgresJdbcProperties.size()); + Assert.assertEquals("false", postgresJdbcProperties.getProperty("ssl")); + String postgresJdbcUrlTemplate = + postgresDatabaseSync.getJdbcUrlTemplate( + postgresJdbcTemplate, postgresJdbcProperties); + Assert.assertEquals(postgresJdbcTemplate + "&ssl=false", postgresJdbcUrlTemplate); + + Properties sqlServerJdbcProperties = sqlServerDatabaseSync.getJdbcProperties(); + Assert.assertEquals(2, sqlServerJdbcProperties.size()); + Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("encrypt")); + Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("integratedSecurity")); + String sqlServerJdbcUrlTemplate = + sqlServerDatabaseSync.getJdbcUrlTemplate( + sqlServerJdbcTemplate, sqlServerJdbcProperties); + Assert.assertEquals( + sqlServerJdbcTemplate + "encrypt=false;integratedSecurity=false;", + sqlServerJdbcUrlTemplate); + + Properties db2JdbcProperties = db2DatabaseSync.getJdbcProperties(); + Assert.assertEquals(3, db2JdbcProperties.size()); + Assert.assertEquals("false", db2JdbcProperties.getProperty("ssl")); + Assert.assertEquals("1", db2JdbcProperties.getProperty("allowNextOnExhaustedResultSet")); + Assert.assertEquals("1", db2JdbcProperties.getProperty("resultSetHoldability")); + String db2JdbcUrlTemplate = + db2DatabaseSync.getJdbcUrlTemplate(db2JdbcTemplate, db2JdbcProperties); + Assert.assertEquals( + db2JdbcTemplate + + ":allowNextOnExhaustedResultSet=1;ssl=false;resultSetHoldability=1;", + db2JdbcUrlTemplate); + } }