Skip to content

Commit

Permalink
[Feature-570][client] Add CDCSOURCE jdbc properties and upgrade flink…
Browse files Browse the repository at this point in the history
… cdc version

为mysql source cdc添加jdbc连接参数;升级1.13和1.14 cdc连接版本(2.2.0->2.2.1)
  • Loading branch information
aiwenmo authored Jun 5, 2022
2 parents c1227bd + a56503a commit 4842048
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,22 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");

Properties properties = new Properties();
Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");

for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}

// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}

Expand All @@ -90,7 +102,8 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);

if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,22 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");

Properties properties = new Properties();
Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");

for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}

// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}

Expand All @@ -90,7 +102,8 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);

if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,22 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");

Properties properties = new Properties();
Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");

for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}

// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}

Expand All @@ -90,7 +102,8 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);

if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class FlinkCDCConfig {
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> source;
private Map<String, String> jdbc;
private Map<String, String> sink;
private List<Schema> schemaList;
private String schemaFieldName;
Expand All @@ -34,7 +35,7 @@ public FlinkCDCConfig() {

public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table,
String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink,Map<String, String> jdbc) {
this.type = type;
this.hostname = hostname;
this.port = port;
Expand All @@ -49,6 +50,7 @@ public FlinkCDCConfig(String type, String hostname, Integer port, String usernam
this.debezium = debezium;
this.source = source;
this.sink = sink;
this.jdbc = jdbc;
}

public String getType() {
Expand Down Expand Up @@ -217,6 +219,14 @@ public Map<String, String> getDebezium() {
return debezium;
}

public Map<String, String> getJdbc() {
return jdbc;
}

public void setJdbc(Map<String, String> jdbc) {
this.jdbc = jdbc;
}

public void setDebezium(Map<String, String> debezium) {
this.debezium = debezium;
}
Expand Down
51 changes: 37 additions & 14 deletions dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ public class CDCSource {
private String table;
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> jdbc;
private Map<String, String> source;
private Map<String, String> sink;

public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
this.connector = connector;
this.statement = statement;
this.name = name;
Expand All @@ -47,6 +48,7 @@ public CDCSource(String connector, String statement, String name, String hostnam
this.parallelism = parallelism;
this.startupMode = startupMode;
this.debezium = debezium;
this.jdbc = jdbc;
this.source = source;
this.sink = sink;
}
Expand Down Expand Up @@ -74,6 +76,18 @@ public static CDCSource build(String statement) {
}
}
}
// jdbc参数(jdbc.properties.*)
Map<String, String> jdbc = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("jdbc.properties.")) {
String key = entry.getKey();
key = key.replaceFirst("jdbc.properties.", "");
if (!jdbc.containsKey(key)) {
jdbc.put(key, entry.getValue());
}
}
}

Map<String, String> sink = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("sink.")) {
Expand All @@ -85,19 +99,20 @@ public static CDCSource build(String statement) {
}
}
CDCSource cdcSource = new CDCSource(
config.get("connector"),
statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"),
debezium,
source,
sink
config.get("connector"),
statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"),
debezium,
source,
sink,
jdbc
);
if (Asserts.isNotNullString(config.get("database-name"))) {
cdcSource.setDatabase(config.get("database-name"));
Expand Down Expand Up @@ -250,4 +265,12 @@ public Map<String, String> getSource() {
public void setSource(Map<String, String> source) {
this.source = source;
}

public Map<String, String> getJdbc() {
return jdbc;
}

public void setJdbc(Map<String, String> jdbc) {
this.jdbc = jdbc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public TableResult build(Executor executor) {
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink());
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink(),cdcSource.getJdbc());
try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
Expand Down
2 changes: 1 addition & 1 deletion dlink-flink/dlink-flink-1.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.6</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version>
<flinkcdc.version>2.2.1</flinkcdc.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
Expand Down
2 changes: 1 addition & 1 deletion dlink-flink/dlink-flink-1.14/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.4</flink.version>
<flinkcdc.version>2.2.0</flinkcdc.version>
<flinkcdc.version>2.2.1</flinkcdc.version>
<commons.version>1.3.1</commons.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
Expand Down

0 comments on commit 4842048

Please sign in to comment.