diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index 3c0ac06dcef..d2407c2cc30 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -134,8 +134,8 @@ func newWriter(ctx context.Context, o *option) *writer { zap.String("dsn", o.upstreamTiDBDSN)) } } + decoder, err := NewDecoder(ctx, o, db) for i := 0; i < int(o.partitionNum); i++ { - decoder, err := NewDecoder(ctx, o, db) if err != nil { log.Panic("cannot create the decoder", zap.Error(err)) } diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index cd1a0a92c98..bc381e16986 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -26,6 +26,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" + timodel "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec" @@ -37,6 +40,11 @@ import ( "golang.org/x/text/encoding/charmap" ) +type tableKey struct { + schema string + table string +} + // batchDecoder decodes the byte into the original message. type batchDecoder struct { data []byte @@ -48,6 +56,8 @@ type batchDecoder struct { upstreamTiDB *sql.DB bytesDecoder *encoding.Decoder + + tableInfoCache map[tableKey]*model.TableInfo } // NewBatchDecoder return a decoder for canal-json @@ -72,10 +82,11 @@ func NewBatchDecoder( } return &batchDecoder{ - config: codecConfig, - storage: externalStorage, - upstreamTiDB: db, - bytesDecoder: charmap.ISO8859_1.NewDecoder(), + config: codecConfig, + storage: externalStorage, + upstreamTiDB: db, + bytesDecoder: charmap.ISO8859_1.NewDecoder(), + tableInfoCache: make(map[tableKey]*model.TableInfo), }, nil } @@ -263,6 +274,52 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( return b.NextRowChangedEvent() } +func setColumnInfos( + tableInfo *timodel.TableInfo, + rawColumns map[string]interface{}, + mysqlType map[string]string, + pkNames map[string]struct{}, +) { + mockColumnID := int64(100) + for name := range rawColumns { + columnInfo := new(timodel.ColumnInfo) + columnInfo.ID = mockColumnID + columnInfo.Name = pmodel.NewCIStr(name) + if utils.IsBinaryMySQLType(mysqlType[name]) { + columnInfo.AddFlag(mysql.BinaryFlag) + } + if _, isPK := pkNames[name]; isPK { + columnInfo.AddFlag(mysql.PriKeyFlag) + } + tableInfo.Columns = append(tableInfo.Columns, columnInfo) + mockColumnID++ + } +} + +func setIndexes( + tableInfo *timodel.TableInfo, + pkNames map[string]struct{}, +) { + indexColumns := make([]*timodel.IndexColumn, 0, len(pkNames)) + for idx, col := range tableInfo.Columns { + name := col.Name.O + if _, ok := pkNames[name]; ok { + indexColumns = append(indexColumns, &timodel.IndexColumn{ + Name: pmodel.NewCIStr(name), + Offset: idx, + }) + } + } + indexInfo := &timodel.IndexInfo{ + ID: 1, + Name: pmodel.NewCIStr("primary"), + Columns: indexColumns, + Unique: true, + Primary: true, + } + tableInfo.Indices = append(tableInfo.Indices, indexInfo) +} + // NextRowChangedEvent implements the RowEventDecoder interface // `HasNext` should be called before this. func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { @@ -282,7 +339,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } } - result, err := canalJSONMessage2RowChange(b.msg) + result, err := b.canalJSONMessage2RowChange() if err != nil { return nil, err } @@ -299,6 +356,17 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { } result := canalJSONMessage2DDLEvent(b.msg) + + schema := *b.msg.getSchema() + table := *b.msg.getTable() + // if receive a table level DDL, just remove the table info to trigger create a new one. + if schema != "" && table != "" { + cacheKey := tableKey{ + schema: schema, + table: table, + } + delete(b.tableInfoCache, cacheKey) + } b.msg = nil return result, nil } diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index c4ce63f6ddb..74234875ed8 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -14,12 +14,12 @@ package canal import ( - "sort" "strconv" "strings" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tiflow/cdc/model" cerrors "github.com/pingcap/tiflow/pkg/errors" @@ -154,36 +154,68 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 { return c.Extensions.CommitTs } -func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChangedEvent, error) { +func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo { + cacheKey := tableKey{ + schema: *msg.getSchema(), + table: *msg.getTable(), + } + tableInfo, ok := b.tableInfoCache[cacheKey] + if !ok { + tableInfo = newTableInfo(msg) + b.tableInfoCache[cacheKey] = tableInfo + } + return tableInfo +} + +func newTableInfo(msg canalJSONMessageInterface) *model.TableInfo { + schema := *msg.getSchema() + table := *msg.getTable() + tidbTableInfo := &timodel.TableInfo{} + tidbTableInfo.Name = pmodel.NewCIStr(table) + + rawColumns := msg.getData() + pkNames := msg.pkNameSet() + mysqlType := msg.getMySQLType() + setColumnInfos(tidbTableInfo, rawColumns, mysqlType, pkNames) + setIndexes(tidbTableInfo, pkNames) + return model.WrapTableInfo(100, schema, 1000, tidbTableInfo) +} + +func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) { + msg := b.msg result := new(model.RowChangedEvent) + result.TableInfo = b.queryTableInfo(msg) result.CommitTs = msg.getCommitTs() + mysqlType := msg.getMySQLType() var err error if msg.eventType() == canal.EventType_DELETE { // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` - preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet()) - result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo) - return result, err + result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo) + if err != nil { + return nil, err + } + return result, nil } // for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns` - cols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet()) - result.Columns = model.Columns2ColumnDatas(cols, result.TableInfo) + result.Columns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo) if err != nil { return nil, err } // for `UPDATE`, `old` contain old data, set it as the `PreColumns` if msg.eventType() == canal.EventType_UPDATE { - preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType) - if len(preCols) < len(cols) { - newPreCols := make([]*model.Column, 0, len(preCols)) + preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType, result.TableInfo) + if err != nil { + return nil, err + } + if len(preCols) < len(result.Columns) { + newPreCols := make([]*model.ColumnData, 0, len(preCols)) j := 0 // Columns are ordered by name - for _, col := range cols { - if j < len(preCols) && col.Name == preCols[j].Name { + for _, col := range result.Columns { + if j < len(preCols) && col.ColumnID == preCols[j].ColumnID { newPreCols = append(newPreCols, preCols[j]) j += 1 } else { @@ -192,45 +224,45 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange } preCols = newPreCols } - if len(preCols) != len(cols) { - log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", cols)) - } - result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo) - if err != nil { - return nil, err + result.PreColumns = preCols + if len(preCols) != len(result.Columns) { + log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", result.Columns)) } } return result, nil } -func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType map[string]string) ([]*model.Column, error) { - result := make([]*model.Column, 0, len(cols)) - for name, value := range cols { +func canalJSONColumnMap2RowChangeColumns( + cols map[string]interface{}, + mysqlType map[string]string, + tableInfo *model.TableInfo, +) ([]*model.ColumnData, error) { + result := make([]*model.ColumnData, 0, len(cols)) + for _, columnInfo := range tableInfo.Columns { + name := columnInfo.Name.O + value, ok := cols[name] + if !ok { + continue + } mysqlTypeStr, ok := mysqlType[name] if !ok { // this should not happen, else we have to check encoding for mysqlType. return nil, cerrors.ErrCanalDecodeFailed.GenWithStack( "mysql type does not found, column: %+v, mysqlType: %+v", name, mysqlType) } - col := canalJSONFormatColumn(value, name, mysqlTypeStr) + col := canalJSONFormatColumn(columnInfo.ID, value, mysqlTypeStr) result = append(result, col) } - if len(result) == 0 { - return nil, nil - } - sort.Slice(result, func(i, j int) bool { - return strings.Compare(result[i].Name, result[j].Name) > 0 - }) + return result, nil } -func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) *model.Column { +func canalJSONFormatColumn(columnID int64, value interface{}, mysqlTypeStr string) *model.ColumnData { mysqlType := utils.ExtractBasicMySQLType(mysqlTypeStr) - result := &model.Column{ - Type: mysqlType, - Name: name, - Value: value, + result := &model.ColumnData{ + ColumnID: columnID, + Value: value, } if result.Value == nil { return result diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index e93ffabed49..c75700d2d79 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -640,49 +640,75 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = true codecConfig.ContentCompatible = true + codecConfig.OnlyOutputUpdatedColumns = true builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) require.NoError(t, err) encoder := builder.Build() - _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) - err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) require.NoError(t, err) - message := encoder.Build()[0] + _, insertEvent, updateEvent, deleteEvent := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) + events := []*model.RowChangedEvent{ + insertEvent, + updateEvent, + deleteEvent, + } - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) + for _, event := range events { + err = encoder.AppendRowChangedEvent(ctx, "", event, func() {}) + require.NoError(t, err) - err = decoder.AddKeyValue(message.Key, message.Value) - require.NoError(t, err) + message := encoder.Build()[0] - messageType, hasNext, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, messageType, model.MessageTypeRow) + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) - decodedEvent, err := decoder.NextRowChangedEvent() - require.NoError(t, err) - require.Equal(t, decodedEvent.CommitTs, insertEvent.CommitTs) - require.Equal(t, decodedEvent.TableInfo.GetSchemaName(), insertEvent.TableInfo.GetSchemaName()) - require.Equal(t, decodedEvent.TableInfo.GetTableName(), insertEvent.TableInfo.GetTableName()) + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeRow) - obtainedColumns := make(map[string]*model.ColumnData, len(decodedEvent.Columns)) - for _, column := range decodedEvent.Columns { - colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) - obtainedColumns[colName] = column - } - for _, col := range insertEvent.Columns { - colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) - decoded, ok := obtainedColumns[colName] - require.True(t, ok) - switch v := col.Value.(type) { - case types.VectorFloat32: - require.EqualValues(t, v.String(), decoded.Value) - default: - require.EqualValues(t, v, decoded.Value) + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.Equal(t, decodedEvent.CommitTs, event.CommitTs) + require.Equal(t, decodedEvent.TableInfo.GetSchemaName(), event.TableInfo.GetSchemaName()) + require.Equal(t, decodedEvent.TableInfo.GetTableName(), event.TableInfo.GetTableName()) + + obtainedColumns := make(map[string]*model.ColumnData, len(decodedEvent.Columns)) + for _, column := range decodedEvent.Columns { + colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) + obtainedColumns[colName] = column + } + for _, col := range event.Columns { + colName := event.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := obtainedColumns[colName] + require.True(t, ok) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } + } + + obtainedPreColumns := make(map[string]*model.ColumnData, len(decodedEvent.PreColumns)) + for _, column := range decodedEvent.PreColumns { + colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) + obtainedPreColumns[colName] = column + } + for _, col := range event.PreColumns { + colName := event.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := obtainedPreColumns[colName] + require.True(t, ok) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } } diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index be97c6a20c5..ea5ba1cbbcd 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -74,7 +74,7 @@ type Config struct { OutputOldValue bool OutputHandleKey bool - // for open protocol + // for open protocol, and canal-json OnlyOutputUpdatedColumns bool // Whether old value should be excluded in the output. OpenOutputOldValue bool diff --git a/pkg/sink/codec/encoder.go b/pkg/sink/codec/encoder.go index 748644e9015..e3b232c9f0c 100644 --- a/pkg/sink/codec/encoder.go +++ b/pkg/sink/codec/encoder.go @@ -17,6 +17,7 @@ import ( "bytes" "context" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/codec/common" ) @@ -82,6 +83,13 @@ func IsColumnValueEqual(preValue, updatedValue interface{}) bool { if ok1 && ok2 { return bytes.Equal(preValueBytes, updatedValueBytes) } + + preValueVector, ok1 := preValue.(types.VectorFloat32) + updatedValueVector, ok2 := updatedValue.(types.VectorFloat32) + if ok1 && ok2 { + return preValueVector.Compare(updatedValueVector) == 0 + } + // mounter use the same table info to parse the value, // the value type should be the same return preValue == updatedValue diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index e0d13abe0dd..ab915ad1029 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -441,7 +441,7 @@ func decodeColumns( if rawData == nil { return nil } - var result []*model.ColumnData + result := make([]*model.ColumnData, 0, len(tableInfo.Columns)) for _, info := range tableInfo.Columns { value, ok := rawData[info.Name.O] if !ok {