Skip to content

Commit

Permalink
[cdc] In schema building phase of mysql cdc, the connection should ha…
Browse files Browse the repository at this point in the history
…ndle jdbc properties (#2507)
  • Loading branch information
yuzelin authored Dec 14, 2023
1 parent 5a8b597 commit 57e8dc3
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;

Expand All @@ -66,17 +67,25 @@ public class MySqlActionUtils {
.withDescription(
"Whether capture the scan the newly added tables or not, by default is true.");

static Connection getConnection(Configuration mySqlConfig, boolean tinyint1NotBool)
static Connection getConnection(Configuration mySqlConfig, Map<String, String> jdbcProperties)
throws Exception {
String paramString = "";
if (!jdbcProperties.isEmpty()) {
paramString =
"?"
+ jdbcProperties.entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining("&"));
}

String url =
String.format(
"jdbc:mysql://%s:%d%s",
mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
mySqlConfig.get(MySqlSourceOptions.PORT),
// we need to add the `tinyInt1isBit` parameter to the connection url to
// make sure the tinyint(1) in MySQL is converted to bits or not. Refer to
// https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-result-sets.html#cj-conn-prop_tinyInt1isBit
tinyint1NotBool ? "?tinyInt1isBit=false" : "");
paramString);

LOG.info("Connect to MySQL server using url: {}", url);

return DriverManager.getConnection(
url,
Expand All @@ -93,9 +102,9 @@ public static MySqlSchemasInfo getMySqlTableInfos(
Pattern databasePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
MySqlSchemasInfo mySqlSchemasInfo = new MySqlSchemasInfo();
try (Connection conn =
MySqlActionUtils.getConnection(
mySqlConfig, typeMapping.containsMode(TINYINT1_NOT_BOOL))) {
Map<String, String> jdbcProperties = getJdbcProperties(typeMapping, mySqlConfig);

try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig, jdbcProperties)) {
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet schemas = metaData.getCatalogs()) {
while (schemas.next()) {
Expand Down Expand Up @@ -129,7 +138,7 @@ public static MySqlSchemasInfo getMySqlTableInfos(
}

public static MySqlSource<String> buildMySqlSource(
Configuration mySqlConfig, String tableList) {
Configuration mySqlConfig, String tableList, TypeMapping typeMapping) {
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();

sourceBuilder
Expand Down Expand Up @@ -195,18 +204,18 @@ public static MySqlSource<String> buildMySqlSource(
}

Properties jdbcProperties = new Properties();
jdbcProperties.putAll(getJdbcProperties(typeMapping, mySqlConfig));
sourceBuilder.jdbcProperties(jdbcProperties);

Properties debeziumProperties = new Properties();
for (Map.Entry<String, String> entry : mySqlConfig.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
} else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
debeziumProperties.put(
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
}
}
sourceBuilder.jdbcProperties(jdbcProperties);
sourceBuilder.debeziumProperties(debeziumProperties);

Map<String, Object> customConverterConfigs = new HashMap<>();
Expand All @@ -223,6 +232,37 @@ public static MySqlSource<String> buildMySqlSource(
.build();
}

// see
// https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options
// https://dev.mysql.com/doc/connectors/en/connector-j-reference-configuration-properties.html
private static Map<String, String> getJdbcProperties(
TypeMapping typeMapping, Configuration mySqlConfig) {
Map<String, String> jdbcProperties =
mySqlConfig.toMap().entrySet().stream()
.filter(e -> e.getKey().startsWith(JdbcUrlUtils.PROPERTIES_PREFIX))
.collect(
Collectors.toMap(
e ->
e.getKey()
.substring(
JdbcUrlUtils.PROPERTIES_PREFIX
.length()),
Map.Entry::getValue));

if (typeMapping.containsMode(TINYINT1_NOT_BOOL)) {
String tinyInt1isBit = jdbcProperties.get("tinyInt1isBit");
if (tinyInt1isBit == null) {
jdbcProperties.put("tinyInt1isBit", "false");
} else if ("true".equals(jdbcProperties.get("tinyInt1isBit"))) {
throw new IllegalArgumentException(
"Type mapping option 'tinyint1-not-bool' conflicts with jdbc properties 'jdbc.properties.tinyInt1isBit=true'. "
+ "Option 'tinyint1-not-bool' is equal to 'jdbc.properties.tinyInt1isBit=false'.");
}
}

return jdbcProperties;
}

public static void registerJdbcDriver() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ protected MySqlSource<String> buildSource() {
cdcSourceConfig.get(MySqlSourceOptions.DATABASE_NAME),
includingTables,
monitoredTables,
excludedTables));
excludedTables),
typeMapping);
}

private void logNonPkTables(List<Identifier> nonPkTables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected MySqlSource<String> buildSource() {
mySqlSchemasInfo.pkTables().stream()
.map(i -> i.getDatabaseName() + "\\." + i.getObjectName())
.collect(Collectors.joining("|"));
return MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList);
return MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList, typeMapping);
}

private void validateMySqlTableInfos(MySqlSchemasInfo mySqlSchemasInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand All @@ -43,6 +45,7 @@
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** IT test for {@link TypeMapping} in MySQL CDC. */
public class MySqlCdcTypeMappingITCase extends MySqlActionITCaseBase {
Expand All @@ -61,6 +64,10 @@ public void testTinyInt1NotBool() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "tinyint1_not_bool_test");

// test tinyInt1isBit compatibility and url building
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "false");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "useSSL", "false");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
.withTableConfig(getBasicTableConfig())
Expand Down Expand Up @@ -113,6 +120,28 @@ public void testTinyInt1NotBool() throws Exception {
}
}

@Test
public void testConflictTinyInt1NotBool() {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "tinyint1_not_bool_test");
mySqlConfig.put(JdbcUrlUtils.PROPERTIES_PREFIX + "tinyInt1isBit", "true");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
.withTableConfig(getBasicTableConfig())
.withMode(COMBINED.configString())
.withTypeMappingModes(TINYINT1_NOT_BOOL.configString())
.build();

assertThatThrownBy(action::run)
.satisfies(
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
"Type mapping option 'tinyint1-not-bool' conflicts with "
+ "jdbc properties 'jdbc.properties.tinyInt1isBit=true'. "
+ "Option 'tinyint1-not-bool' is equal to 'jdbc.properties.tinyInt1isBit=false'."));
}

// --------------------------------------- all-to-string ---------------------------------------

@Test
Expand Down

0 comments on commit 57e8dc3

Please sign in to comment.