diff --git a/.github/ISSUE_TEMPLATE/bug-report.yml b/.github/ISSUE_TEMPLATE/bug-report.yml index 7394710170..65e2c26039 100644 --- a/.github/ISSUE_TEMPLATE/bug-report.yml +++ b/.github/ISSUE_TEMPLATE/bug-report.yml @@ -95,7 +95,8 @@ body: Which version of Dinky are you running? We only accept bugs report from the LTS projects. options: - dev - - 0.6.3-SNAPSHOT + - 0.6.4 + - 0.6.3 - 0.6.2 - 0.6.1 - 0.6.0 diff --git a/dlink-admin/pom.xml b/dlink-admin/pom.xml index 40ab0957dc..3aa2ef5ade 100644 --- a/dlink-admin/pom.xml +++ b/dlink-admin/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-alert/dlink-alert-base/pom.xml b/dlink-alert/dlink-alert-base/pom.xml index a8752b5fc5..b68b0e3452 100644 --- a/dlink-alert/dlink-alert-base/pom.xml +++ b/dlink-alert/dlink-alert-base/pom.xml @@ -5,7 +5,7 @@ dlink-alert com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-alert/dlink-alert-dingtalk/pom.xml b/dlink-alert/dlink-alert-dingtalk/pom.xml index 836076bb7e..d4f949efde 100644 --- a/dlink-alert/dlink-alert-dingtalk/pom.xml +++ b/dlink-alert/dlink-alert-dingtalk/pom.xml @@ -5,7 +5,7 @@ dlink-alert com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-alert/dlink-alert-email/pom.xml b/dlink-alert/dlink-alert-email/pom.xml index bac6c051c1..5b4d72eaf1 100644 --- a/dlink-alert/dlink-alert-email/pom.xml +++ b/dlink-alert/dlink-alert-email/pom.xml @@ -5,7 +5,7 @@ dlink-alert com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 dlink-alert-email diff --git a/dlink-alert/dlink-alert-feishu/pom.xml b/dlink-alert/dlink-alert-feishu/pom.xml index e7b5b254c1..aaef61225d 100644 --- a/dlink-alert/dlink-alert-feishu/pom.xml +++ b/dlink-alert/dlink-alert-feishu/pom.xml @@ -5,7 +5,7 @@ dlink-alert com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 dlink-alert-feishu diff --git a/dlink-alert/dlink-alert-wechat/pom.xml b/dlink-alert/dlink-alert-wechat/pom.xml index 1820c89d3d..86991890e3 100644 --- a/dlink-alert/dlink-alert-wechat/pom.xml +++ b/dlink-alert/dlink-alert-wechat/pom.xml @@ -5,7 +5,7 @@ dlink-alert com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-alert/pom.xml b/dlink-alert/pom.xml index 7ef3d09c4c..0e83e3ca68 100644 --- a/dlink-alert/pom.xml +++ b/dlink-alert/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-app/dlink-app-1.11/pom.xml b/dlink-app/dlink-app-1.11/pom.xml index 820a3bc710..876485f6a3 100644 --- a/dlink-app/dlink-app-1.11/pom.xml +++ b/dlink-app/dlink-app-1.11/pom.xml @@ -5,7 +5,7 @@ dlink-app com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-app/dlink-app-1.12/pom.xml b/dlink-app/dlink-app-1.12/pom.xml index 2843cb80e7..6a2ac5e341 100644 --- a/dlink-app/dlink-app-1.12/pom.xml +++ b/dlink-app/dlink-app-1.12/pom.xml @@ -5,7 +5,7 @@ dlink-app com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-app/dlink-app-1.13/pom.xml b/dlink-app/dlink-app-1.13/pom.xml index 19751b7c24..3767036a98 100644 --- a/dlink-app/dlink-app-1.13/pom.xml +++ b/dlink-app/dlink-app-1.13/pom.xml @@ -5,7 +5,7 @@ dlink-app com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-app/dlink-app-1.14/pom.xml b/dlink-app/dlink-app-1.14/pom.xml index 0a9a98b278..fa90a7751e 100644 --- a/dlink-app/dlink-app-1.14/pom.xml +++ b/dlink-app/dlink-app-1.14/pom.xml @@ -5,7 +5,7 @@ dlink-app com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-app/dlink-app-1.15/pom.xml b/dlink-app/dlink-app-1.15/pom.xml index eafbafc356..ad25a45147 100644 --- a/dlink-app/dlink-app-1.15/pom.xml +++ b/dlink-app/dlink-app-1.15/pom.xml @@ -5,7 +5,7 @@ dlink-app com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-app/dlink-app-base/pom.xml b/dlink-app/dlink-app-base/pom.xml index 1a25dcba37..28f8724513 100644 --- a/dlink-app/dlink-app-base/pom.xml +++ b/dlink-app/dlink-app-base/pom.xml @@ -5,7 +5,7 @@ dlink-app com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-app/pom.xml b/dlink-app/pom.xml index 79801cb471..59af48d421 100644 --- a/dlink-app/pom.xml +++ b/dlink-app/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-assembly/pom.xml b/dlink-assembly/pom.xml index 4ecb964491..6e96d55749 100644 --- a/dlink-assembly/pom.xml +++ b/dlink-assembly/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/dlink-client-1.11/pom.xml b/dlink-client/dlink-client-1.11/pom.xml index 62a82f3961..d238ed7a56 100644 --- a/dlink-client/dlink-client-1.11/pom.xml +++ b/dlink-client/dlink-client-1.11/pom.xml @@ -5,7 +5,7 @@ dlink-client com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/dlink-client-1.12/pom.xml b/dlink-client/dlink-client-1.12/pom.xml index 10246300da..39abd698bf 100644 --- a/dlink-client/dlink-client-1.12/pom.xml +++ b/dlink-client/dlink-client-1.12/pom.xml @@ -5,7 +5,7 @@ dlink-client com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/dlink-client-1.13/pom.xml b/dlink-client/dlink-client-1.13/pom.xml index eddd09a15c..abd5347795 100644 --- a/dlink-client/dlink-client-1.13/pom.xml +++ b/dlink-client/dlink-client-1.13/pom.xml @@ -5,7 +5,7 @@ dlink-client com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 7a501d0c0b..fb045dc9c2 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -62,10 +62,22 @@ public DataStreamSource build(StreamExecutionEnvironment env) { String connectionPoolSize = config.getSource().get("connection.pool.size"); String heartbeatInterval = config.getSource().get("heartbeat.interval"); - Properties properties = new Properties(); + Properties debeziumProperties = new Properties(); + // 为部分转换添加默认值 + debeziumProperties.setProperty("bigint.unsigned.handling.mode","long"); + debeziumProperties.setProperty("decimal.handling.mode","string"); + for (Map.Entry entry : config.getDebezium().entrySet()) { if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { - properties.setProperty(entry.getKey(), entry.getValue()); + debeziumProperties.setProperty(entry.getKey(), entry.getValue()); + } + } + + // 添加jdbc参数注入 + Properties jdbcProperties = new Properties(); + for (Map.Entry entry : config.getJdbc().entrySet()) { + if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { + jdbcProperties.setProperty(entry.getKey(), entry.getValue()); } } @@ -90,7 +102,8 @@ public DataStreamSource build(StreamExecutionEnvironment env) { } sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema()); - sourceBuilder.debeziumProperties(properties); + sourceBuilder.debeziumProperties(debeziumProperties); + sourceBuilder.jdbcProperties(jdbcProperties); if (Asserts.isNotNullString(config.getStartupMode())) { switch (config.getStartupMode().toLowerCase()) { diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index c212bf2cb7..dcd12220bc 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -60,9 +60,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD } private DataStream buildRow( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList) { final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); @@ -70,51 +70,51 @@ private DataStream buildRow( RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - switch (value.get("op").toString()) { - case "r": - case "c": - Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(irow); - break; - case "d": - Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(drow); - break; - case "u": - Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubrow); - Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uarow); - break; + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(Map value, Collector out) throws Exception { + switch (value.get("op").toString()) { + case "r": + case "c": + Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(irow); + break; + case "d": + Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(drow); + break; + case "u": + Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubrow); + Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uarow); + break; + } } - } - }, rowTypeInfo); + }, rowTypeInfo); } private void addTableSink( - CustomTableEnvironment customTableEnvironment, - DataStream rowDataDataStream, - Table table, - List columnNameList) { + CustomTableEnvironment customTableEnvironment, + DataStream rowDataDataStream, + Table table, + List columnNameList) { String sinkTableName = getSinkTableName(table); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); @@ -148,10 +148,10 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); if (Asserts.isNotNullCollection(schemaList)) { @@ -263,23 +263,25 @@ protected Object convertValue(Object value, LogicalType logicalType) { return null; } if (logicalType instanceof DateType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); - }else { + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); } } else if (logicalType instanceof TimestampType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); - }else { + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); } else if (logicalType instanceof BigIntType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return ((Integer) value).longValue(); - }else { + } else { return value; } } else { diff --git a/dlink-client/dlink-client-1.14/pom.xml b/dlink-client/dlink-client-1.14/pom.xml index b9f0dc093e..6e2f7211e4 100644 --- a/dlink-client/dlink-client-1.14/pom.xml +++ b/dlink-client/dlink-client-1.14/pom.xml @@ -5,7 +5,7 @@ dlink-client com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 7a501d0c0b..fb045dc9c2 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -62,10 +62,22 @@ public DataStreamSource build(StreamExecutionEnvironment env) { String connectionPoolSize = config.getSource().get("connection.pool.size"); String heartbeatInterval = config.getSource().get("heartbeat.interval"); - Properties properties = new Properties(); + Properties debeziumProperties = new Properties(); + // 为部分转换添加默认值 + debeziumProperties.setProperty("bigint.unsigned.handling.mode","long"); + debeziumProperties.setProperty("decimal.handling.mode","string"); + for (Map.Entry entry : config.getDebezium().entrySet()) { if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { - properties.setProperty(entry.getKey(), entry.getValue()); + debeziumProperties.setProperty(entry.getKey(), entry.getValue()); + } + } + + // 添加jdbc参数注入 + Properties jdbcProperties = new Properties(); + for (Map.Entry entry : config.getJdbc().entrySet()) { + if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { + jdbcProperties.setProperty(entry.getKey(), entry.getValue()); } } @@ -90,7 +102,8 @@ public DataStreamSource build(StreamExecutionEnvironment env) { } sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema()); - sourceBuilder.debeziumProperties(properties); + sourceBuilder.debeziumProperties(debeziumProperties); + sourceBuilder.jdbcProperties(jdbcProperties); if (Asserts.isNotNullString(config.getStartupMode())) { switch (config.getStartupMode().toLowerCase()) { diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index b9caac7e0b..5c189c57d7 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -67,9 +67,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD } private DataStream buildRow( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList) { final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); @@ -77,51 +77,51 @@ private DataStream buildRow( RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - switch (value.get("op").toString()) { - case "r": - case "c": - Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(irow); - break; - case "d": - Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(drow); - break; - case "u": - Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubrow); - Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uarow); - break; + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(Map value, Collector out) throws Exception { + switch (value.get("op").toString()) { + case "r": + case "c": + Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(irow); + break; + case "d": + Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(drow); + break; + case "u": + Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubrow); + Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uarow); + break; + } } - } - }, rowTypeInfo); + }, rowTypeInfo); } private void addTableSink( - CustomTableEnvironment customTableEnvironment, - DataStream rowDataDataStream, - Table table, - List columnNameList) { + CustomTableEnvironment customTableEnvironment, + DataStream rowDataDataStream, + Table table, + List columnNameList) { String sinkTableName = getSinkTableName(table); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); @@ -155,10 +155,10 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); if (Asserts.isNotNullCollection(schemaList)) { @@ -176,7 +176,7 @@ public DataStreamSource build( logger.info("Build " + table.getSchemaTableName() + " flatMap successful..."); logger.info("Start build " + table.getSchemaTableName() + " sink..."); addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList); - }catch (Exception e) { + } catch (Exception e) { logger.error("Build " + table.getSchemaTableName() + " cdc sync failed..."); logger.error(LogUtil.getError(e)); } @@ -270,23 +270,25 @@ protected Object convertValue(Object value, LogicalType logicalType) { return null; } if (logicalType instanceof DateType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); - }else { + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); } } else if (logicalType instanceof TimestampType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); - }else { + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); } else if (logicalType instanceof BigIntType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return ((Integer) value).longValue(); - }else { + } else { return value; } } else { diff --git a/dlink-client/dlink-client-1.15/pom.xml b/dlink-client/dlink-client-1.15/pom.xml index 53725b3bc3..a7a9545bf5 100644 --- a/dlink-client/dlink-client-1.15/pom.xml +++ b/dlink-client/dlink-client-1.15/pom.xml @@ -5,7 +5,7 @@ dlink-client com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java index 7a501d0c0b..fb045dc9c2 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/mysql/MysqlCDCBuilder.java @@ -62,10 +62,22 @@ public DataStreamSource build(StreamExecutionEnvironment env) { String connectionPoolSize = config.getSource().get("connection.pool.size"); String heartbeatInterval = config.getSource().get("heartbeat.interval"); - Properties properties = new Properties(); + Properties debeziumProperties = new Properties(); + // 为部分转换添加默认值 + debeziumProperties.setProperty("bigint.unsigned.handling.mode","long"); + debeziumProperties.setProperty("decimal.handling.mode","string"); + for (Map.Entry entry : config.getDebezium().entrySet()) { if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { - properties.setProperty(entry.getKey(), entry.getValue()); + debeziumProperties.setProperty(entry.getKey(), entry.getValue()); + } + } + + // 添加jdbc参数注入 + Properties jdbcProperties = new Properties(); + for (Map.Entry entry : config.getJdbc().entrySet()) { + if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) { + jdbcProperties.setProperty(entry.getKey(), entry.getValue()); } } @@ -90,7 +102,8 @@ public DataStreamSource build(StreamExecutionEnvironment env) { } sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema()); - sourceBuilder.debeziumProperties(properties); + sourceBuilder.debeziumProperties(debeziumProperties); + sourceBuilder.jdbcProperties(jdbcProperties); if (Asserts.isNotNullString(config.getStartupMode())) { switch (config.getStartupMode().toLowerCase()) { diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index bf003ba0a9..77adb8b625 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -67,9 +67,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD } private DataStream buildRow( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList) { final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); @@ -77,51 +77,51 @@ private DataStream buildRow( RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - switch (value.get("op").toString()) { - case "r": - case "c": - Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(irow); - break; - case "d": - Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(drow); - break; - case "u": - Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubrow); - Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uarow); - break; + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(Map value, Collector out) throws Exception { + switch (value.get("op").toString()) { + case "r": + case "c": + Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(irow); + break; + case "d": + Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(drow); + break; + case "u": + Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubrow); + Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uarow); + break; + } } - } - }, rowTypeInfo); + }, rowTypeInfo); } public void addTableSink( - CustomTableEnvironment customTableEnvironment, - DataStream rowDataDataStream, - Table table, - List columnNameList) { + CustomTableEnvironment customTableEnvironment, + DataStream rowDataDataStream, + Table table, + List columnNameList) { String sinkTableName = getSinkTableName(table); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); @@ -155,10 +155,10 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); if (Asserts.isNotNullCollection(schemaList)) { @@ -176,7 +176,7 @@ public DataStreamSource build( logger.info("Build " + table.getSchemaTableName() + " flatMap successful..."); logger.info("Start build " + table.getSchemaTableName() + " sink..."); addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList); - }catch (Exception e) { + } catch (Exception e) { logger.error("Build " + table.getSchemaTableName() + " cdc sync failed..."); logger.error(LogUtil.getError(e)); } @@ -270,23 +270,25 @@ protected Object convertValue(Object value, LogicalType logicalType) { return null; } if (logicalType instanceof DateType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); - }else { + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); } } else if (logicalType instanceof TimestampType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); - }else { + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); } else if (logicalType instanceof BigIntType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return ((Integer) value).longValue(); - }else { + } else { return value; } } else { diff --git a/dlink-client/dlink-client-base/pom.xml b/dlink-client/dlink-client-base/pom.xml index cccf37d7b2..eee639b167 100644 --- a/dlink-client/dlink-client-base/pom.xml +++ b/dlink-client/dlink-client-base/pom.xml @@ -5,7 +5,7 @@ dlink-client com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java b/dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java index 47cbe0c316..6b9cf88bea 100644 --- a/dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java +++ b/dlink-client/dlink-client-base/src/main/java/com/dlink/model/FlinkCDCConfig.java @@ -25,6 +25,7 @@ public class FlinkCDCConfig { private String startupMode; private Map debezium; private Map source; + private Map jdbc; private Map sink; private List schemaList; private String schemaFieldName; @@ -34,7 +35,7 @@ public FlinkCDCConfig() { public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table, String startupMode, - Map debezium, Map source, Map sink) { + Map debezium, Map source, Map sink,Map jdbc) { this.type = type; this.hostname = hostname; this.port = port; @@ -49,6 +50,7 @@ public FlinkCDCConfig(String type, String hostname, Integer port, String usernam this.debezium = debezium; this.source = source; this.sink = sink; + this.jdbc = jdbc; } public String getType() { @@ -217,6 +219,14 @@ public Map getDebezium() { return debezium; } + public Map getJdbc() { + return jdbc; + } + + public void setJdbc(Map jdbc) { + this.jdbc = jdbc; + } + public void setDebezium(Map debezium) { this.debezium = debezium; } diff --git a/dlink-client/dlink-client-hadoop/pom.xml b/dlink-client/dlink-client-hadoop/pom.xml index 348d222b12..22c5a2c111 100644 --- a/dlink-client/dlink-client-hadoop/pom.xml +++ b/dlink-client/dlink-client-hadoop/pom.xml @@ -5,7 +5,7 @@ dlink-client com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-client/pom.xml b/dlink-client/pom.xml index 16cbca8151..557dd32180 100644 --- a/dlink-client/pom.xml +++ b/dlink-client/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-common/pom.xml b/dlink-common/pom.xml index 5280b670a7..68a61bfa19 100644 --- a/dlink-common/pom.xml +++ b/dlink-common/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-common/src/main/java/com/dlink/constant/CommonConstant.java b/dlink-common/src/main/java/com/dlink/constant/CommonConstant.java index 4d46056688..a5469871f7 100644 --- a/dlink-common/src/main/java/com/dlink/constant/CommonConstant.java +++ b/dlink-common/src/main/java/com/dlink/constant/CommonConstant.java @@ -10,7 +10,7 @@ public interface CommonConstant { /** * 项目版本号(banner使用) */ - String PROJECT_VERSION = "0.6.4-SNAPSHOT"; + String PROJECT_VERSION = "0.6.5"; /** * 实例健康 */ diff --git a/dlink-connectors/dlink-connector-jdbc-1.11/pom.xml b/dlink-connectors/dlink-connector-jdbc-1.11/pom.xml index a3392674ad..174e345c89 100644 --- a/dlink-connectors/dlink-connector-jdbc-1.11/pom.xml +++ b/dlink-connectors/dlink-connector-jdbc-1.11/pom.xml @@ -5,7 +5,7 @@ dlink-connectors com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-connectors/dlink-connector-jdbc-1.12/pom.xml b/dlink-connectors/dlink-connector-jdbc-1.12/pom.xml index 099f2f5305..64f17fdd0e 100644 --- a/dlink-connectors/dlink-connector-jdbc-1.12/pom.xml +++ b/dlink-connectors/dlink-connector-jdbc-1.12/pom.xml @@ -5,7 +5,7 @@ dlink-connectors com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-connectors/dlink-connector-jdbc-1.13/pom.xml b/dlink-connectors/dlink-connector-jdbc-1.13/pom.xml index 249b386965..df2d8a8eb5 100644 --- a/dlink-connectors/dlink-connector-jdbc-1.13/pom.xml +++ b/dlink-connectors/dlink-connector-jdbc-1.13/pom.xml @@ -5,7 +5,7 @@ dlink-connectors com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-connectors/dlink-connector-jdbc-1.14/pom.xml b/dlink-connectors/dlink-connector-jdbc-1.14/pom.xml index fb1a1b3e01..5458d052b7 100644 --- a/dlink-connectors/dlink-connector-jdbc-1.14/pom.xml +++ b/dlink-connectors/dlink-connector-jdbc-1.14/pom.xml @@ -5,7 +5,7 @@ dlink-connectors com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-connectors/dlink-connector-phoenix-1.13/pom.xml b/dlink-connectors/dlink-connector-phoenix-1.13/pom.xml index 6923c823be..3b187e30bd 100644 --- a/dlink-connectors/dlink-connector-phoenix-1.13/pom.xml +++ b/dlink-connectors/dlink-connector-phoenix-1.13/pom.xml @@ -5,7 +5,7 @@ dlink-connectors com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-connectors/pom.xml b/dlink-connectors/pom.xml index b65d316a7a..a4d805684d 100644 --- a/dlink-connectors/pom.xml +++ b/dlink-connectors/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 pom diff --git a/dlink-core/pom.xml b/dlink-core/pom.xml index 73763ecd08..8206f3408c 100644 --- a/dlink-core/pom.xml +++ b/dlink-core/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 jar diff --git a/dlink-daemon/pom.xml b/dlink-daemon/pom.xml index f11ddde30e..d58d25edc8 100644 --- a/dlink-daemon/pom.xml +++ b/dlink-daemon/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-executor/pom.xml b/dlink-executor/pom.xml index 4fe2d23941..9675e1a339 100644 --- a/dlink-executor/pom.xml +++ b/dlink-executor/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java b/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java index 3fd93af63a..13960e076e 100644 --- a/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java +++ b/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java @@ -31,11 +31,12 @@ public class CDCSource { private String table; private String startupMode; private Map debezium; + private Map jdbc; private Map source; private Map sink; public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, - Map debezium, Map source, Map sink) { + Map debezium, Map source, Map sink, Map jdbc) { this.connector = connector; this.statement = statement; this.name = name; @@ -47,6 +48,7 @@ public CDCSource(String connector, String statement, String name, String hostnam this.parallelism = parallelism; this.startupMode = startupMode; this.debezium = debezium; + this.jdbc = jdbc; this.source = source; this.sink = sink; } @@ -74,6 +76,18 @@ public static CDCSource build(String statement) { } } } + // jdbc参数(jdbc.properties.*) + Map jdbc = new HashMap<>(); + for (Map.Entry entry : config.entrySet()) { + if (entry.getKey().startsWith("jdbc.properties.")) { + String key = entry.getKey(); + key = key.replaceFirst("jdbc.properties.", ""); + if (!jdbc.containsKey(key)) { + jdbc.put(key, entry.getValue()); + } + } + } + Map sink = new HashMap<>(); for (Map.Entry entry : config.entrySet()) { if (entry.getKey().startsWith("sink.")) { @@ -85,19 +99,20 @@ public static CDCSource build(String statement) { } } CDCSource cdcSource = new CDCSource( - config.get("connector"), - statement, - map.get("CDCSOURCE").toString(), - config.get("hostname"), - Integer.valueOf(config.get("port")), - config.get("username"), - config.get("password"), - Integer.valueOf(config.get("checkpoint")), - Integer.valueOf(config.get("parallelism")), - config.get("scan.startup.mode"), - debezium, - source, - sink + config.get("connector"), + statement, + map.get("CDCSOURCE").toString(), + config.get("hostname"), + Integer.valueOf(config.get("port")), + config.get("username"), + config.get("password"), + Integer.valueOf(config.get("checkpoint")), + Integer.valueOf(config.get("parallelism")), + config.get("scan.startup.mode"), + debezium, + source, + sink, + jdbc ); if (Asserts.isNotNullString(config.get("database-name"))) { cdcSource.setDatabase(config.get("database-name")); @@ -250,4 +265,12 @@ public Map getSource() { public void setSource(Map source) { this.source = source; } + + public Map getJdbc() { + return jdbc; + } + + public void setJdbc(Map jdbc) { + this.jdbc = jdbc; + } } diff --git a/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java b/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java index 5e897b37c2..d7b59b0fe3 100644 --- a/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java +++ b/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java @@ -54,7 +54,7 @@ public TableResult build(Executor executor) { CDCSource cdcSource = CDCSource.build(statement); FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername() , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema() - , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink()); + , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink(),cdcSource.getJdbc()); try { CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config); Map> allConfigMap = cdcBuilder.parseMetaDataConfigs(); diff --git a/dlink-extends/pom.xml b/dlink-extends/pom.xml index a5fc8789c3..2a64198837 100644 --- a/dlink-extends/pom.xml +++ b/dlink-extends/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-flink/dlink-flink-1.11/pom.xml b/dlink-flink/dlink-flink-1.11/pom.xml index 50d7e371a9..6858bb2c57 100644 --- a/dlink-flink/dlink-flink-1.11/pom.xml +++ b/dlink-flink/dlink-flink-1.11/pom.xml @@ -5,7 +5,7 @@ dlink-flink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-flink/dlink-flink-1.12/pom.xml b/dlink-flink/dlink-flink-1.12/pom.xml index d54dc86898..f815780669 100644 --- a/dlink-flink/dlink-flink-1.12/pom.xml +++ b/dlink-flink/dlink-flink-1.12/pom.xml @@ -5,7 +5,7 @@ dlink-flink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-flink/dlink-flink-1.13/pom.xml b/dlink-flink/dlink-flink-1.13/pom.xml index 5c4dd933ab..a071012a10 100644 --- a/dlink-flink/dlink-flink-1.13/pom.xml +++ b/dlink-flink/dlink-flink-1.13/pom.xml @@ -5,7 +5,7 @@ dlink-flink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 @@ -15,7 +15,7 @@ 1.8 UTF-8 1.13.6 - 2.2.0 + 2.2.1 1.8 1.8 4.12 diff --git a/dlink-flink/dlink-flink-1.14/pom.xml b/dlink-flink/dlink-flink-1.14/pom.xml index e37ee35e7a..ced1e4a278 100644 --- a/dlink-flink/dlink-flink-1.14/pom.xml +++ b/dlink-flink/dlink-flink-1.14/pom.xml @@ -5,7 +5,7 @@ dlink-flink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 @@ -15,7 +15,7 @@ 1.8 UTF-8 1.14.4 - 2.2.0 + 2.2.1 1.3.1 1.8 1.8 diff --git a/dlink-flink/dlink-flink-1.15/pom.xml b/dlink-flink/dlink-flink-1.15/pom.xml index 51561db7dd..3dd53055fa 100644 --- a/dlink-flink/dlink-flink-1.15/pom.xml +++ b/dlink-flink/dlink-flink-1.15/pom.xml @@ -5,7 +5,7 @@ dlink-flink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-flink/pom.xml b/dlink-flink/pom.xml index 2c402ffeb3..352cb7b2c0 100644 --- a/dlink-flink/pom.xml +++ b/dlink-flink/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-function/pom.xml b/dlink-function/pom.xml index 0834167552..7781bc936f 100644 --- a/dlink-function/pom.xml +++ b/dlink-function/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-gateway/pom.xml b/dlink-gateway/pom.xml index a047e24769..6db47b36cf 100644 --- a/dlink-gateway/pom.xml +++ b/dlink-gateway/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-base/pom.xml b/dlink-metadata/dlink-metadata-base/pom.xml index 8b5bdad532..b0d7e8391f 100644 --- a/dlink-metadata/dlink-metadata-base/pom.xml +++ b/dlink-metadata/dlink-metadata-base/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-clickhouse/pom.xml b/dlink-metadata/dlink-metadata-clickhouse/pom.xml index 403dccdc55..4ffa1b7388 100644 --- a/dlink-metadata/dlink-metadata-clickhouse/pom.xml +++ b/dlink-metadata/dlink-metadata-clickhouse/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-doris/pom.xml b/dlink-metadata/dlink-metadata-doris/pom.xml index 2478d2f318..e023d04d63 100644 --- a/dlink-metadata/dlink-metadata-doris/pom.xml +++ b/dlink-metadata/dlink-metadata-doris/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-hive/pom.xml b/dlink-metadata/dlink-metadata-hive/pom.xml index f14549398e..a2120e91d6 100644 --- a/dlink-metadata/dlink-metadata-hive/pom.xml +++ b/dlink-metadata/dlink-metadata-hive/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-mysql/pom.xml b/dlink-metadata/dlink-metadata-mysql/pom.xml index fe29025cdb..50b02d904c 100644 --- a/dlink-metadata/dlink-metadata-mysql/pom.xml +++ b/dlink-metadata/dlink-metadata-mysql/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-oracle/pom.xml b/dlink-metadata/dlink-metadata-oracle/pom.xml index 44fcc9e9f1..5dcd96394f 100644 --- a/dlink-metadata/dlink-metadata-oracle/pom.xml +++ b/dlink-metadata/dlink-metadata-oracle/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-phoenix/pom.xml b/dlink-metadata/dlink-metadata-phoenix/pom.xml index 61ede4abb2..2b8703f9b5 100644 --- a/dlink-metadata/dlink-metadata-phoenix/pom.xml +++ b/dlink-metadata/dlink-metadata-phoenix/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-postgresql/pom.xml b/dlink-metadata/dlink-metadata-postgresql/pom.xml index c8121e80ca..5ac2d3fa07 100644 --- a/dlink-metadata/dlink-metadata-postgresql/pom.xml +++ b/dlink-metadata/dlink-metadata-postgresql/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/dlink-metadata-sqlserver/pom.xml b/dlink-metadata/dlink-metadata-sqlserver/pom.xml index 1d9f2ca1b5..7342dd21c3 100644 --- a/dlink-metadata/dlink-metadata-sqlserver/pom.xml +++ b/dlink-metadata/dlink-metadata-sqlserver/pom.xml @@ -5,7 +5,7 @@ dlink-metadata com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-metadata/pom.xml b/dlink-metadata/pom.xml index 2e71dfde27..822f74d794 100644 --- a/dlink-metadata/pom.xml +++ b/dlink-metadata/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 diff --git a/dlink-web/pom.xml b/dlink-web/pom.xml index 448a0ddea5..b9b2126d69 100644 --- a/dlink-web/pom.xml +++ b/dlink-web/pom.xml @@ -5,7 +5,7 @@ dlink com.dlink - 0.6.4-SNAPSHOT + 0.6.4 4.0.0 pom diff --git a/dlink-web/src/components/Studio/StudioHome/index.tsx b/dlink-web/src/components/Studio/StudioHome/index.tsx index a6efb65893..4a723b2f5f 100644 --- a/dlink-web/src/components/Studio/StudioHome/index.tsx +++ b/dlink-web/src/components/Studio/StudioHome/index.tsx @@ -12,7 +12,7 @@ const StudioHome = (props: any) => { return ( - 欢迎使用 Dinky v0.6.4-SNAPSHOT + 欢迎使用 Dinky v0.6.4
实时即未来,Dinky 为 Apache Flink 而生,让 Flink SQL 纵享丝滑,并致力于实时计算平台建设。
diff --git a/dlink-web/src/locales/zh-CN/pages.ts b/dlink-web/src/locales/zh-CN/pages.ts index ead0cbd842..c98f93843a 100644 --- a/dlink-web/src/locales/zh-CN/pages.ts +++ b/dlink-web/src/locales/zh-CN/pages.ts @@ -29,7 +29,7 @@ export default { 'pages.welcome.link': '欢迎加入', 'pages.welcome.star': '欢迎 Star ', 'pages.welcome.advancedLayout': 'Github', - 'pages.welcome.alertMessage': '实时计算平台 Dinky 即将发布,目前为体验版,版本号为 0.6.4-SNAPSHOT。', + 'pages.welcome.alertMessage': '实时计算平台 Dinky 即将发布,目前为体验版,版本号为 0.6.4。', 'pages.admin.subPage.title': ' 这个页面只有 admin 权限才能查看', 'pages.admin.subPage.alertMessage': 'umi ui 现已发布,欢迎使用 npm run ui 启动体验。', 'pages.searchTable.createForm.newRule': '新建规则', diff --git a/dlink-web/src/pages/Welcome.tsx b/dlink-web/src/pages/Welcome.tsx index 9857a3cc01..bd9cf10273 100644 --- a/dlink-web/src/pages/Welcome.tsx +++ b/dlink-web/src/pages/Welcome.tsx @@ -19,7 +19,7 @@ export default (): React.ReactNode => { { + 0.6.4 2022-06-05 +

+ +
    +
  • + 新增整库同步表名参数支持换行和列支持按主键优先排序 +
  • +
  • + 新增整库同步日志输出 +
  • +
  • + 新增钉钉报警的 @mobile 配置 +
  • +
  • + 新增任务的 StreamGraph 导出为 JSON +
  • +
  • + 新增任务的 API 接口示例页面 +
  • +
  • + 新增数据开发帮助页面 +
  • +
  • + 新增普通 SQL 的字段血缘 +
  • +
  • + 新增作业监听池来解决频繁写库的问题 +
  • +
  • + 新增非 FlinkSQL 作业的导出 StreamGraphPlan 的按钮隐藏 +
  • +
  • + 新增数据源的删除按钮 +
  • +
  • + 新增整库同步的 jdbc 配置和升级 flinkcdc 版本 +
  • +
  • + 修复刷新作业监控页面时的抖动问题 +
  • +
  • + 修复 Flink Oracle Connector 不能转换 CLOB 到 String 的问题 +
  • +
  • + 修复切换任务时保存点未同步刷新的问题 +
  • +
  • + 修复 ClusterClient 接口不通版本的兼容性问题 +
  • +
  • + 修复 MySQL 类型转换精度信息是空的问题 +
  • +
  • + 修复初始化函数的冗余操作 +
  • +
  • + 修复整库同步的 decimal 问题 +
  • +
  • + 修复获取作业计划失败的问题 +
  • +
  • + 修复整库同步 OracleCDC number 不能被转换为 Long 的问题 +
  • +
  • + 修复微信企业号报警测试按钮后台错误的问题 +
  • +
  • + 修复当切换作业 tab 时无法正确保存修改的作业配置的问题 +
  • +
  • + 修复数据源和元数据不能展示别名的问题 +
  • +
  • + 修复作业重命名后 tab 未更新的问题 +
  • +
  • + 修复 K8S 集群配置的 FlinkLibPath 是空的问题 +
  • +
  • + 优化初始化 sql +
  • +
  • + 优化打包 +
  • +
  • + 优化移除 preset-ui +
  • +
  • + 优化 MySQL 字段类型转换 +
  • +
+
+
diff --git a/pom.xml b/pom.xml index 3fa9cdd754..5d82f0213f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.dlink dlink pom - 0.6.4-SNAPSHOT + 0.6.4 dlink-flink