diff --git a/pkg/sink/codec/avro/avro_test.go b/pkg/sink/codec/avro/avro_test.go index 36a6412e801..d4d507417d2 100644 --- a/pkg/sink/codec/avro/avro_test.go +++ b/pkg/sink/codec/avro/avro_test.go @@ -14,19 +14,14 @@ package avro import ( - "bytes" "context" - "encoding/json" - "math/big" "math/rand" - "sort" "testing" "time" "github.com/linkedin/goavro/v2" + timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/rowcodec" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -35,710 +30,117 @@ import ( "github.com/stretchr/testify/require" ) -type avroTestColumnTuple struct { - col model.Column - colInfo rowcodec.ColInfo - expectedSchema interface{} - expectedData interface{} - expectedType string -} +func TestDMLEventE2E(t *testing.T) { + codecConfig := common.NewConfig(config.ProtocolAvro) + codecConfig.EnableTiDBExtension = true + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() -var avroTestColumns = []*avroTestColumnTuple{ - { - model.Column{Name: "tiny", Value: int64(1), Type: mysql.TypeTiny}, - rowcodec.ColInfo{ - ID: 1, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeTiny), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT"}}, - int32(1), "int", - }, - { - model.Column{Name: "short", Value: int64(1), Type: mysql.TypeShort}, - rowcodec.ColInfo{ - ID: 2, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeShort), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT"}}, - int32(1), "int", - }, - { - model.Column{Name: "int24", Value: int64(1), Type: mysql.TypeInt24}, - rowcodec.ColInfo{ - ID: 3, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeInt24), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT"}}, - int32(1), "int", - }, - { - model.Column{Name: "long", Value: int64(1), Type: mysql.TypeLong}, - rowcodec.ColInfo{ - ID: 4, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeLong), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT"}}, - int32(1), "int", - }, - { - model.Column{Name: "longlong", Value: int64(1), Type: mysql.TypeLonglong}, - rowcodec.ColInfo{ - ID: 5, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeLonglong), - }, - avroSchema{Type: "long", Parameters: map[string]string{"tidb_type": "BIGINT"}}, - int64(1), "long", - }, - { - model.Column{ - Name: "tinyunsigned", - Value: uint64(1), - Type: mysql.TypeTiny, - Flag: model.UnsignedFlag, - }, - rowcodec.ColInfo{ - ID: 6, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeTiny)), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}}, - int32(1), "int", - }, - { - model.Column{ - Name: "shortunsigned", - Value: uint64(1), - Type: mysql.TypeShort, - Flag: model.UnsignedFlag, - }, - rowcodec.ColInfo{ - ID: 7, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeShort)), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}}, - int32(1), "int", - }, - { - model.Column{ - Name: "int24unsigned", - Value: uint64(1), - Type: mysql.TypeInt24, - Flag: model.UnsignedFlag, - }, - rowcodec.ColInfo{ - ID: 8, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeInt24)), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}}, - int32(1), "int", - }, - { - model.Column{ - Name: "longunsigned", - Value: uint64(1), - Type: mysql.TypeLong, - Flag: model.UnsignedFlag, - }, - rowcodec.ColInfo{ - ID: 9, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLong)), - }, - avroSchema{Type: "long", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}}, - int64(1), "long", - }, - { - model.Column{ - Name: "longlongunsigned", - Value: uint64(1), - Type: mysql.TypeLonglong, - Flag: model.UnsignedFlag, - }, - rowcodec.ColInfo{ - ID: 10, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLonglong)), - }, - avroSchema{Type: "long", Parameters: map[string]string{"tidb_type": "BIGINT UNSIGNED"}}, - int64(1), "long", - }, - { - model.Column{Name: "float", Value: float32(3.14), Type: mysql.TypeFloat}, - rowcodec.ColInfo{ - ID: 11, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeFloat), - }, - avroSchema{Type: "float", Parameters: map[string]string{"tidb_type": "FLOAT"}}, - float32(3.14), "float", - }, - { - model.Column{Name: "double", Value: float64(3.14), Type: mysql.TypeDouble}, - rowcodec.ColInfo{ - ID: 12, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeDouble), - }, - avroSchema{Type: "double", Parameters: map[string]string{"tidb_type": "DOUBLE"}}, - float64(3.14), "double", - }, - { - model.Column{Name: "bit", Value: uint64(683), Type: mysql.TypeBit}, - rowcodec.ColInfo{ - ID: 13, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeBit), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BIT", "length": "1"}}, - []byte("\x02\xab"), "bytes", - }, - { - model.Column{Name: "decimal", Value: "129012.1230000", Type: mysql.TypeNewDecimal}, - rowcodec.ColInfo{ - ID: 14, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeNewDecimal), - }, - avroLogicalTypeSchema{ - avroSchema: avroSchema{ - Type: "bytes", - Parameters: map[string]string{"tidb_type": "DECIMAL"}, - }, - LogicalType: "decimal", - Precision: 10, - Scale: 0, - }, - big.NewRat(129012123, 1000), "bytes.decimal", - }, - { - model.Column{Name: "tinytext", Value: []byte("hello world"), Type: mysql.TypeTinyBlob}, - rowcodec.ColInfo{ - ID: 15, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeBlob), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TEXT"}}, - "hello world", "string", - }, - { - model.Column{Name: "mediumtext", Value: []byte("hello world"), Type: mysql.TypeMediumBlob}, - rowcodec.ColInfo{ - ID: 16, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeMediumBlob), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TEXT"}}, - "hello world", "string", - }, - { - model.Column{Name: "text", Value: []byte("hello world"), Type: mysql.TypeBlob}, - rowcodec.ColInfo{ - ID: 17, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeBlob), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TEXT"}}, - "hello world", "string", - }, - { - model.Column{Name: "longtext", Value: []byte("hello world"), Type: mysql.TypeLongBlob}, - rowcodec.ColInfo{ - ID: 18, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeLongBlob), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TEXT"}}, - "hello world", "string", - }, - { - model.Column{Name: "varchar", Value: []byte("hello world"), Type: mysql.TypeVarchar}, - rowcodec.ColInfo{ - ID: 19, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeVarchar), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TEXT"}}, - "hello world", "string", - }, - { - model.Column{Name: "varstring", Value: []byte("hello world"), Type: mysql.TypeVarString}, - rowcodec.ColInfo{ - ID: 20, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeVarString), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TEXT"}}, - "hello world", "string", - }, - { - model.Column{Name: "string", Value: []byte("hello world"), Type: mysql.TypeString}, - rowcodec.ColInfo{ - ID: 21, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeString), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TEXT"}}, - "hello world", "string", - }, - { - model.Column{ - Name: "tinyblob", - Value: []byte("hello world"), - Type: mysql.TypeTinyBlob, - Flag: model.BinaryFlag, - }, - rowcodec.ColInfo{ - ID: 22, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob)), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}}, - []byte("hello world"), "bytes", - }, - { - model.Column{ - Name: "mediumblob", - Value: []byte("hello world"), - Type: mysql.TypeMediumBlob, - Flag: model.BinaryFlag, - }, - rowcodec.ColInfo{ - ID: 23, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob)), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}}, - []byte("hello world"), "bytes", - }, - { - model.Column{ - Name: "blob", - Value: []byte("hello world"), - Type: mysql.TypeBlob, - Flag: model.BinaryFlag, - }, - rowcodec.ColInfo{ - ID: 24, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeBlob)), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}}, - []byte("hello world"), "bytes", - }, - { - model.Column{ - Name: "longblob", - Value: []byte("hello world"), - Type: mysql.TypeLongBlob, - Flag: model.BinaryFlag, - }, - rowcodec.ColInfo{ - ID: 25, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob)), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}}, - []byte("hello world"), "bytes", - }, - { - model.Column{ - Name: "varbinary", - Value: []byte("hello world"), - Type: mysql.TypeVarchar, - Flag: model.BinaryFlag, - }, - rowcodec.ColInfo{ - ID: 26, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar)), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}}, - []byte("hello world"), "bytes", - }, - { - model.Column{ - Name: "varbinary1", - Value: []byte("hello world"), - Type: mysql.TypeVarString, - Flag: model.BinaryFlag, - }, - rowcodec.ColInfo{ - ID: 27, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeVarString)), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}}, - []byte("hello world"), "bytes", - }, - { - model.Column{ - Name: "binary", - Value: []byte("hello world"), - Type: mysql.TypeString, - Flag: model.BinaryFlag, - }, - rowcodec.ColInfo{ - ID: 28, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeString)), - }, - avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}}, - []byte("hello world"), "bytes", - }, - { - model.Column{Name: "enum", Value: uint64(1), Type: mysql.TypeEnum}, - rowcodec.ColInfo{ - ID: 29, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetElems(types.NewFieldType(mysql.TypeEnum), []string{"a", "b"}), - }, - avroSchema{ - Type: "string", - Parameters: map[string]string{"tidb_type": "ENUM", "allowed": "a,b"}, - }, - "a", "string", - }, - { - model.Column{Name: "set", Value: uint64(1), Type: mysql.TypeSet}, - rowcodec.ColInfo{ - ID: 30, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetElems(types.NewFieldType(mysql.TypeSet), []string{"a", "b"}), - }, - avroSchema{ - Type: "string", - Parameters: map[string]string{"tidb_type": "SET", "allowed": "a,b"}, - }, - "a", "string", - }, - { - model.Column{Name: "json", Value: `{"key": "value"}`, Type: mysql.TypeJSON}, - rowcodec.ColInfo{ - ID: 31, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeJSON), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "JSON"}}, - `{"key": "value"}`, "string", - }, - { - model.Column{Name: "date", Value: "2000-01-01", Type: mysql.TypeDate}, - rowcodec.ColInfo{ - ID: 32, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeDate), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "DATE"}}, - "2000-01-01", "string", - }, - { - model.Column{Name: "datetime", Value: "2015-12-20 23:58:58", Type: mysql.TypeDatetime}, - rowcodec.ColInfo{ - ID: 33, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeDatetime), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "DATETIME"}}, - "2015-12-20 23:58:58", "string", - }, - { - model.Column{Name: "timestamp", Value: "1973-12-30 15:30:00", Type: mysql.TypeTimestamp}, - rowcodec.ColInfo{ - ID: 34, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeTimestamp), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TIMESTAMP"}}, - "1973-12-30 15:30:00", "string", - }, - { - model.Column{Name: "time", Value: "23:59:59", Type: mysql.TypeDuration}, - rowcodec.ColInfo{ - ID: 35, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeDuration), - }, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "TIME"}}, - "23:59:59", "string", - }, - { - model.Column{Name: "year", Value: int64(1970), Type: mysql.TypeYear}, - rowcodec.ColInfo{ - ID: 36, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeYear), - }, - avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "YEAR"}}, - int32(1970), "int", - }, -} + _, event, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) + colInfos := event.TableInfo.GetColInfosForRowChangedEvent() -func TestColumnToAvroSchema(t *testing.T) { - for _, v := range avroTestColumns { - encoder := NewAvroEncoder("namespace", nil, &common.Config{ - AvroDecimalHandlingMode: "precise", - AvroBigintUnsignedHandlingMode: "long", - }) - schema, err := encoder.(*BatchEncoder).columnToAvroSchema(&v.col, v.colInfo.Ft) - require.NoError(t, err) - require.Equal(t, v.expectedSchema, schema) - if v.col.Name == "decimal" { - encoder := NewAvroEncoder("namespace", nil, &common.Config{ - AvroDecimalHandlingMode: "string", - AvroBigintUnsignedHandlingMode: "long", - }) - schema, err := encoder.(*BatchEncoder).columnToAvroSchema(&v.col, v.colInfo.Ft) - require.NoError(t, err) - require.Equal( - t, - avroSchema{Type: "string", Parameters: map[string]string{"tidb_type": "DECIMAL"}}, - schema, - ) - } - if v.col.Name == "longlongunsigned" { - encoder := NewAvroEncoder("namespace", nil, &common.Config{ - AvroDecimalHandlingMode: "precise", - AvroBigintUnsignedHandlingMode: "string", - }) - schema, err := encoder.(*BatchEncoder).columnToAvroSchema(&v.col, v.colInfo.Ft) - require.NoError(t, err) - require.Equal( - t, - avroSchema{ - Type: "string", - Parameters: map[string]string{"tidb_type": "BIGINT UNSIGNED"}, - }, - schema, - ) - } - } -} + rand.New(rand.NewSource(time.Now().Unix())).Shuffle(len(event.Columns), func(i, j int) { + event.Columns[i], event.Columns[j] = event.Columns[j], event.Columns[i] + colInfos[i], colInfos[j] = colInfos[j], colInfos[i] + }) -func TestColumnToAvroData(t *testing.T) { - t.Parallel() + for _, decimalHandling := range []string{"precise", "string"} { + for _, unsignedBigintHandling := range []string{"long", "string"} { + codecConfig.AvroDecimalHandlingMode = decimalHandling + codecConfig.AvroBigintUnsignedHandlingMode = unsignedBigintHandling - for _, v := range avroTestColumns { - encoder := NewAvroEncoder("namespace", nil, &common.Config{ - AvroDecimalHandlingMode: "precise", - AvroBigintUnsignedHandlingMode: "long", - }) - data, str, err := encoder.(*BatchEncoder).columnToAvroData(&v.col, v.colInfo.Ft) - require.NoError(t, err) - require.Equal(t, v.expectedData, data) - require.Equal(t, v.expectedType, str) - if v.col.Name == "decimal" { - encoder := NewAvroEncoder("namespace", nil, &common.Config{ - AvroDecimalHandlingMode: "string", - AvroBigintUnsignedHandlingMode: "long", - }) - data, str, err := encoder.(*BatchEncoder).columnToAvroData(&v.col, v.colInfo.Ft) + encoder, err := SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig) require.NoError(t, err) - require.Equal(t, "129012.1230000", data) - require.Equal(t, "string", str) - } - if v.col.Name == "longlongunsigned" { - encoder := NewAvroEncoder("namespace", nil, &common.Config{ - AvroDecimalHandlingMode: "precise", - AvroBigintUnsignedHandlingMode: "string", - }) - data, str, err := encoder.(*BatchEncoder).columnToAvroData(&v.col, v.colInfo.Ft) + require.NotNil(t, encoder) + + topic := "avro-test-topic" + err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {}) require.NoError(t, err) - require.Equal(t, "1", data) - require.Equal(t, "string", str) - } - } -} -func indentJSON(j string) string { - var buf bytes.Buffer - _ = json.Indent(&buf, []byte(j), "", " ") - return buf.String() -} + messages := encoder.Build() + require.Len(t, messages, 1) + message := messages[0] -func newLargeEvent() *model.RowChangedEvent { - cols := make([]*model.Column, 0) - colInfos := make([]rowcodec.ColInfo, 0) - - cols = append( - cols, - &model.Column{ - Name: "id", - Value: int64(1), - Type: mysql.TypeLong, - Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, - }, - ) - colInfos = append( - colInfos, - rowcodec.ColInfo{ - ID: 1000, - IsPKHandle: true, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeLong), - }, - ) - - for _, v := range avroTestColumns { - cols = append(cols, &v.col) - colInfos = append(colInfos, v.colInfo) + schemaM, err := NewConfluentSchemaManager(ctx, "http://127.0.0.1:8081", nil) + require.NoError(t, err) - colNew := v.col - colNew.Name = colNew.Name + "nullable" - colNew.Value = nil - colNew.Flag.SetIsNullable() + decoder := NewDecoder(codecConfig, schemaM, topic) + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) - colInfoNew := v.colInfo - colInfoNew.ID += int64(len(avroTestColumns)) + messageType, exist, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, exist) + require.Equal(t, model.MessageTypeRow, messageType) - cols = append(cols, &colNew) - colInfos = append(colInfos, colInfoNew) - } + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.NotNil(t, decodedEvent) - nameToIDMap := make(map[string]int64, len(cols)) - for i, col := range cols { - nameToIDMap[col.Name] = colInfos[i].ID - } - tidbTableInfo := model.BuildTiDBTableInfoImpl( - "avroencode", - cols, - [][]int{{0}}, - model.NewNameBasedColumnIDAllocator(nameToIDMap)) - model.AddExtraColumnInfo(tidbTableInfo, colInfos) - tableInfo := model.WrapTableInfo(100, "testdb", 100, tidbTableInfo) - return &model.RowChangedEvent{ - CommitTs: 417318403368288260, - TableInfo: tableInfo, - Columns: model.Columns2ColumnDatas(cols, tableInfo), + TeardownEncoderAndSchemaRegistry4Testing() + } } } -func TestRowToAvroSchemaEnableChecksum(t *testing.T) { - t.Parallel() - - event := newLargeEvent() - columns := event.GetColumns() - colInfos := event.TableInfo.GetColInfosForRowChangedEvent() - input := &avroEncodeInput{ - columns, - colInfos, - } - - rand.New(rand.NewSource(time.Now().Unix())).Shuffle(len(input.columns), func(i, j int) { - input.columns[i], input.columns[j] = input.columns[j], input.columns[i] - input.colInfos[i], input.colInfos[j] = input.colInfos[j], input.colInfos[i] - }) - +func TestDDLEventE2E(t *testing.T) { codecConfig := common.NewConfig(config.ProtocolAvro) codecConfig.EnableTiDBExtension = true - codecConfig.EnableRowChecksum = true - codecConfig.AvroDecimalHandlingMode = "string" - codecConfig.AvroBigintUnsignedHandlingMode = "string" + codecConfig.AvroEnableWatermark = true encoder := NewAvroEncoder(model.DefaultNamespace, nil, codecConfig) - schema, err := encoder.(*BatchEncoder).value2AvroSchema(&event.TableInfo.TableName, input) + ddl, _, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) + message, err := encoder.EncodeDDLEvent(ddl) require.NoError(t, err) - require.Equal(t, expectedSchemaWithExtensionEnableChecksum, indentJSON(schema)) - _, err = goavro.NewCodec(schema) + require.NotNil(t, message) + + topic := "test-topic" + decoder := NewDecoder(codecConfig, nil, topic) + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, exist, err := decoder.HasNext() require.NoError(t, err) + require.True(t, exist) + require.Equal(t, model.MessageTypeDDL, messageType) - require.True(t, sort.IsSorted(input)) + decodedEvent, err := decoder.NextDDLEvent() + require.NoError(t, err) + require.NotNil(t, decodedEvent) + require.Equal(t, ddl.CommitTs, decodedEvent.CommitTs) + require.Equal(t, timodel.ActionCreateTable, decodedEvent.Type) + require.NotEmpty(t, decodedEvent.Query) + require.NotEmpty(t, decodedEvent.TableInfo.TableName.Schema) + require.NotEmpty(t, decodedEvent.TableInfo.TableName.Table) } -func TestRowToAvroSchema(t *testing.T) { +func TestResolvedE2E(t *testing.T) { t.Parallel() - event := newLargeEvent() - columns := event.GetColumns() - colInfos := event.TableInfo.GetColInfosForRowChangedEvent() - input := &avroEncodeInput{ - columns, - colInfos, - } - codecConfig := common.NewConfig(config.ProtocolAvro) + codecConfig.EnableTiDBExtension = true + codecConfig.AvroEnableWatermark = true + encoder := NewAvroEncoder(model.DefaultNamespace, nil, codecConfig) - schema, err := encoder.(*BatchEncoder).value2AvroSchema(&event.TableInfo.TableName, input) - require.NoError(t, err) - require.Equal(t, expectedSchemaWithoutExtension, indentJSON(schema)) - _, err = goavro.NewCodec(schema) + resolvedTs := uint64(1591943372224) + message, err := encoder.EncodeCheckpointEvent(resolvedTs) require.NoError(t, err) + require.NotNil(t, message) - codecConfig.EnableTiDBExtension = true - encoder = NewAvroEncoder(model.DefaultNamespace, nil, codecConfig) - - schema, err = encoder.(*BatchEncoder).value2AvroSchema(&event.TableInfo.TableName, input) + topic := "test-topic" + decoder := NewDecoder(codecConfig, nil, topic) + err = decoder.AddKeyValue(message.Key, message.Value) require.NoError(t, err) - require.Equal(t, expectedSchemaWithExtension, indentJSON(schema)) - _, err = goavro.NewCodec(schema) - require.NoError(t, err) -} - -func TestRowToAvroData(t *testing.T) { - t.Parallel() - event := newLargeEvent() - columns := event.GetColumns() - colInfos := event.TableInfo.GetColInfosForRowChangedEvent() - input := &avroEncodeInput{ - columns, - colInfos, - } - - codecConfig := common.NewConfig(config.ProtocolAvro) - encoder := NewAvroEncoder(model.DefaultNamespace, nil, codecConfig) - - data, err := encoder.(*BatchEncoder).columns2AvroData(input) + messageType, exist, err := decoder.HasNext() require.NoError(t, err) + require.True(t, exist) + require.Equal(t, model.MessageTypeResolved, messageType) - for _, col := range input.columns { - _, exists := data[col.Name] - require.True(t, exists) - } + obtained, err := decoder.NextResolvedEvent() + require.NoError(t, err) + require.Equal(t, resolvedTs, obtained) } func TestAvroEncode4EnableChecksum(t *testing.T) { @@ -756,7 +158,7 @@ func TestAvroEncode4EnableChecksum(t *testing.T) { require.NoError(t, err) require.NotNil(t, encoder) - event := newLargeEvent() + _, event, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) topic := "default" bin, err := encoder.encodeValue(ctx, "default", event) require.NoError(t, err) @@ -796,7 +198,7 @@ func TestAvroEncode(t *testing.T) { require.NoError(t, err) require.NotNil(t, encoder) - event := newLargeEvent() + _, event, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) topic := "default" bin, err := encoder.encodeKey(ctx, topic, event) require.NoError(t, err) @@ -815,7 +217,7 @@ func TestAvroEncode(t *testing.T) { require.Fail(t, "key shall not include extension fields") } } - require.Equal(t, int32(1), res.(map[string]interface{})["id"]) + require.Equal(t, int32(127), res.(map[string]interface{})["t"]) bin, err = encoder.encodeValue(ctx, topic, event) require.NoError(t, err) diff --git a/pkg/sink/codec/avro/avro_test_data.go b/pkg/sink/codec/avro/avro_test_data.go deleted file mode 100644 index 78f29a46a15..00000000000 --- a/pkg/sink/codec/avro/avro_test_data.go +++ /dev/null @@ -1,2516 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package avro - -var expectedSchemaWithoutExtension = `{ - "type": "record", - "name": "avroencode", - "namespace": "default.testdb", - "fields": [ - { - "name": "id", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "name": "tiny", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "tinynullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "short", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "shortnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "int24", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "int24nullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "long", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "longnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "longlong", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT" - } - } - }, - { - "default": null, - "name": "longlongnullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT" - } - } - ] - }, - { - "name": "tinyunsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "tinyunsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "shortunsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "shortunsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "int24unsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "int24unsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "longunsigned", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "longunsignednullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "longlongunsigned", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT UNSIGNED" - } - } - }, - { - "default": null, - "name": "longlongunsignednullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT UNSIGNED" - } - } - ] - }, - { - "name": "float", - "type": { - "type": "float", - "connect.parameters": { - "tidb_type": "FLOAT" - } - } - }, - { - "default": null, - "name": "floatnullable", - "type": [ - "null", - { - "type": "float", - "connect.parameters": { - "tidb_type": "FLOAT" - } - } - ] - }, - { - "name": "double", - "type": { - "type": "double", - "connect.parameters": { - "tidb_type": "DOUBLE" - } - } - }, - { - "default": null, - "name": "doublenullable", - "type": [ - "null", - { - "type": "double", - "connect.parameters": { - "tidb_type": "DOUBLE" - } - } - ] - }, - { - "name": "bit", - "type": { - "type": "bytes", - "connect.parameters": { - "length": "1", - "tidb_type": "BIT" - } - } - }, - { - "default": null, - "name": "bitnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "length": "1", - "tidb_type": "BIT" - } - } - ] - }, - { - "name": "decimal", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "DECIMAL" - }, - "logicalType": "decimal", - "precision": 10, - "scale": 0 - } - }, - { - "default": null, - "name": "decimalnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "DECIMAL" - }, - "logicalType": "decimal", - "precision": 10, - "scale": 0 - } - ] - }, - { - "name": "tinytext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "tinytextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "mediumtext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "mediumtextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "text", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "textnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "longtext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "longtextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "varchar", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "varcharnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "varstring", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "varstringnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "string", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "stringnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "tinyblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "tinyblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "mediumblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "mediumblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "blob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "blobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "longblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "longblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "varbinary", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "varbinarynullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "varbinary1", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "varbinary1nullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "binary", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "binarynullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "enum", - "type": { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "ENUM" - } - } - }, - { - "default": null, - "name": "enumnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "ENUM" - } - } - ] - }, - { - "name": "set", - "type": { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "SET" - } - } - }, - { - "default": null, - "name": "setnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "SET" - } - } - ] - }, - { - "name": "json", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "JSON" - } - } - }, - { - "default": null, - "name": "jsonnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "JSON" - } - } - ] - }, - { - "name": "date", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "DATE" - } - } - }, - { - "default": null, - "name": "datenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "DATE" - } - } - ] - }, - { - "name": "datetime", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "DATETIME" - } - } - }, - { - "default": null, - "name": "datetimenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "DATETIME" - } - } - ] - }, - { - "name": "timestamp", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TIMESTAMP" - } - } - }, - { - "default": null, - "name": "timestampnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TIMESTAMP" - } - } - ] - }, - { - "name": "time", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TIME" - } - } - }, - { - "default": null, - "name": "timenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TIME" - } - } - ] - }, - { - "name": "year", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "YEAR" - } - } - }, - { - "default": null, - "name": "yearnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "YEAR" - } - } - ] - } - ] -}` - -var expectedSchemaWithExtension = `{ - "type": "record", - "name": "avroencode", - "namespace": "default.testdb", - "fields": [ - { - "name": "id", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "name": "tiny", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "tinynullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "short", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "shortnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "int24", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "int24nullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "long", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": null, - "name": "longnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "name": "longlong", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT" - } - } - }, - { - "default": null, - "name": "longlongnullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT" - } - } - ] - }, - { - "name": "tinyunsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "tinyunsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "shortunsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "shortunsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "int24unsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "int24unsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "longunsigned", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "default": null, - "name": "longunsignednullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "name": "longlongunsigned", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT UNSIGNED" - } - } - }, - { - "default": null, - "name": "longlongunsignednullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT UNSIGNED" - } - } - ] - }, - { - "name": "float", - "type": { - "type": "float", - "connect.parameters": { - "tidb_type": "FLOAT" - } - } - }, - { - "default": null, - "name": "floatnullable", - "type": [ - "null", - { - "type": "float", - "connect.parameters": { - "tidb_type": "FLOAT" - } - } - ] - }, - { - "name": "double", - "type": { - "type": "double", - "connect.parameters": { - "tidb_type": "DOUBLE" - } - } - }, - { - "default": null, - "name": "doublenullable", - "type": [ - "null", - { - "type": "double", - "connect.parameters": { - "tidb_type": "DOUBLE" - } - } - ] - }, - { - "name": "bit", - "type": { - "type": "bytes", - "connect.parameters": { - "length": "1", - "tidb_type": "BIT" - } - } - }, - { - "default": null, - "name": "bitnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "length": "1", - "tidb_type": "BIT" - } - } - ] - }, - { - "name": "decimal", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "DECIMAL" - }, - "logicalType": "decimal", - "precision": 10, - "scale": 0 - } - }, - { - "default": null, - "name": "decimalnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "DECIMAL" - }, - "logicalType": "decimal", - "precision": 10, - "scale": 0 - } - ] - }, - { - "name": "tinytext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "tinytextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "mediumtext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "mediumtextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "text", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "textnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "longtext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "longtextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "varchar", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "varcharnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "varstring", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "varstringnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "string", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "default": null, - "name": "stringnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "name": "tinyblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "tinyblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "mediumblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "mediumblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "blob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "blobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "longblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "longblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "varbinary", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "varbinarynullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "varbinary1", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "varbinary1nullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "binary", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "default": null, - "name": "binarynullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "name": "enum", - "type": { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "ENUM" - } - } - }, - { - "default": null, - "name": "enumnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "ENUM" - } - } - ] - }, - { - "name": "set", - "type": { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "SET" - } - } - }, - { - "default": null, - "name": "setnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "SET" - } - } - ] - }, - { - "name": "json", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "JSON" - } - } - }, - { - "default": null, - "name": "jsonnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "JSON" - } - } - ] - }, - { - "name": "date", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "DATE" - } - } - }, - { - "default": null, - "name": "datenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "DATE" - } - } - ] - }, - { - "name": "datetime", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "DATETIME" - } - } - }, - { - "default": null, - "name": "datetimenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "DATETIME" - } - } - ] - }, - { - "name": "timestamp", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TIMESTAMP" - } - } - }, - { - "default": null, - "name": "timestampnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TIMESTAMP" - } - } - ] - }, - { - "name": "time", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TIME" - } - } - }, - { - "default": null, - "name": "timenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TIME" - } - } - ] - }, - { - "name": "year", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "YEAR" - } - } - }, - { - "default": null, - "name": "yearnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "YEAR" - } - } - ] - }, - { - "default": "", - "name": "_tidb_op", - "type": "string" - }, - { - "default": 0, - "name": "_tidb_commit_ts", - "type": "long" - }, - { - "default": 0, - "name": "_tidb_commit_physical_time", - "type": "long" - } - ] -}` - -var expectedSchemaWithExtensionEnableChecksum = `{ - "type": "record", - "name": "avroencode", - "namespace": "default.testdb", - "fields": [ - { - "name": "tiny", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "name": "short", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "name": "int24", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "name": "long", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "name": "longlong", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT" - } - } - }, - { - "name": "tinyunsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "name": "shortunsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "name": "int24unsigned", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "name": "longunsigned", - "type": { - "type": "long", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - }, - { - "name": "longlongunsigned", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "BIGINT UNSIGNED" - } - } - }, - { - "name": "float", - "type": { - "type": "float", - "connect.parameters": { - "tidb_type": "FLOAT" - } - } - }, - { - "name": "double", - "type": { - "type": "double", - "connect.parameters": { - "tidb_type": "DOUBLE" - } - } - }, - { - "name": "bit", - "type": { - "type": "bytes", - "connect.parameters": { - "length": "1", - "tidb_type": "BIT" - } - } - }, - { - "name": "decimal", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "DECIMAL" - } - } - }, - { - "name": "tinytext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "name": "mediumtext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "name": "text", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "name": "longtext", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "name": "varchar", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "name": "varstring", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "name": "string", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - }, - { - "name": "tinyblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "name": "mediumblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "name": "blob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "name": "longblob", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "name": "varbinary", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "name": "varbinary1", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "name": "binary", - "type": { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - }, - { - "name": "enum", - "type": { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "ENUM" - } - } - }, - { - "name": "set", - "type": { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "SET" - } - } - }, - { - "name": "json", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "JSON" - } - } - }, - { - "name": "date", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "DATE" - } - } - }, - { - "name": "datetime", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "DATETIME" - } - } - }, - { - "name": "timestamp", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TIMESTAMP" - } - } - }, - { - "name": "time", - "type": { - "type": "string", - "connect.parameters": { - "tidb_type": "TIME" - } - } - }, - { - "name": "year", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "YEAR" - } - } - }, - { - "default": null, - "name": "tinynullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "default": null, - "name": "shortnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "default": null, - "name": "int24nullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "default": null, - "name": "longnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - ] - }, - { - "default": null, - "name": "longlongnullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "BIGINT" - } - } - ] - }, - { - "default": null, - "name": "tinyunsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "default": null, - "name": "shortunsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "default": null, - "name": "int24unsignednullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "default": null, - "name": "longunsignednullable", - "type": [ - "null", - { - "type": "long", - "connect.parameters": { - "tidb_type": "INT UNSIGNED" - } - } - ] - }, - { - "default": null, - "name": "longlongunsignednullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "BIGINT UNSIGNED" - } - } - ] - }, - { - "default": null, - "name": "floatnullable", - "type": [ - "null", - { - "type": "float", - "connect.parameters": { - "tidb_type": "FLOAT" - } - } - ] - }, - { - "default": null, - "name": "doublenullable", - "type": [ - "null", - { - "type": "double", - "connect.parameters": { - "tidb_type": "DOUBLE" - } - } - ] - }, - { - "default": null, - "name": "bitnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "length": "1", - "tidb_type": "BIT" - } - } - ] - }, - { - "default": null, - "name": "decimalnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "DECIMAL" - } - } - ] - }, - { - "default": null, - "name": "tinytextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "default": null, - "name": "mediumtextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "default": null, - "name": "textnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "default": null, - "name": "longtextnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "default": null, - "name": "varcharnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "default": null, - "name": "varstringnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "default": null, - "name": "stringnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TEXT" - } - } - ] - }, - { - "default": null, - "name": "tinyblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "default": null, - "name": "mediumblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "default": null, - "name": "blobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "default": null, - "name": "longblobnullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "default": null, - "name": "varbinarynullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "default": null, - "name": "varbinary1nullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "default": null, - "name": "binarynullable", - "type": [ - "null", - { - "type": "bytes", - "connect.parameters": { - "tidb_type": "BLOB" - } - } - ] - }, - { - "default": null, - "name": "enumnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "ENUM" - } - } - ] - }, - { - "default": null, - "name": "setnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "allowed": "a,b", - "tidb_type": "SET" - } - } - ] - }, - { - "default": null, - "name": "jsonnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "JSON" - } - } - ] - }, - { - "default": null, - "name": "datenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "DATE" - } - } - ] - }, - { - "default": null, - "name": "datetimenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "DATETIME" - } - } - ] - }, - { - "default": null, - "name": "timestampnullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TIMESTAMP" - } - } - ] - }, - { - "default": null, - "name": "timenullable", - "type": [ - "null", - { - "type": "string", - "connect.parameters": { - "tidb_type": "TIME" - } - } - ] - }, - { - "default": null, - "name": "yearnullable", - "type": [ - "null", - { - "type": "int", - "connect.parameters": { - "tidb_type": "YEAR" - } - } - ] - }, - { - "name": "id", - "type": { - "type": "int", - "connect.parameters": { - "tidb_type": "INT" - } - } - }, - { - "default": "", - "name": "_tidb_op", - "type": "string" - }, - { - "default": 0, - "name": "_tidb_commit_ts", - "type": "long" - }, - { - "default": 0, - "name": "_tidb_commit_physical_time", - "type": "long" - }, - { - "default": "", - "name": "_tidb_row_level_checksum", - "type": "string" - }, - { - "default": false, - "name": "_tidb_corrupted", - "type": "boolean" - }, - { - "default": 0, - "name": "_tidb_checksum_version", - "type": "int" - } - ] -}` diff --git a/pkg/sink/codec/avro/decoder_test.go b/pkg/sink/codec/avro/decoder_test.go deleted file mode 100644 index b92e8d08e58..00000000000 --- a/pkg/sink/codec/avro/decoder_test.go +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package avro - -import ( - "context" - "math/rand" - "testing" - "time" - - timodel "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tiflow/cdc/entry" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/sink/codec/common" - "github.com/stretchr/testify/require" -) - -func TestDMLEventE2E(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) - defer helper.Close() - - helper.Tk().MustExec("use test") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - codecConfig := common.NewConfig(config.ProtocolAvro) - codecConfig.EnableTiDBExtension = true - encoder, err := SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig) - defer TeardownEncoderAndSchemaRegistry4Testing() - require.NoError(t, err) - require.NotNil(t, encoder) - - _ = helper.DDL2Event(`create table t(a varchar(64) not null, b varchar(64) default null, primary key(a))`) - - event := helper.DML2Event(`insert into t values('a', 'b')`, "test", "t") - - topic := "avro-test-topic" - err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {}) - require.NoError(t, err) - - event = helper.DML2Event(`insert into t(a) values ('b')`, "test", "t") - err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {}) - require.NoError(t, err) - - event = helper.DML2Event(`insert into t(a) values ('')`, "test", "t") - err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {}) - require.NoError(t, err) -} - -func TestDecodeEvent(t *testing.T) { - codecConfig := common.NewConfig(config.ProtocolAvro) - codecConfig.EnableTiDBExtension = true - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - encoder, err := SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig) - defer TeardownEncoderAndSchemaRegistry4Testing() - require.NoError(t, err) - require.NotNil(t, encoder) - - event := newLargeEvent() - colInfos := event.TableInfo.GetColInfosForRowChangedEvent() - - rand.New(rand.NewSource(time.Now().Unix())).Shuffle(len(event.Columns), func(i, j int) { - event.Columns[i], event.Columns[j] = event.Columns[j], event.Columns[i] - colInfos[i], colInfos[j] = colInfos[j], colInfos[i] - }) - - topic := "avro-test-topic" - err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {}) - require.NoError(t, err) - - messages := encoder.Build() - require.Len(t, messages, 1) - message := messages[0] - - schemaM, err := NewConfluentSchemaManager(ctx, "http://127.0.0.1:8081", nil) - require.NoError(t, err) - - decoder := NewDecoder(codecConfig, schemaM, topic) - err = decoder.AddKeyValue(message.Key, message.Value) - require.NoError(t, err) - - messageType, exist, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, exist) - require.Equal(t, model.MessageTypeRow, messageType) - - decodedEvent, err := decoder.NextRowChangedEvent() - require.NoError(t, err) - require.NotNil(t, decodedEvent) -} - -func TestDecodeDDLEvent(t *testing.T) { - t.Parallel() - - config := &common.Config{ - EnableTiDBExtension: true, - AvroEnableWatermark: true, - } - - encoder := &BatchEncoder{ - namespace: model.DefaultNamespace, - result: make([]*common.Message, 0, 1), - config: config, - } - - message, err := encoder.EncodeDDLEvent(&model.DDLEvent{ - StartTs: 1020, - CommitTs: 1030, - TableInfo: &model.TableInfo{ - TableName: model.TableName{ - Schema: "test", - Table: "t1", - TableID: 0, - IsPartition: false, - }, - }, - Type: timodel.ActionAddColumn, - Query: "ALTER TABLE test.t1 ADD COLUMN a int", - }) - require.NoError(t, err) - require.NotNil(t, message) - - topic := "test-topic" - decoder := NewDecoder(config, nil, topic) - err = decoder.AddKeyValue(message.Key, message.Value) - require.NoError(t, err) - - messageType, exist, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, exist) - require.Equal(t, model.MessageTypeDDL, messageType) - - decodedEvent, err := decoder.NextDDLEvent() - require.NoError(t, err) - require.NotNil(t, decodedEvent) - require.Equal(t, uint64(1030), decodedEvent.CommitTs) - require.Equal(t, timodel.ActionAddColumn, decodedEvent.Type) - require.Equal(t, "ALTER TABLE test.t1 ADD COLUMN a int", decodedEvent.Query) - require.Equal(t, "test", decodedEvent.TableInfo.TableName.Schema) - require.Equal(t, "t1", decodedEvent.TableInfo.TableName.Table) - require.Equal(t, int64(0), decodedEvent.TableInfo.TableName.TableID) - require.False(t, decodedEvent.TableInfo.TableName.IsPartition) -} - -func TestDecodeResolvedEvent(t *testing.T) { - t.Parallel() - - config := &common.Config{ - EnableTiDBExtension: true, - AvroEnableWatermark: true, - } - - encoder := &BatchEncoder{ - namespace: model.DefaultNamespace, - config: config, - result: make([]*common.Message, 0, 1), - } - - resolvedTs := uint64(1591943372224) - message, err := encoder.EncodeCheckpointEvent(resolvedTs) - require.NoError(t, err) - require.NotNil(t, message) - - topic := "test-topic" - decoder := NewDecoder(config, nil, topic) - err = decoder.AddKeyValue(message.Key, message.Value) - require.NoError(t, err) - - messageType, exist, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, exist) - require.Equal(t, model.MessageTypeResolved, messageType) - - obtained, err := decoder.NextResolvedEvent() - require.NoError(t, err) - require.Equal(t, resolvedTs, obtained) -}