Skip to content

Commit

Permalink
0.6.4
Browse files Browse the repository at this point in the history
0.6.4
  • Loading branch information
aiwenmo authored Jun 5, 2022
2 parents 0da8fed + 33581ae commit cdda5ec
Show file tree
Hide file tree
Showing 68 changed files with 423 additions and 250 deletions.
3 changes: 2 additions & 1 deletion .github/ISSUE_TEMPLATE/bug-report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ body:
Which version of Dinky are you running? We only accept bugs report from the LTS projects.
options:
- dev
- 0.6.3-SNAPSHOT
- 0.6.4
- 0.6.3
- 0.6.2
- 0.6.1
- 0.6.0
Expand Down
2 changes: 1 addition & 1 deletion dlink-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-alert/dlink-alert-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-alert/dlink-alert-dingtalk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-alert/dlink-alert-email/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-alert-email</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion dlink-alert/dlink-alert-feishu/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-alert-feishu</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion dlink-alert/dlink-alert-wechat/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-alert/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-app/dlink-app-1.11/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-app/dlink-app-1.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-app/dlink-app-1.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-app/dlink-app-1.14/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-app/dlink-app-1.15/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-app/dlink-app-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.11/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,22 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");

Properties properties = new Properties();
Properties debeziumProperties = new Properties();
// 为部分转换添加默认值
debeziumProperties.setProperty("bigint.unsigned.handling.mode","long");
debeziumProperties.setProperty("decimal.handling.mode","string");

for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}

// 添加jdbc参数注入
Properties jdbcProperties = new Properties();
for (Map.Entry<String, String> entry : config.getJdbc().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
jdbcProperties.setProperty(entry.getKey(), entry.getValue());
}
}

Expand All @@ -90,7 +102,8 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);

if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,61 +60,61 @@ public void addSink(StreamExecutionEnvironment env, DataStream<RowData> rowDataD
}

private DataStream<Row> buildRow(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);

TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);

return filterOperator
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
.flatMap(new FlatMapFunction<Map, Row>() {
@Override
public void flatMap(Map value, Collector<Row> out) throws Exception {
switch (value.get("op").toString()) {
case "r":
case "c":
Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(irow);
break;
case "d":
Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(drow);
break;
case "u":
Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubrow);
Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uarow);
break;
}
}
}
}, rowTypeInfo);
}, rowTypeInfo);
}

private void addTableSink(
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {
CustomTableEnvironment customTableEnvironment,
DataStream<Row> rowDataDataStream,
Table table,
List<String> columnNameList) {

String sinkTableName = getSinkTableName(table);
String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
Expand Down Expand Up @@ -148,10 +148,10 @@ public SinkBuilder create(FlinkCDCConfig config) {

@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
Expand Down Expand Up @@ -263,23 +263,25 @@ protected Object convertValue(Object value, LogicalType logicalType) {
return null;
}
if (logicalType instanceof DateType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
}else {
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
}
} else if (logicalType instanceof TimestampType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
}else {
} else if (value instanceof String) {
return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
} else {
return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
} else if (logicalType instanceof BigIntType) {
if(value instanceof Integer){
if (value instanceof Integer) {
return ((Integer) value).longValue();
}else {
} else {
return value;
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.14/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.4-SNAPSHOT</version>
<version>0.6.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Loading

0 comments on commit cdda5ec

Please sign in to comment.