Skip to content

Commit

Permalink
fix #674 Support MySQL varbinary and binary in CDCSOURCE. (#675)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanshicheng authored Jul 3, 2022
1 parent 6cfe7ba commit 28694eb
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) {
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
return new VarCharType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
Expand Down Expand Up @@ -223,6 +220,13 @@ protected Object convertValue(Object value, LogicalType logicalType) {
} else {
return value;
}
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) {
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
return new VarCharType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
Expand Down Expand Up @@ -223,6 +220,13 @@ protected Object convertValue(Object value, LogicalType logicalType) {
} else {
return value;
}
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) {
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
return new VarCharType();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.dlink.cdc.sql;

import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.LogUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -12,11 +14,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
Expand All @@ -35,12 +33,12 @@
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;

import javax.xml.bind.DatatypeConverter;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;

/**
* SQLSinkBuilder
Expand Down Expand Up @@ -225,6 +223,13 @@ protected Object convertValue(Object value, LogicalType logicalType) {
} else {
return value;
}
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) {
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
return new VarCharType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
Expand Down Expand Up @@ -225,6 +222,13 @@ protected Object convertValue(Object value, LogicalType logicalType) {
} else {
return value;
}
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -251,6 +240,8 @@ public LogicalType getLogicalType(ColumnType columnType) {
case LOCALDATETIME:
case TIMESTAMP:
return new TimestampType();
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
return new VarCharType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
Expand Down Expand Up @@ -225,6 +222,13 @@ protected Object convertValue(Object value, LogicalType logicalType) {
} else {
return value;
}
} else if (logicalType instanceof VarBinaryType) {
// VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC.
if (value instanceof String) {
return DatatypeConverter.parseBase64Binary((String) value);
} else {
return value;
}
} else {
return value;
}
Expand Down

0 comments on commit 28694eb

Please sign in to comment.