Skip to content

Commit

Permalink
add integer value to integration test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 5, 2024
1 parent 074c23e commit 70ad470
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 14 deletions.
15 changes: 6 additions & 9 deletions pkg/sink/codec/simple/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@ func newTableSchemaMap(tableInfo *model.TableInfo) interface{} {
"mysqlType": types.TypeToStr(col.GetType(), col.GetCharset()),
"charset": col.GetCharset(),
"collate": col.GetCollate(),
// todo: add detail description about length,
// for the text type, default length is 4294967295,
// it's out of the range, convert it to int32, make it -1.
"length": int32(col.GetFlen()),
"decimal": col.GetDecimal(),
"elements": col.GetElems(),
"unsigned": mysql.HasUnsignedFlag(col.GetFlag()),
"zerofill": mysql.HasZerofillFlag(col.GetFlag()),
"length": col.GetFlen(),
"decimal": col.GetDecimal(),
"elements": col.GetElems(),
"unsigned": mysql.HasUnsignedFlag(col.GetFlag()),
"zerofill": mysql.HasZerofillFlag(col.GetFlag()),
}
column := map[string]interface{}{
"name": col.Name.O,
Expand Down Expand Up @@ -314,7 +311,7 @@ func newTableSchemaFromAvroNative(native map[string]interface{}) *TableSchema {
MySQLType: rawDataType["mysqlType"].(string),
Charset: rawDataType["charset"].(string),
Collate: rawDataType["collate"].(string),
Length: int(rawDataType["length"].(int32)),
Length: int(rawDataType["length"].(int64)),
Decimal: int(rawDataType["decimal"].(int32)),
Elements: elements,
Unsigned: rawDataType["unsigned"].(bool),
Expand Down
77 changes: 77 additions & 0 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,83 @@ func TestEncodeDDLEvent(t *testing.T) {
}
}

func TestEncodeIntegerTypes(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.tp_unsigned_int (
id int auto_increment,
c_unsigned_tinyint tinyint unsigned null,
c_unsigned_smallint smallint unsigned null,
c_unsigned_mediumint mediumint unsigned null,
c_unsigned_int int unsigned null,
c_unsigned_bigint bigint unsigned null,
constraint pk primary key (id))`
ddlEvent := helper.DDL2Event(sql)

sql = `insert into test.tp_unsigned_int(c_unsigned_tinyint, c_unsigned_smallint, c_unsigned_mediumint,
c_unsigned_int, c_unsigned_bigint) values (255, 65535, 16777215, 4294967295, 18446744073709551615)`
dmlEvent := helper.DML2Event(sql, "test", "tp_unsigned_int")

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolSimple)
for _, format := range []common.EncodingFormatType{
common.EncodingFormatAvro,
common.EncodingFormatJSON,
} {
codecConfig.EncodingFormat = format
b, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)
enc := b.Build()

m, err := enc.EncodeDDLEvent(ddlEvent)
require.NoError(t, err)

dec, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

_, err = dec.NextDDLEvent()
require.NoError(t, err)

err = enc.AppendRowChangedEvent(ctx, "", dmlEvent, func() {})
require.NoError(t, err)

messages := enc.Build()
err = dec.AddKeyValue(messages[0].Key, messages[0].Value)
require.NoError(t, err)

messageType, hasNext, err = dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, messageType)

decodedRow, err := dec.NextRowChangedEvent()
require.NoError(t, err)
require.Equal(t, decodedRow.CommitTs, dmlEvent.CommitTs)

decodedColumns := make(map[string]*model.Column, len(decodedRow.Columns))
for _, column := range decodedRow.Columns {
decodedColumns[column.Name] = column
}

for _, expected := range dmlEvent.Columns {
decoded, ok := decodedColumns[expected.Name]
require.True(t, ok)
require.EqualValues(t, expected.Value, decoded.Value)
require.Equal(t, expected.Charset, decoded.Charset)
require.Equal(t, expected.Collation, decoded.Collation)
}
}
}

func TestEncoderOtherTypes(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand Down
7 changes: 3 additions & 4 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package simple
import (
"encoding/base64"
"fmt"
"math"
"sort"
"strconv"
"time"
Expand Down Expand Up @@ -635,12 +636,10 @@ func encodeValue4Avro(
default:

}
// value too large, convert it to string
vv := int64(v)
if uint64(vv) != v {
if v > math.MaxInt64 {
return strconv.FormatUint(v, 10), "string", nil
}
return vv, "long", nil
return int64(v), "long", nil
case int64:
return v, "long", nil
case []byte:
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/simple/message.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
},
{
"name": "length",
"type": "int"
"type": "long"
},
{
"name": "decimal",
Expand Down

0 comments on commit 70ad470

Please sign in to comment.