diff --git a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 70806ac0b0..f53ef8bcce 100644 --- a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -14,18 +14,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) { case LOCALDATETIME: case TIMESTAMP: return new TimestampType(); + case BYTES: + return new VarBinaryType(Integer.MAX_VALUE); default: return new VarCharType(); } diff --git a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index e8f5ad115e..617d0cf10c 100644 --- a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -12,16 +12,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; +import javax.xml.bind.DatatypeConverter; import java.io.Serializable; import java.math.BigDecimal; import java.time.Instant; @@ -223,6 +220,13 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else { return value; } + } else if (logicalType instanceof VarBinaryType) { + // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC. + if (value instanceof String) { + return DatatypeConverter.parseBase64Binary((String) value); + } else { + return value; + } } else { return value; } diff --git a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 70806ac0b0..f53ef8bcce 100644 --- a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -14,18 +14,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) { case LOCALDATETIME: case TIMESTAMP: return new TimestampType(); + case BYTES: + return new VarBinaryType(Integer.MAX_VALUE); default: return new VarCharType(); } diff --git a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index e8f5ad115e..617d0cf10c 100644 --- a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -12,16 +12,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; +import javax.xml.bind.DatatypeConverter; import java.io.Serializable; import java.math.BigDecimal; import java.time.Instant; @@ -223,6 +220,13 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else { return value; } + } else if (logicalType instanceof VarBinaryType) { + // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC. + if (value instanceof String) { + return DatatypeConverter.parseBase64Binary((String) value); + } else { + return value; + } } else { return value; } diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 70806ac0b0..f53ef8bcce 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -14,18 +14,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) { case LOCALDATETIME: case TIMESTAMP: return new TimestampType(); + case BYTES: + return new VarBinaryType(Integer.MAX_VALUE); default: return new VarCharType(); } diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index 4d40e8c8d2..eff890abf6 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -1,5 +1,7 @@ package com.dlink.cdc.sql; +import com.dlink.utils.FlinkBaseUtil; +import com.dlink.utils.LogUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -12,11 +14,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; @@ -35,12 +33,12 @@ import com.dlink.cdc.CDCBuilder; import com.dlink.cdc.SinkBuilder; import com.dlink.executor.CustomTableEnvironment; + +import javax.xml.bind.DatatypeConverter; import com.dlink.model.FlinkCDCConfig; import com.dlink.model.Schema; import com.dlink.model.Table; -import com.dlink.utils.FlinkBaseUtil; import com.dlink.utils.JSONUtil; -import com.dlink.utils.LogUtil; /** * SQLSinkBuilder @@ -225,6 +223,13 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else { return value; } + } else if (logicalType instanceof VarBinaryType) { + // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC. + if (value instanceof String) { + return DatatypeConverter.parseBase64Binary((String) value); + } else { + return value; + } } else { return value; } diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 70806ac0b0..f53ef8bcce 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -14,18 +14,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) { case LOCALDATETIME: case TIMESTAMP: return new TimestampType(); + case BYTES: + return new VarBinaryType(Integer.MAX_VALUE); default: return new VarCharType(); } diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index 4d40e8c8d2..ca51fae980 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -12,16 +12,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; +import javax.xml.bind.DatatypeConverter; import java.io.Serializable; import java.math.BigDecimal; import java.time.Instant; @@ -225,6 +222,13 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else { return value; } + } else if (logicalType instanceof VarBinaryType) { + // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC. + if (value instanceof String) { + return DatatypeConverter.parseBase64Binary((String) value); + } else { + return value; + } } else { return value; } diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 70806ac0b0..f53ef8bcce 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -14,18 +14,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) { case LOCALDATETIME: case TIMESTAMP: return new TimestampType(); + case BYTES: + return new VarBinaryType(Integer.MAX_VALUE); default: return new VarCharType(); } diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index 4d40e8c8d2..ca51fae980 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -12,16 +12,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; +import javax.xml.bind.DatatypeConverter; import java.io.Serializable; import java.math.BigDecimal; import java.time.Instant; @@ -225,6 +222,13 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else { return value; } + } else if (logicalType instanceof VarBinaryType) { + // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC. + if (value instanceof String) { + return DatatypeConverter.parseBase64Binary((String) value); + } else { + return value; + } } else { return value; }