diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 19af984164c..09561f67c17 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -314,11 +314,8 @@ func (c *consumer) emitDMLEvents( // Always enable tidb extension for canal-json protocol // because we need to get the commit ts from the extension field. c.codecCfg.EnableTiDBExtension = true - decoder, err = canal.NewBatchDecoder(ctx, c.codecCfg, nil) - if err != nil { - return errors.Trace(err) - } - err := decoder.AddKeyValue(nil, content) + decoder = canal.NewCanalJSONTxnEventDecoder(c.codecCfg) + err = decoder.AddKeyValue(nil, content) if err != nil { return errors.Trace(err) } diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index bc381e16986..bc3ebd9544d 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -17,12 +17,12 @@ import ( "bytes" "context" "database/sql" + "encoding/json" "path/filepath" "strconv" "strings" "time" - "github.com/goccy/go-json" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" @@ -45,10 +45,44 @@ type tableKey struct { table string } +type bufferedJSONDecoder struct { + buf *bytes.Buffer + decoder *json.Decoder +} + +func newBufferedJSONDecoder() *bufferedJSONDecoder { + buf := new(bytes.Buffer) + decoder := json.NewDecoder(buf) + return &bufferedJSONDecoder{ + buf: buf, + decoder: decoder, + } +} + +// Write writes data to the buffer. +func (b *bufferedJSONDecoder) Write(data []byte) (n int, err error) { + return b.buf.Write(data) +} + +// Decode decodes the buffer into the original message. +func (b *bufferedJSONDecoder) Decode(v interface{}) error { + return b.decoder.Decode(v) +} + +// Len returns the length of the buffer. +func (b *bufferedJSONDecoder) Len() int { + return b.buf.Len() +} + +// Bytes returns the buffer content. 
+func (b *bufferedJSONDecoder) Bytes() []byte { + return b.buf.Bytes() +} + // batchDecoder decodes the byte into the original message. type batchDecoder struct { - data []byte - msg canalJSONMessageInterface + msg canalJSONMessageInterface + decoder *bufferedJSONDecoder config *common.Config @@ -81,8 +115,18 @@ func NewBatchDecoder( GenWithStack("handle-key-only is enabled, but upstream TiDB is not provided") } + var msg canalJSONMessageInterface = &JSONMessage{} + if codecConfig.EnableTiDBExtension { + msg = &canalJSONMessageWithTiDBExtension{ + JSONMessage: &JSONMessage{}, + Extensions: &tidbExtension{}, + } + } + return &batchDecoder{ config: codecConfig, + msg: msg, + decoder: newBufferedJSONDecoder(), storage: externalStorage, upstreamTiDB: db, bytesDecoder: charmap.ISO8859_1.NewDecoder(), @@ -100,51 +144,23 @@ func (b *batchDecoder) AddKeyValue(_, value []byte) error { return errors.Trace(err) } - b.data = value + if _, err = b.decoder.Write(value); err != nil { + return errors.Trace(err) + } return nil } // HasNext implements the RowEventDecoder interface func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { - if b.data == nil { + if b.decoder.Len() == 0 { return model.MessageTypeUnknown, false, nil } - var ( - msg canalJSONMessageInterface = &JSONMessage{} - encodedData []byte - ) - - if b.config.EnableTiDBExtension { - msg = &canalJSONMessageWithTiDBExtension{ - JSONMessage: &JSONMessage{}, - Extensions: &tidbExtension{}, - } - } - if len(b.config.Terminator) > 0 { - idx := bytes.IndexAny(b.data, b.config.Terminator) - if idx >= 0 { - encodedData = b.data[:idx] - b.data = b.data[idx+len(b.config.Terminator):] - } else { - encodedData = b.data - b.data = nil - } - } else { - encodedData = b.data - b.data = nil - } - - if len(encodedData) == 0 { - return model.MessageTypeUnknown, false, nil - } - - if err := json.Unmarshal(encodedData, msg); err != nil { - log.Error("canal-json decoder unmarshal data failed", - zap.Error(err), 
zap.ByteString("data", encodedData)) + if err := b.decoder.Decode(b.msg); err != nil { + log.Error("canal-json decoder decode failed", + zap.Error(err), zap.ByteString("data", b.decoder.Bytes())) return model.MessageTypeUnknown, false, err } - b.msg = msg return b.msg.messageType(), true, nil } @@ -217,18 +233,15 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( table = message.Table eventType = message.EventType ) - - handleKeyData := message.getData() - pkNames := make([]string, 0, len(handleKeyData)) - for name := range handleKeyData { - pkNames = append(pkNames, name) + conditions := make(map[string]interface{}, len(message.pkNameSet())) + for name := range message.pkNameSet() { + conditions[name] = message.getData()[name] } - result := &canalJSONMessageWithTiDBExtension{ JSONMessage: &JSONMessage{ Schema: schema, Table: table, - PKNames: pkNames, + PKNames: message.PKNames, EventType: eventType, }, @@ -238,7 +251,7 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( } switch eventType { case "INSERT": - holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) + holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions) data, mysqlType, err := b.buildData(holder) if err != nil { return nil, err @@ -246,7 +259,7 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( result.MySQLType = mysqlType result.Data = []map[string]interface{}{data} case "UPDATE": - holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) + holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions) data, mysqlType, err := b.buildData(holder) if err != nil { return nil, err @@ -254,14 +267,14 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( result.MySQLType = mysqlType result.Data = []map[string]interface{}{data} - holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, 
message.getOld()) + holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions) old, _, err := b.buildData(holder) if err != nil { return nil, err } result.Old = []map[string]interface{}{old} case "DELETE": - holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, handleKeyData) + holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions) data, mysqlType, err := b.buildData(holder) if err != nil { return nil, err @@ -343,7 +356,6 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { if err != nil { return nil, err } - b.msg = nil return result, nil } @@ -356,7 +368,6 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { } result := canalJSONMessage2DDLEvent(b.msg) - schema := *b.msg.getSchema() table := *b.msg.getTable() // if receive a table level DDL, just remove the table info to trigger create a new one. @@ -367,7 +378,6 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { } delete(b.tableInfoCache, cacheKey) } - b.msg = nil return result, nil } @@ -386,6 +396,5 @@ func (b *batchDecoder) NextResolvedEvent() (uint64, error) { return 0, cerror.ErrCanalDecodeFailed. 
GenWithStack("MessageTypeResolved tidb extension not found") } - b.msg = nil return withExtensionEvent.Extensions.WatermarkTs, nil } diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index c75700d2d79..15c35cd8b5c 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -764,159 +764,3 @@ func TestE2EPartitionTable(t *testing.T) { require.Equal(t, decodedEvent.GetTableID(), int64(0)) } } - -func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { - _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) - ctx := context.Background() - - for _, encodeEnable := range []bool{false, true} { - encodeConfig := common.NewConfig(config.ProtocolCanalJSON) - encodeConfig.EnableTiDBExtension = encodeEnable - encodeConfig.Terminator = config.CRLF - - builder, err := NewJSONRowEventEncoderBuilder(ctx, encodeConfig) - require.NoError(t, err) - encoder := builder.Build() - - err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) - require.NoError(t, err) - - messages := encoder.Build() - require.Equal(t, 1, len(messages)) - msg := messages[0] - - for _, decodeEnable := range []bool{false, true} { - decodeConfig := common.NewConfig(config.ProtocolCanalJSON) - decodeConfig.EnableTiDBExtension = decodeEnable - decoder, err := NewBatchDecoder(ctx, decodeConfig, nil) - require.NoError(t, err) - err = decoder.AddKeyValue(msg.Key, msg.Value) - require.NoError(t, err) - - ty, hasNext, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, model.MessageTypeRow, ty) - - decodedEvent, err := decoder.NextRowChangedEvent() - require.NoError(t, err) - - if encodeEnable && decodeEnable { - require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs) - } - require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName()) - 
require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName()) - - decodedColumns := make(map[string]*model.ColumnData, len(decodedEvent.Columns)) - for _, column := range decodedEvent.Columns { - colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) - decodedColumns[colName] = column - } - for _, col := range insertEvent.Columns { - colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) - decoded, ok := decodedColumns[colName] - require.True(t, ok) - switch v := col.Value.(type) { - case types.VectorFloat32: - require.EqualValues(t, v.String(), decoded.Value) - default: - require.EqualValues(t, v, decoded.Value) - } - } - - _, hasNext, _ = decoder.HasNext() - require.False(t, hasNext) - - decodedEvent, err = decoder.NextRowChangedEvent() - require.Error(t, err) - require.Nil(t, decodedEvent) - } - } -} - -func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) - defer helper.Close() - - sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))` - ddlEvent := helper.DDL2Event(sql) - - ctx := context.Background() - for _, encodeEnable := range []bool{false, true} { - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.EnableTiDBExtension = encodeEnable - - builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) - require.NoError(t, err) - encoder := builder.Build() - - result, err := encoder.EncodeDDLEvent(ddlEvent) - require.NoError(t, err) - require.NotNil(t, result) - - for _, decodeEnable := range []bool{false, true} { - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.EnableTiDBExtension = decodeEnable - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) - err = decoder.AddKeyValue(nil, result.Value) - require.NoError(t, err) - - ty, hasNext, err := decoder.HasNext() - require.Nil(t, err) - require.True(t, hasNext) - 
require.Equal(t, model.MessageTypeDDL, ty) - - consumed, err := decoder.NextDDLEvent() - require.Nil(t, err) - - if encodeEnable && decodeEnable { - require.Equal(t, ddlEvent.CommitTs, consumed.CommitTs) - } else { - require.Equal(t, uint64(0), consumed.CommitTs) - } - - require.Equal(t, ddlEvent.TableInfo.TableName.Schema, consumed.TableInfo.TableName.Schema) - require.Equal(t, ddlEvent.TableInfo.TableName.Table, consumed.TableInfo.TableName.Table) - require.Equal(t, ddlEvent.Query, consumed.Query) - - ty, hasNext, err = decoder.HasNext() - require.Nil(t, err) - require.False(t, hasNext) - require.Equal(t, model.MessageTypeUnknown, ty) - - consumed, err = decoder.NextDDLEvent() - require.NotNil(t, err) - require.Nil(t, consumed) - } - } -} - -func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) { - encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null} -{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]} 
-{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}` - ctx := context.Background() - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.Terminator = "\n" - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) - - err = decoder.AddKeyValue(nil, []byte(encodedValue)) - require.NoError(t, err) - - cnt := 0 - for { - tp, hasNext, err := decoder.HasNext() - if !hasNext { - break - } - require.NoError(t, err) - require.Equal(t, model.MessageTypeRow, tp) - cnt++ - event, err := decoder.NextRowChangedEvent() - require.NoError(t, err) - require.NotNil(t, event) - } - require.Equal(t, 3, cnt) -} diff --git a/pkg/sink/codec/canal/canal_json_txn_event_decoder.go b/pkg/sink/codec/canal/canal_json_txn_event_decoder.go new file mode 100644 index 00000000000..18380402a77 --- /dev/null +++ b/pkg/sink/codec/canal/canal_json_txn_event_decoder.go @@ -0,0 +1,172 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package canal + +import ( + "bytes" + "encoding/json" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + canal "github.com/pingcap/tiflow/proto/canal" + "go.uber.org/zap" +) + +type canalJSONTxnEventDecoder struct { + data []byte + + config *common.Config + msg canalJSONMessageInterface +} + +// NewCanalJSONTxnEventDecoder returns a new canalJSONTxnEventDecoder. +func NewCanalJSONTxnEventDecoder( + codecConfig *common.Config, +) *canalJSONTxnEventDecoder { + return &canalJSONTxnEventDecoder{ + config: codecConfig, + } +} + +// AddKeyValue sets the key value to the decoder. +func (d *canalJSONTxnEventDecoder) AddKeyValue(_, value []byte) error { + value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, value) + if err != nil { + log.Error("decompress data failed", + zap.String("compression", d.config.LargeMessageHandle.LargeMessageHandleCompression), + zap.Error(err)) + + return errors.Trace(err) + } + d.data = value + return nil +} + +// HasNext returns true if there is any event that can be returned. 
+func (d *canalJSONTxnEventDecoder) HasNext() (model.MessageType, bool, error) { + if d.data == nil { + return model.MessageTypeUnknown, false, nil + } + var ( + msg canalJSONMessageInterface = &JSONMessage{} + encodedData []byte + ) + + if d.config.EnableTiDBExtension { + msg = &canalJSONMessageWithTiDBExtension{ + JSONMessage: &JSONMessage{}, + Extensions: &tidbExtension{}, + } + } + + idx := bytes.IndexAny(d.data, d.config.Terminator) + if idx >= 0 { + encodedData = d.data[:idx] + d.data = d.data[idx+len(d.config.Terminator):] + } else { + encodedData = d.data + d.data = nil + } + + if len(encodedData) == 0 { + return model.MessageTypeUnknown, false, nil + } + + if err := json.Unmarshal(encodedData, msg); err != nil { + log.Error("canal-json decoder unmarshal data failed", + zap.Error(err), zap.ByteString("data", encodedData)) + return model.MessageTypeUnknown, false, err + } + d.msg = msg + return d.msg.messageType(), true, nil +} + +// NextRowChangedEvent implements the RowEventDecoder interface +// `HasNext` should be called before this. +func (d *canalJSONTxnEventDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { + if d.msg == nil || d.msg.messageType() != model.MessageTypeRow { + return nil, cerror.ErrCanalDecodeFailed. 
+ GenWithStack("not found row changed event message") + } + result, err := d.canalJSONMessage2RowChange() + if err != nil { + return nil, err + } + d.msg = nil + return result, nil +} + +func (d *canalJSONTxnEventDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) { + msg := d.msg + result := new(model.RowChangedEvent) + result.TableInfo = newTableInfo(msg) + result.CommitTs = msg.getCommitTs() + + mysqlType := msg.getMySQLType() + var err error + if msg.eventType() == canal.EventType_DELETE { + // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` + result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo) + if err != nil { + return nil, err + } + return result, nil + } + + // for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns` + result.Columns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo) + if err != nil { + return nil, err + } + + // for `UPDATE`, `old` contain old data, set it as the `PreColumns` + if msg.eventType() == canal.EventType_UPDATE { + preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType, result.TableInfo) + if err != nil { + return nil, err + } + if len(preCols) < len(result.Columns) { + newPreCols := make([]*model.ColumnData, 0, len(preCols)) + j := 0 + // Columns are ordered by name + for _, col := range result.Columns { + if j < len(preCols) && col.ColumnID == preCols[j].ColumnID { + newPreCols = append(newPreCols, preCols[j]) + j += 1 + } else { + newPreCols = append(newPreCols, col) + } + } + preCols = newPreCols + } + result.PreColumns = preCols + if len(preCols) != len(result.Columns) { + log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", result.Columns)) + } + } + return result, nil +} + +// NextResolvedEvent implements the RowEventDecoder interface +func (d *canalJSONTxnEventDecoder) NextResolvedEvent() (uint64, error) { + return 0, 
nil +} + +// NextDDLEvent implements the RowEventDecoder interface +func (d *canalJSONTxnEventDecoder) NextDDLEvent() (*model.DDLEvent, error) { + return nil, nil +} diff --git a/pkg/sink/codec/canal/canal_json_txn_event_decoder_test.go b/pkg/sink/codec/canal/canal_json_txn_event_decoder_test.go new file mode 100644 index 00000000000..e2d394f6727 --- /dev/null +++ b/pkg/sink/codec/canal/canal_json_txn_event_decoder_test.go @@ -0,0 +1,122 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package canal + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" + "github.com/stretchr/testify/require" +) + +func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { + _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) + ctx := context.Background() + + for _, encodeEnable := range []bool{false, true} { + encodeConfig := common.NewConfig(config.ProtocolCanalJSON) + encodeConfig.EnableTiDBExtension = encodeEnable + encodeConfig.Terminator = config.CRLF + + builder, err := NewJSONRowEventEncoderBuilder(ctx, encodeConfig) + require.NoError(t, err) + encoder := builder.Build() + + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) + require.NoError(t, err) + + messages := encoder.Build() + require.Equal(t, 1, len(messages)) + msg := messages[0] + + for _, 
decodeEnable := range []bool{false, true} { + decodeConfig := common.NewConfig(config.ProtocolCanalJSON) + decodeConfig.EnableTiDBExtension = decodeEnable + + decoder := NewCanalJSONTxnEventDecoder(decodeConfig) + err = decoder.AddKeyValue(msg.Key, msg.Value) + require.NoError(t, err) + + ty, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, ty) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + + if encodeEnable && decodeEnable { + require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs) + } + require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName()) + require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName()) + + decodedColumns := make(map[string]*model.ColumnData, len(decodedEvent.Columns)) + for _, column := range decodedEvent.Columns { + colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) + decodedColumns[colName] = column + } + for _, col := range insertEvent.Columns { + colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := decodedColumns[colName] + require.True(t, ok) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } + } + + _, hasNext, _ = decoder.HasNext() + require.False(t, hasNext) + + decodedEvent, err = decoder.NextRowChangedEvent() + require.Error(t, err) + require.Nil(t, decodedEvent) + } + } +} + +func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) { + encodedValue := 
`{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null} +{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]} +{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}` + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.Terminator = "\n" + decoder := NewCanalJSONTxnEventDecoder(codecConfig) + + err := decoder.AddKeyValue(nil, []byte(encodedValue)) + require.NoError(t, err) + + cnt := 0 + for { + tp, hasNext, err := decoder.HasNext() + if !hasNext { + break + } + require.NoError(t, err) + require.Equal(t, model.MessageTypeRow, tp) + cnt++ + event, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.NotNil(t, event) + } + require.Equal(t, 
3, cnt) +}