diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index c212bf2cb7..dcd12220bc 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -60,9 +60,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD } private DataStream buildRow( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList) { final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); @@ -70,51 +70,51 @@ private DataStream buildRow( RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - switch (value.get("op").toString()) { - case "r": - case "c": - Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(irow); - break; - case "d": - Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(drow); - break; - case "u": - Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubrow); - Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uarow); - break; + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(Map value, Collector out) throws Exception { + switch (value.get("op").toString()) { + case "r": + case "c": + Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(irow); + break; + case "d": + Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(drow); + break; + case "u": + Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubrow); + Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uarow); + break; + } } - } - }, rowTypeInfo); + }, rowTypeInfo); } private void addTableSink( - CustomTableEnvironment customTableEnvironment, - DataStream rowDataDataStream, - Table table, - List columnNameList) { + CustomTableEnvironment customTableEnvironment, + DataStream rowDataDataStream, + Table table, + List columnNameList) { String sinkTableName = getSinkTableName(table); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); @@ -148,10 +148,10 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); if (Asserts.isNotNullCollection(schemaList)) { @@ -263,23 +263,25 @@ protected Object convertValue(Object value, LogicalType logicalType) { return null; } if (logicalType instanceof DateType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); - }else { + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); } } else if (logicalType instanceof TimestampType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); - }else { + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); } else if (logicalType instanceof BigIntType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return ((Integer) value).longValue(); - }else { + } else { return value; } } else { diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index b9caac7e0b..5c189c57d7 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -67,9 +67,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD } private DataStream buildRow( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList) { final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); @@ -77,51 +77,51 @@ private DataStream buildRow( RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - switch (value.get("op").toString()) { - case "r": - case "c": - Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(irow); - break; - case "d": - Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(drow); - break; - case "u": - Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubrow); - Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uarow); - break; + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(Map value, Collector out) throws Exception { + switch (value.get("op").toString()) { + case "r": + case "c": + Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(irow); + break; + case "d": + Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(drow); + break; + case "u": + Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubrow); + Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uarow); + break; + } } - } - }, rowTypeInfo); + }, rowTypeInfo); } private void addTableSink( - CustomTableEnvironment customTableEnvironment, - DataStream rowDataDataStream, - Table table, - List columnNameList) { + CustomTableEnvironment customTableEnvironment, + DataStream rowDataDataStream, + Table table, + List columnNameList) { String sinkTableName = getSinkTableName(table); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); @@ -155,10 +155,10 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); if (Asserts.isNotNullCollection(schemaList)) { @@ -176,7 +176,7 @@ public DataStreamSource build( logger.info("Build " + table.getSchemaTableName() + " flatMap successful..."); logger.info("Start build " + table.getSchemaTableName() + " sink..."); addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList); - }catch (Exception e) { + } catch (Exception e) { logger.error("Build " + table.getSchemaTableName() + " cdc sync failed..."); logger.error(LogUtil.getError(e)); } @@ -270,23 +270,25 @@ protected Object convertValue(Object value, LogicalType logicalType) { return null; } if (logicalType instanceof DateType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); - }else { + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); } } else if (logicalType instanceof TimestampType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); - }else { + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); } else if (logicalType instanceof BigIntType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return ((Integer) value).longValue(); - }else { + } else { return value; } } else { diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index bf003ba0a9..77adb8b625 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -67,9 +67,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD } private DataStream buildRow( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList) { final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]); final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]); @@ -77,51 +77,51 @@ private DataStream buildRow( RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames); return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - switch (value.get("op").toString()) { - case "r": - case "c": - Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(irow); - break; - case "d": - Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(drow); - break; - case "u": - Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubrow); - Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uarow); - break; + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(Map value, Collector out) throws Exception { + switch (value.get("op").toString()) { + case "r": + case "c": + Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size()); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(irow); + break; + case "d": + Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size()); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(drow); + break; + case "u": + Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size()); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubrow); + Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size()); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uarow); + break; + } } - } - }, rowTypeInfo); + }, rowTypeInfo); } public void addTableSink( - CustomTableEnvironment customTableEnvironment, - DataStream rowDataDataStream, - Table table, - List columnNameList) { + CustomTableEnvironment customTableEnvironment, + DataStream rowDataDataStream, + Table table, + List columnNameList) { String sinkTableName = getSinkTableName(table); String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline(); @@ -155,10 +155,10 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); if (Asserts.isNotNullCollection(schemaList)) { @@ -176,7 +176,7 @@ public DataStreamSource build( logger.info("Build " + table.getSchemaTableName() + " flatMap successful..."); logger.info("Start build " + table.getSchemaTableName() + " sink..."); addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList); - }catch (Exception e) { + } catch (Exception e) { logger.error("Build " + table.getSchemaTableName() + " cdc sync failed..."); logger.error(LogUtil.getError(e)); } @@ -270,23 +270,25 @@ protected Object convertValue(Object value, LogicalType logicalType) { return null; } if (logicalType instanceof DateType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate(); - }else { + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate(); } } else if (logicalType instanceof TimestampType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime(); - }else { + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else { return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); } else if (logicalType instanceof BigIntType) { - if(value instanceof Integer){ + if (value instanceof Integer) { return ((Integer) value).longValue(); - }else { + } else { return value; } } else {