From 86ec757cffcbb46d4813740687e5a04584205c01 Mon Sep 17 00:00:00 2001
From: Zack Young <1052455797@qq.com>
Date: Sun, 5 Jun 2022 22:08:45 +0800
Subject: [PATCH] Fix a type-cast bug caused by TimestampType values that may
 be Strings
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../com/dlink/cdc/sql/SQLSinkBuilder.java | 112 +++++++++---------
 .../com/dlink/cdc/sql/SQLSinkBuilder.java | 112 +++++++++---------
 2 files changed, 114 insertions(+), 110 deletions(-)

diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
index b9caac7e0b..5c189c57d7 100644
--- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
@@ -67,9 +67,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD
     }
 
     private DataStream buildRow(
-        SingleOutputStreamOperator filterOperator,
-        List columnNameList,
-        List columnTypeList) {
+            SingleOutputStreamOperator filterOperator,
+            List columnNameList,
+            List columnTypeList) {
         final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
         final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
 
@@ -77,51 +77,51 @@ private DataStream buildRow(
 
         RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
         return filterOperator
-            .flatMap(new FlatMapFunction() {
-                @Override
-                public void flatMap(Map value, Collector out) throws Exception {
-                    switch (value.get("op").toString()) {
-                        case "r":
-                        case "c":
-                            Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
-                            Map idata = (Map) value.get("after");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(irow);
-                            break;
-                        case "d":
-                            Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
-                            Map ddata = (Map) value.get("before");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(drow);
-                            break;
-                        case "u":
-                            Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
-                            Map ubdata = (Map) value.get("before");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(ubrow);
-                            Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
-                            Map uadata = (Map) value.get("after");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(uarow);
-                            break;
+                .flatMap(new FlatMapFunction() {
+                    @Override
+                    public void flatMap(Map value, Collector out) throws Exception {
+                        switch (value.get("op").toString()) {
+                            case "r":
+                            case "c":
+                                Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
+                                Map idata = (Map) value.get("after");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(irow);
+                                break;
+                            case "d":
+                                Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
+                                Map ddata = (Map) value.get("before");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(drow);
+                                break;
+                            case "u":
+                                Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
+                                Map ubdata = (Map) value.get("before");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(ubrow);
+                                Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
+                                Map uadata = (Map) value.get("after");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(uarow);
+                                break;
+                        }
                     }
-                }
-            }, rowTypeInfo);
+                }, rowTypeInfo);
     }
 
     private void addTableSink(
-        CustomTableEnvironment customTableEnvironment,
-        DataStream rowDataDataStream,
-        Table table,
-        List columnNameList) {
+            CustomTableEnvironment customTableEnvironment,
+            DataStream rowDataDataStream,
+            Table table,
+            List columnNameList) {
         String sinkTableName = getSinkTableName(table);
         String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
 
@@ -155,10 +155,10 @@ public SinkBuilder create(FlinkCDCConfig config) {
 
     @Override
     public DataStreamSource build(
-        CDCBuilder cdcBuilder,
-        StreamExecutionEnvironment env,
-        CustomTableEnvironment customTableEnvironment,
-        DataStreamSource dataStreamSource) {
+            CDCBuilder cdcBuilder,
+            StreamExecutionEnvironment env,
+            CustomTableEnvironment customTableEnvironment,
+            DataStreamSource dataStreamSource) {
         final List schemaList = config.getSchemaList();
         final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
@@ -176,7 +176,7 @@ public DataStreamSource build(
                         logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
                         logger.info("Start build " + table.getSchemaTableName() + " sink...");
                         addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
-                    }catch (Exception e) {
+                    } catch (Exception e) {
                         logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
                         logger.error(LogUtil.getError(e));
                     }
@@ -270,23 +270,25 @@ protected Object convertValue(Object value, LogicalType logicalType) {
             return null;
         }
         if (logicalType instanceof DateType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
-            }else {
+            } else {
                 return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
             }
         } else if (logicalType instanceof TimestampType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
-            }else {
+            } else if (value instanceof String) {
+                return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
+            } else {
                 return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
             }
         } else if (logicalType instanceof DecimalType) {
             return new BigDecimal((String) value);
         } else if (logicalType instanceof BigIntType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return ((Integer) value).longValue();
-            }else {
+            } else {
                 return value;
             }
         } else {
diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
index bf003ba0a9..77adb8b625 100644
--- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
@@ -67,9 +67,9 @@ public void addSink(StreamExecutionEnvironment env, DataStream rowDataD
     }
 
     private DataStream buildRow(
-        SingleOutputStreamOperator filterOperator,
-        List columnNameList,
-        List columnTypeList) {
+            SingleOutputStreamOperator filterOperator,
+            List columnNameList,
+            List columnTypeList) {
         final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
         final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
 
@@ -77,51 +77,51 @@ private DataStream buildRow(
 
         RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
         return filterOperator
-            .flatMap(new FlatMapFunction() {
-                @Override
-                public void flatMap(Map value, Collector out) throws Exception {
-                    switch (value.get("op").toString()) {
-                        case "r":
-                        case "c":
-                            Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
-                            Map idata = (Map) value.get("after");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(irow);
-                            break;
-                        case "d":
-                            Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
-                            Map ddata = (Map) value.get("before");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(drow);
-                            break;
-                        case "u":
-                            Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
-                            Map ubdata = (Map) value.get("before");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(ubrow);
-                            Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
-                            Map uadata = (Map) value.get("after");
-                            for (int i = 0; i < columnNameList.size(); i++) {
-                                uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
-                            }
-                            out.collect(uarow);
-                            break;
+                .flatMap(new FlatMapFunction() {
+                    @Override
+                    public void flatMap(Map value, Collector out) throws Exception {
+                        switch (value.get("op").toString()) {
+                            case "r":
+                            case "c":
+                                Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
+                                Map idata = (Map) value.get("after");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(irow);
+                                break;
+                            case "d":
+                                Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
+                                Map ddata = (Map) value.get("before");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(drow);
+                                break;
+                            case "u":
+                                Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
+                                Map ubdata = (Map) value.get("before");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(ubrow);
+                                Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
+                                Map uadata = (Map) value.get("after");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(uarow);
+                                break;
+                        }
                     }
-                }
-            }, rowTypeInfo);
+                }, rowTypeInfo);
     }
 
     public void addTableSink(
-        CustomTableEnvironment customTableEnvironment,
-        DataStream rowDataDataStream,
-        Table table,
-        List columnNameList) {
+            CustomTableEnvironment customTableEnvironment,
+            DataStream rowDataDataStream,
+            Table table,
+            List columnNameList) {
         String sinkTableName = getSinkTableName(table);
         String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
 
@@ -155,10 +155,10 @@ public SinkBuilder create(FlinkCDCConfig config) {
 
     @Override
    public DataStreamSource build(
-        CDCBuilder cdcBuilder,
-        StreamExecutionEnvironment env,
-        CustomTableEnvironment customTableEnvironment,
-        DataStreamSource dataStreamSource) {
+            CDCBuilder cdcBuilder,
+            StreamExecutionEnvironment env,
+            CustomTableEnvironment customTableEnvironment,
+            DataStreamSource dataStreamSource) {
         final List schemaList = config.getSchemaList();
         final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
@@ -176,7 +176,7 @@ public DataStreamSource build(
                         logger.info("Build " + table.getSchemaTableName() + " flatMap successful...");
                         logger.info("Start build " + table.getSchemaTableName() + " sink...");
                         addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
-                    }catch (Exception e) {
+                    } catch (Exception e) {
                         logger.error("Build " + table.getSchemaTableName() + " cdc sync failed...");
                         logger.error(LogUtil.getError(e));
                     }
@@ -270,23 +270,25 @@ protected Object convertValue(Object value, LogicalType logicalType) {
             return null;
         }
         if (logicalType instanceof DateType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDate();
-            }else {
+            } else {
                 return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate();
            }
         } else if (logicalType instanceof TimestampType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
-            }else {
+            } else if (value instanceof String) {
+                return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
+            } else {
                 return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
             }
         } else if (logicalType instanceof DecimalType) {
             return new BigDecimal((String) value);
         } else if (logicalType instanceof BigIntType) {
-            if(value instanceof Integer){
+            if (value instanceof Integer) {
                 return ((Integer) value).longValue();
-            }else {
+            } else {
                 return value;
             }
         } else {
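
Note (not part of the patch): beyond re-indentation, the functional change is the new `else if (value instanceof String)` branch in convertValue. When a CDC payload carries a TIMESTAMP column as a string rather than as epoch milliseconds, the old fallback cast `(long) value` throws a ClassCastException; the new branch parses the string with Instant.parse first, which accepts an ISO-8601 instant such as "2022-06-05T14:08:45Z". The sketch below is a minimal, hypothetical illustration of the three value shapes the patched branch distinguishes; the class and method names are invented for the example and the sample values are assumptions, not taken from the patch.

import java.time.Instant;
import java.time.ZoneId;

// Standalone sketch of the patched TimestampType handling in convertValue.
public class ConvertTimestampSketch {

    static Object convertTimestamp(Object value) {
        if (value instanceof Integer) {
            // epoch milliseconds that were deserialized as an Integer
            return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(ZoneId.systemDefault()).toLocalDateTime();
        } else if (value instanceof String) {
            // the new branch: parse an ISO-8601 instant string instead of casting it to long
            return Instant.parse((String) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
        } else {
            // previous fallback: a plain long cast, which throws ClassCastException for Strings
            return Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDateTime();
        }
    }

    public static void main(String[] args) {
        System.out.println(convertTimestamp(1654437000));             // Integer epoch millis
        System.out.println(convertTimestamp(1654437000000L));         // Long epoch millis
        System.out.println(convertTimestamp("2022-06-05T14:08:45Z")); // String, handled by the new branch
    }
}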