Skip to content

Commit

Permalink
[improve](cdc) add jdbc properties configuration (apache#450)
Browse files Browse the repository at this point in the history
In the previous version, we were unable to configure the JDBC URL properties for Postgres, SqlServer, and Db2, which might cause some connection issues in certain scenarios.
  • Loading branch information
vinlee19 authored Aug 12, 2024
1 parent e76775a commit 1869a7d
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;

public abstract class DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
private static final String TABLE_NAME_OPTIONS = "table-name";
Expand Down Expand Up @@ -483,6 +485,25 @@ private void handleTableCreationFailure(Exception ex) throws DorisSystemExceptio
}
}

protected Properties getJdbcProperties() {
Properties jdbcProps = new Properties();
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(PROPERTIES_PREFIX)) {
jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value);
}
}
return jdbcProps;
}

protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {
StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
jdbcProperties.forEach(
(key, value) -> jdbcUrlBuilder.append("&").append(key).append("=").append(value));
return jdbcUrlBuilder.toString();
}

public DatabaseSync setEnv(StreamExecutionEnvironment env) {
this.env = env;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ public void registerDriver() throws SQLException {

@Override
public Connection getConnection() throws SQLException {
Properties jdbcProperties = getJdbcProperties();
String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
JDBC_URL,
jdbcUrlTemplate,
config.get(JdbcSourceOptions.HOSTNAME),
config.get(PORT),
config.get(JdbcSourceOptions.DATABASE_NAME));
Expand Down Expand Up @@ -224,4 +226,21 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
public String getTableListPrefix() {
return config.get(JdbcSourceOptions.SCHEMA_NAME);
}

@Override
protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {
StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
boolean firstParam = true;
for (Map.Entry<Object, Object> entry : jdbcProperties.entrySet()) {
Object key = entry.getKey();
Object value = entry.getValue();
if (firstParam) {
jdbcUrlBuilder.append(":").append(key).append("=").append(value).append(";");
firstParam = false;
} else {
jdbcUrlBuilder.append(key).append("=").append(value).append(";");
}
}
return jdbcUrlBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;

public class MysqlDatabaseSync extends DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class);
private static final String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true";
private static final String PROPERTIES_PREFIX = "jdbc.properties.";

public MysqlDatabaseSync() throws SQLException {
super();
Expand All @@ -83,12 +84,10 @@ public void registerDriver() throws SQLException {
@Override
public Connection getConnection() throws SQLException {
Properties jdbcProperties = getJdbcProperties();
StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL);
jdbcProperties.forEach(
(key, value) -> jdbcUrlSb.append("&").append(key).append("=").append(value));
String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
jdbcUrlSb.toString(),
jdbcUrlTemplate,
config.get(MySqlSourceOptions.HOSTNAME),
config.get(MySqlSourceOptions.PORT));

Expand Down Expand Up @@ -269,16 +268,4 @@ private Map<ObjectPath, String> getChunkColumnMap() {
}
return chunkMap;
}

private Properties getJdbcProperties() {
Properties jdbcProps = new Properties();
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(PROPERTIES_PREFIX)) {
jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value);
}
}
return jdbcProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
public class PostgresDatabaseSync extends DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class);

private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s?";

public PostgresDatabaseSync() throws SQLException {
super();
Expand All @@ -84,9 +84,11 @@ public void registerDriver() throws SQLException {

@Override
public Connection getConnection() throws SQLException {
Properties jdbcProperties = getJdbcProperties();
String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
JDBC_URL,
jdbcUrlTemplate,
config.get(PostgresSourceOptions.HOSTNAME),
config.get(PostgresSourceOptions.PG_PORT),
config.get(PostgresSourceOptions.DATABASE_NAME));
Expand Down Expand Up @@ -227,7 +229,24 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {

@Override
public String getTableListPrefix() {
String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
return schemaName;
return config.get(PostgresSourceOptions.SCHEMA_NAME);
}

@Override
protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {

if (!initialJdbcUrl.startsWith("?")) {
return super.getJdbcUrlTemplate(initialJdbcUrl, jdbcProperties);
}
StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
int recordIndex = 0;
for (Map.Entry<Object, Object> entry : jdbcProperties.entrySet()) {
jdbcUrlBuilder.append(entry.getKey()).append("=").append(entry.getValue());
if (recordIndex < jdbcProperties.size() - 1) {
jdbcUrlBuilder.append("&");
recordIndex++;
}
}
return jdbcUrlBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

public class SqlServerDatabaseSync extends DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class);
private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s";
private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s;";
private static final String PORT = "port";

public SqlServerDatabaseSync() throws SQLException {
Expand All @@ -82,9 +82,11 @@ public void registerDriver() throws SQLException {

@Override
public Connection getConnection() throws SQLException {
Properties jdbcProperties = getJdbcProperties();
String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties);
String jdbcUrl =
String.format(
JDBC_URL,
jdbcUrlTemplate,
config.get(JdbcSourceOptions.HOSTNAME),
config.getInteger(PORT, 1433),
config.get(JdbcSourceOptions.DATABASE_NAME));
Expand Down Expand Up @@ -216,4 +218,12 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
public String getTableListPrefix() {
return config.get(JdbcSourceOptions.SCHEMA_NAME);
}

@Override
public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {
StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
jdbcProperties.forEach(
(key, value) -> jdbcUrlBuilder.append(key).append("=").append(value).append(";"));
return jdbcUrlBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public static void main(String[] args) throws Exception {
sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1");
sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456");
sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");
// add jdbc properties configuration
sourceConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1");
sourceConfig.put("jdbc.properties.resultSetHoldability", "1");
sourceConfig.put("jdbc.properties.SSL", "false");

Configuration config = Configuration.fromMap(sourceConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public static void main(String[] args) throws Exception {
mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306");
mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "root");
mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "12345678");
// add jdbc properties for MySQL
mysqlConfig.put("jdbc.properties.use_ssl", "false");
Configuration config = Configuration.fromMap(mysqlConfig);

Map<String, String> sinkConfig = new HashMap<>();
Expand All @@ -61,6 +63,7 @@ public static void main(String[] args) throws Exception {
sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "");
sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://10.20.30.1:9030");
sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
sinkConfig.put("sink.enable-delete", "false");
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public static void main(String[] args) throws Exception {
sourceConfig.put(PostgresSourceOptions.PG_PORT.key(), "5432");
sourceConfig.put(PostgresSourceOptions.USERNAME.key(), "postgres");
sourceConfig.put(PostgresSourceOptions.PASSWORD.key(), "123456");
// add jdbc properties configuration
sourceConfig.put("jdbc.properties.ssl", "false");
// sourceConfig.put("debezium.database.tablename.case.insensitive","false");
// sourceConfig.put("scan.incremental.snapshot.enabled","true");
// sourceConfig.put("debezium.include.schema.changes","false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public static void main(String[] args) throws Exception {
sourceConfig.put(DatabaseSyncConfig.PORT, "1433");
sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "sa");
sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "Passw@rd");
// add jdbc properties configuration
sourceConfig.put("jdbc.properties.encrypt", "false");
sourceConfig.put("jdbc.properties.integratedSecurity", "false");
// sourceConfig.put("debezium.database.tablename.case.insensitive","false");
// sourceConfig.put("scan.incremental.snapshot.enabled","true");
// sourceConfig.put("debezium.include.schema.changes","false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,22 @@
import org.apache.flink.configuration.Configuration;

import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Test;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -169,4 +175,95 @@ public void singleSinkTablePatternTest() throws SQLException {
assertFalse("ssb_test.dates".matches(syncTableListPattern));
assertFalse("ssb_test.lineorder".matches(syncTableListPattern));
}

@Test
public void getJdbcPropertiesTest() throws Exception {
DatabaseSync databaseSync = new MysqlDatabaseSync();
Map<String, String> mysqlConfig = new HashMap<>();
mysqlConfig.put("jdbc.properties.use_ssl", "false");

Configuration config = Configuration.fromMap(mysqlConfig);
databaseSync.setConfig(config);
Properties jdbcProperties = databaseSync.getJdbcProperties();
Assert.assertEquals(1, jdbcProperties.size());
Assert.assertEquals("false", jdbcProperties.getProperty("use_ssl"));
}

@Test
public void getJdbcUrlTemplateTest() throws SQLException {
String mysqlJdbcTemplate = "jdbc:mysql://%s:%d?useInformationSchema=true";
String postgresJdbcTemplate = "jdbc:postgresql://%s:%d/%s?";
String sqlServerJdbcTemplate = "jdbc:sqlserver://%s:%d;database=%s;";
String db2JdbcTemplate = "jdbc:db2://%s:%d/%s";

// mysql jdbc properties configuration
DatabaseSync mysqlDatabaseSync = new MysqlDatabaseSync();
Map<String, String> mysqlJdbcConfig = new LinkedHashMap<>();
mysqlJdbcConfig.put("jdbc.properties.use_ssl", "false");

DatabaseSync postgresDatabaseSync = new PostgresDatabaseSync();
Map<String, String> postgresJdbcConfig = new LinkedHashMap<>();
postgresJdbcConfig.put("jdbc.properties.ssl", "false");

DatabaseSync sqlServerDatabaseSync = new SqlServerDatabaseSync();
Map<String, String> sqlServerJdbcConfig = new LinkedHashMap<>();
sqlServerJdbcConfig.put("jdbc.properties.encrypt", "false");
sqlServerJdbcConfig.put("jdbc.properties.integratedSecurity", "false");

DatabaseSync db2DatabaseSync = new Db2DatabaseSync();
Map<String, String> db2JdbcConfig = new LinkedHashMap<>();
db2JdbcConfig.put("jdbc.properties.ssl", "false");
db2JdbcConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1");
db2JdbcConfig.put("jdbc.properties.resultSetHoldability", "1");

Configuration mysqlConfig = Configuration.fromMap(mysqlJdbcConfig);
mysqlDatabaseSync.setConfig(mysqlConfig);

Configuration postgresConfig = Configuration.fromMap(postgresJdbcConfig);
postgresDatabaseSync.setConfig(postgresConfig);

Configuration sqlServerConfig = Configuration.fromMap(sqlServerJdbcConfig);
sqlServerDatabaseSync.setConfig(sqlServerConfig);

Configuration db2Config = Configuration.fromMap(db2JdbcConfig);
db2DatabaseSync.setConfig(db2Config);

Properties mysqlJdbcProperties = mysqlDatabaseSync.getJdbcProperties();
Assert.assertEquals(1, mysqlJdbcProperties.size());
Assert.assertEquals("false", mysqlJdbcProperties.getProperty("use_ssl"));
String mysqlJdbcUrlTemplate =
mysqlDatabaseSync.getJdbcUrlTemplate(mysqlJdbcTemplate, mysqlJdbcProperties);
Assert.assertEquals(mysqlJdbcTemplate + "&use_ssl=false", mysqlJdbcUrlTemplate);

Properties postgresJdbcProperties = postgresDatabaseSync.getJdbcProperties();
Assert.assertEquals(1, postgresJdbcProperties.size());
Assert.assertEquals("false", postgresJdbcProperties.getProperty("ssl"));
String postgresJdbcUrlTemplate =
postgresDatabaseSync.getJdbcUrlTemplate(
postgresJdbcTemplate, postgresJdbcProperties);
Assert.assertEquals(postgresJdbcTemplate + "&ssl=false", postgresJdbcUrlTemplate);

Properties sqlServerJdbcProperties = sqlServerDatabaseSync.getJdbcProperties();
Assert.assertEquals(2, sqlServerJdbcProperties.size());
Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("encrypt"));
Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("integratedSecurity"));
String sqlServerJdbcUrlTemplate =
sqlServerDatabaseSync.getJdbcUrlTemplate(
sqlServerJdbcTemplate, sqlServerJdbcProperties);
Assert.assertEquals(
sqlServerJdbcTemplate + "encrypt=false;integratedSecurity=false;",
sqlServerJdbcUrlTemplate);

Properties db2JdbcProperties = db2DatabaseSync.getJdbcProperties();
Assert.assertEquals(3, db2JdbcProperties.size());
Assert.assertEquals("false", db2JdbcProperties.getProperty("ssl"));
Assert.assertEquals("1", db2JdbcProperties.getProperty("allowNextOnExhaustedResultSet"));
Assert.assertEquals("1", db2JdbcProperties.getProperty("resultSetHoldability"));
String db2JdbcUrlTemplate =
db2DatabaseSync.getJdbcUrlTemplate(db2JdbcTemplate, db2JdbcProperties);
Assert.assertEquals(
db2JdbcTemplate
+ ":allowNextOnExhaustedResultSet=1;ssl=false;resultSetHoldability=1;",
db2JdbcUrlTemplate);
}
}

0 comments on commit 1869a7d

Please sign in to comment.