From f9cbca3d2298fffc13c94851dc43790e491d5d40 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 17 Dec 2024 18:10:18 +0800 Subject: [PATCH 01/16] cache the table info to avoid allocating new memory for each incoming message --- pkg/sink/codec/canal/canal_json_decoder.go | 20 +++++++++++++++- pkg/sink/codec/canal/canal_json_message.go | 27 +++++++++++++++++----- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index cd1a0a92c98..c6cfda0cfac 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -37,6 +37,11 @@ import ( "golang.org/x/text/encoding/charmap" ) +type tableKey struct { + schema string + table string +} + // batchDecoder decodes the byte into the original message. type batchDecoder struct { data []byte @@ -48,6 +53,8 @@ type batchDecoder struct { upstreamTiDB *sql.DB bytesDecoder *encoding.Decoder + + tableInfoCache map[tableKey]*model.TableInfo } // NewBatchDecoder return a decoder for canal-json @@ -282,7 +289,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } } - result, err := canalJSONMessage2RowChange(b.msg) + result, err := b.canalJSONMessage2RowChange(b.msg) if err != nil { return nil, err } @@ -300,6 +307,17 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { result := canalJSONMessage2DDLEvent(b.msg) b.msg = nil + + schema := *b.msg.getSchema() + table := *b.msg.getTable() + // if a table-level DDL is received, remove the cached table info to trigger creating a new one. + if schema != "" && table != "" { + cacheKey := tableKey{ + schema: schema, + table: table, + } + delete(b.tableInfoCache, cacheKey) + } return result, nil } diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index c4ce63f6ddb..be11b4e4bed 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -154,23 +154,38 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 { return c.Extensions.CommitTs } -func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChangedEvent, error) { +func (b *batchDecoder) queryTableInfo(schema, table string, columns []*model.Column, pkNames map[string]struct{}) *model.TableInfo { + cacheKey := tableKey{ + schema: schema, + table: table, + } + tableInfo, ok := b.tableInfoCache[cacheKey] + if !ok { + tableInfo = model.BuildTableInfoWithPKNames4Test(schema, table, columns, pkNames) + b.tableInfoCache[cacheKey] = tableInfo + } + return tableInfo +} + +func (b *batchDecoder) canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChangedEvent, error) { result := new(model.RowChangedEvent) result.CommitTs = msg.getCommitTs() mysqlType := msg.getMySQLType() - var err error + if msg.eventType() == canal.EventType_DELETE { // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet()) - result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo) + tableInfo := b.queryTableInfo(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet()) + result.TableInfo = tableInfo + result.PreColumns = model.Columns2ColumnDatas(preCols, tableInfo) return result, err } // for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the 
`Columns` cols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet()) - result.Columns = model.Columns2ColumnDatas(cols, result.TableInfo) + tableInfo := b.queryTableInfo(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet()) + result.TableInfo = tableInfo + result.Columns = model.Columns2ColumnDatas(cols, tableInfo) if err != nil { return nil, err } From 64f991c840db1ab90e3114f0e1282f000f703a7e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 17 Dec 2024 18:14:09 +0800 Subject: [PATCH 02/16] all partitions share the same decoder, since only the first partition can receive the DDL message --- cmd/kafka-consumer/writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index 3c0ac06dcef..d2407c2cc30 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -134,8 +134,8 @@ func newWriter(ctx context.Context, o *option) *writer { zap.String("dsn", o.upstreamTiDBDSN)) } } + decoder, err := NewDecoder(ctx, o, db) for i := 0; i < int(o.partitionNum); i++ { - decoder, err := NewDecoder(ctx, o, db) if err != nil { log.Panic("cannot create the decoder", zap.Error(err)) } From c5ca16dcbaf3677027ca95f231358e3006e0a153 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 17 Dec 2024 18:49:22 +0800 Subject: [PATCH 03/16] initialize the table info cache --- pkg/sink/codec/canal/canal_json_decoder.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index c6cfda0cfac..8f63a80b859 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -79,10 +79,11 @@ func NewBatchDecoder( } return &batchDecoder{ - config: codecConfig, - storage: externalStorage, - upstreamTiDB: db, - bytesDecoder: charmap.ISO8859_1.NewDecoder(), + config: codecConfig, + storage: externalStorage, + upstreamTiDB: db, + bytesDecoder: charmap.ISO8859_1.NewDecoder(), + tableInfoCache: make(map[tableKey]*model.TableInfo), }, nil } From ad3c830cbb2b8a2a76ffb76a5be8423bf245e7bc Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 11:51:02 +0800 Subject: [PATCH 04/16] fix the canal-json decoder --- pkg/sink/codec/canal/canal_json_decoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 8f63a80b859..eb19778bd00 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -307,7 +307,6 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { } result := canalJSONMessage2DDLEvent(b.msg) - b.msg = nil schema := *b.msg.getSchema() table := *b.msg.getTable() @@ -319,6 +318,7 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { } delete(b.tableInfoCache, cacheKey) } + b.msg = nil return result, nil } From e5eedaef17c1a68d9296d98dba76e47b8b58b292 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 14:49:01 +0800 Subject: [PATCH 05/16] build table info from the DML events --- pkg/sink/codec/canal/canal_json_decoder.go | 71 ++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index eb19778bd00..9fa2f1a0a89 --- 
a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -17,6 +17,9 @@ import ( "bytes" "context" "database/sql" + timodel "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "path/filepath" "strconv" "strings" @@ -271,6 +274,72 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( return b.NextRowChangedEvent() } +func setColumnInfos( + tableInfo *timodel.TableInfo, + rawColumns map[string]interface{}, + mysqlType map[string]string, + pkNames map[string]struct{}, +) { + mockColumnID := int64(100) + for name, _ := range rawColumns { + _, isPK := pkNames[name] + columnInfo := new(timodel.ColumnInfo) + columnInfo.ID = mockColumnID + columnInfo.Name = pmodel.NewCIStr(name) + if utils.IsBinaryMySQLType(mysqlType[name]) { + columnInfo.AddFlag(mysql.BinaryFlag) + } + if isPK { + columnInfo.AddFlag(mysql.PriKeyFlag) + } + tableInfo.Columns = append(tableInfo.Columns, columnInfo) + mockColumnID++ + } +} + +func setIndexes( + tableInfo *timodel.TableInfo, + pkNames map[string]struct{}, +) { + indexColumns := make([]*timodel.IndexColumn, 0, len(pkNames)) + offsets := make(map[string]int, len(pkNames)) + for idx, col := range tableInfo.Columns { + name := col.Name.O + if _, ok := pkNames[name]; ok { + offsets[name] = idx + } + } + for name, _ := range pkNames { + indexColumns = append(indexColumns, &timodel.IndexColumn{ + Name: pmodel.NewCIStr(name), + Offset: offsets[name], + }) + } + + indexInfo := &timodel.IndexInfo{ + ID: 1, + Name: pmodel.NewCIStr("PRIMARY"), + Columns: indexColumns, + Unique: true, + Primary: true, + } + tableInfo.Indices = append(tableInfo.Indices, indexInfo) +} + +func newTableInfo(msg canalJSONMessageInterface) (*model.TableInfo, error) { + schema := *msg.getSchema() + table := *msg.getTable() + tidbTableInfo := &timodel.TableInfo{} + tidbTableInfo.Name = pmodel.NewCIStr(table) + + rawColumns := msg.getData() + pkNames := msg.pkNameSet() + mysqlType := msg.getMySQLType() + setColumnInfos(tidbTableInfo, rawColumns, mysqlType, pkNames) + setIndexes(tidbTableInfo, pkNames) + return model.WrapTableInfo(100, schema, 1000, tidbTableInfo), nil +} + // NextRowChangedEvent implements the RowEventDecoder interface // `HasNext` should be called before this. 
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { @@ -290,6 +359,8 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } } + //tableInfo := b.queryTableInfo(b.msg.getSchema(), ) + result, err := b.canalJSONMessage2RowChange(b.msg) if err != nil { return nil, err From ffc3c3bcce1500ff7cff6b07a4ccd4d872a43c61 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 15:23:25 +0800 Subject: [PATCH 06/16] try to build table schema from the dml message --- pkg/sink/codec/canal/canal_json_decoder.go | 16 ---- pkg/sink/codec/canal/canal_json_message.go | 96 +++++++++++++--------- 2 files changed, 58 insertions(+), 54 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 9fa2f1a0a89..4e42408f4f2 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -326,20 +326,6 @@ func setIndexes( tableInfo.Indices = append(tableInfo.Indices, indexInfo) } -func newTableInfo(msg canalJSONMessageInterface) (*model.TableInfo, error) { - schema := *msg.getSchema() - table := *msg.getTable() - tidbTableInfo := &timodel.TableInfo{} - tidbTableInfo.Name = pmodel.NewCIStr(table) - - rawColumns := msg.getData() - pkNames := msg.pkNameSet() - mysqlType := msg.getMySQLType() - setColumnInfos(tidbTableInfo, rawColumns, mysqlType, pkNames) - setIndexes(tidbTableInfo, pkNames) - return model.WrapTableInfo(100, schema, 1000, tidbTableInfo), nil -} - // NextRowChangedEvent implements the RowEventDecoder interface // `HasNext` should be called before this. func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { @@ -359,8 +345,6 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } } - //tableInfo := b.queryTableInfo(b.msg.getSchema(), ) - result, err := b.canalJSONMessage2RowChange(b.msg) if err != nil { return nil, err diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index be11b4e4bed..23dda6c5ec6 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -14,7 +14,7 @@ package canal import ( - "sort" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "strconv" "strings" @@ -167,38 +167,56 @@ func (b *batchDecoder) queryTableInfo(schema, table string, columns []*model.Col return tableInfo } -func (b *batchDecoder) canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChangedEvent, error) { +func newTableInfo(msg canalJSONMessageInterface) *model.TableInfo { + schema := *msg.getSchema() + table := *msg.getTable() + tidbTableInfo := &timodel.TableInfo{} + tidbTableInfo.Name = pmodel.NewCIStr(table) + + rawColumns := msg.getData() + pkNames := msg.pkNameSet() + mysqlType := msg.getMySQLType() + setColumnInfos(tidbTableInfo, rawColumns, mysqlType, pkNames) + setIndexes(tidbTableInfo, pkNames) + return model.WrapTableInfo(100, schema, 1000, tidbTableInfo) +} + +func (b *batchDecoder) canalJSONMessage2RowChange( + msg canalJSONMessageInterface, +) (*model.RowChangedEvent, error) { result := new(model.RowChangedEvent) + result.TableInfo = newTableInfo(b.msg) result.CommitTs = msg.getCommitTs() - mysqlType := msg.getMySQLType() + mysqlType := msg.getMySQLType() + var err error if msg.eventType() == canal.EventType_DELETE { // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` - preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - 
tableInfo := b.queryTableInfo(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet()) - result.TableInfo = tableInfo - result.PreColumns = model.Columns2ColumnDatas(preCols, tableInfo) - return result, err + result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo) + if err != nil { + return nil, err + } + return result, nil } // for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns` - cols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - tableInfo := b.queryTableInfo(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet()) - result.TableInfo = tableInfo - result.Columns = model.Columns2ColumnDatas(cols, tableInfo) + result.Columns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo) if err != nil { return nil, err } // for `UPDATE`, `old` contain old data, set it as the `PreColumns` if msg.eventType() == canal.EventType_UPDATE { - preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType) - if len(preCols) < len(cols) { - newPreCols := make([]*model.Column, 0, len(preCols)) + preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType, result.TableInfo) + if err != nil { + return nil, err + } + if len(preCols) < len(result.Columns) { + newPreCols := make([]*model.ColumnData, 0, len(preCols)) j := 0 // Columns are ordered by name - for _, col := range cols { - if j < len(preCols) && col.Name == preCols[j].Name { + for _, col := range result.Columns { + if j < len(preCols) && col.ColumnID == preCols[j].ColumnID { newPreCols = append(newPreCols, preCols[j]) j += 1 } else { @@ -207,45 +225,47 @@ func (b *batchDecoder) canalJSONMessage2RowChange(msg canalJSONMessageInterface) } preCols = newPreCols } - if len(preCols) != len(cols) { - log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", cols)) - } - result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo) - if err != nil { - return nil, err + result.PreColumns = preCols + if len(preCols) != len(result.Columns) { + log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", result.Columns)) } } return result, nil } -func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType map[string]string) ([]*model.Column, error) { - result := make([]*model.Column, 0, len(cols)) - for name, value := range cols { +func canalJSONColumnMap2RowChangeColumns( + cols map[string]interface{}, + mysqlType map[string]string, + tableInfo *model.TableInfo, +) ([]*model.ColumnData, error) { + result := make([]*model.ColumnData, 0, len(cols)) + for _, columnInfo := range tableInfo.Columns { + name := columnInfo.Name.O mysqlTypeStr, ok := mysqlType[name] if !ok { // this should not happen, else we have to check encoding for mysqlType. return nil, cerrors.ErrCanalDecodeFailed.GenWithStack( "mysql type does not found, column: %+v, mysqlType: %+v", name, mysqlType) } - col := canalJSONFormatColumn(value, name, mysqlTypeStr) + value, ok := cols[name] + if !ok { + // this should not happen, else we have to check encoding for cols. 
+ return nil, cerrors.ErrCanalDecodeFailed.GenWithStack( + "column value does not found, column: %+v, cols: %+v", name, cols) + } + col := canalJSONFormatColumn(columnInfo.ID, value, mysqlTypeStr) result = append(result, col) } - if len(result) == 0 { - return nil, nil - } - sort.Slice(result, func(i, j int) bool { - return strings.Compare(result[i].Name, result[j].Name) > 0 - }) + return result, nil } -func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) *model.Column { +func canalJSONFormatColumn(columnID int64, value interface{}, mysqlTypeStr string) *model.ColumnData { mysqlType := utils.ExtractBasicMySQLType(mysqlTypeStr) - result := &model.Column{ - Type: mysqlType, - Name: name, - Value: value, + result := &model.ColumnData{ + ColumnID: columnID, + Value: value, } if result.Value == nil { return result From 3ffb1d7e787214611470359b5b7aee5e773f167e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 15:33:41 +0800 Subject: [PATCH 07/16] try to build table schema from the dml message --- pkg/sink/codec/canal/canal_json_decoder.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 4e42408f4f2..1a1784ff94b 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -282,14 +282,13 @@ func setColumnInfos( ) { mockColumnID := int64(100) for name, _ := range rawColumns { - _, isPK := pkNames[name] columnInfo := new(timodel.ColumnInfo) columnInfo.ID = mockColumnID columnInfo.Name = pmodel.NewCIStr(name) if utils.IsBinaryMySQLType(mysqlType[name]) { columnInfo.AddFlag(mysql.BinaryFlag) } - if isPK { + if _, isPK := pkNames[name]; isPK { columnInfo.AddFlag(mysql.PriKeyFlag) } tableInfo.Columns = append(tableInfo.Columns, columnInfo) @@ -302,23 +301,18 @@ func setIndexes( pkNames map[string]struct{}, ) { indexColumns := make([]*timodel.IndexColumn, 0, len(pkNames)) - offsets := make(map[string]int, len(pkNames)) for idx, col := range tableInfo.Columns { name := col.Name.O if _, ok := pkNames[name]; ok { - offsets[name] = idx + indexColumns = append(indexColumns, &timodel.IndexColumn{ + Name: pmodel.NewCIStr(name), + Offset: idx, + }) } } - for name, _ := range pkNames { - indexColumns = append(indexColumns, &timodel.IndexColumn{ - Name: pmodel.NewCIStr(name), - Offset: offsets[name], - }) - } - indexInfo := &timodel.IndexInfo{ ID: 1, - Name: pmodel.NewCIStr("PRIMARY"), + Name: pmodel.NewCIStr("primary"), Columns: indexColumns, Unique: true, Primary: true, From 9a7d6c82dc4605392d70bf0add32aac69668ac31 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 15:53:53 +0800 Subject: [PATCH 08/16] use flag pointer, instead of copy --- cdc/model/schema_storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index b7ad2d5db42..5aade7cc880 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -151,7 +151,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode ti.rowColInfos[i] = rowcodec.ColInfo{ ID: col.ID, IsPKHandle: pkIsHandle, - Ft: col.FieldType.Clone(), + Ft: &col.FieldType, VirtualGenCol: col.IsGenerated(), } ti.rowColFieldTps[col.ID] = ti.rowColInfos[i].Ft From 74cc7eaa63f128e3fd6150044db7aec4c6cc3bf8 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 16:05:48 +0800 Subject: [PATCH 09/16] query table info --- 
cdc/model/schema_storage.go | 2 +- pkg/sink/codec/canal/canal_json_message.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 5aade7cc880..b7ad2d5db42 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -151,7 +151,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode ti.rowColInfos[i] = rowcodec.ColInfo{ ID: col.ID, IsPKHandle: pkIsHandle, - Ft: &col.FieldType, + Ft: col.FieldType.Clone(), VirtualGenCol: col.IsGenerated(), } ti.rowColFieldTps[col.ID] = ti.rowColInfos[i].Ft diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 23dda6c5ec6..181db668790 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -154,14 +154,14 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 { return c.Extensions.CommitTs } -func (b *batchDecoder) queryTableInfo(schema, table string, columns []*model.Column, pkNames map[string]struct{}) *model.TableInfo { +func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo { cacheKey := tableKey{ - schema: schema, - table: table, + schema: *msg.getSchema(), + table: *msg.getTable(), } tableInfo, ok := b.tableInfoCache[cacheKey] if !ok { - tableInfo = model.BuildTableInfoWithPKNames4Test(schema, table, columns, pkNames) + tableInfo = newTableInfo(msg) b.tableInfoCache[cacheKey] = tableInfo } return tableInfo @@ -185,7 +185,7 @@ func (b *batchDecoder) canalJSONMessage2RowChange( msg canalJSONMessageInterface, ) (*model.RowChangedEvent, error) { result := new(model.RowChangedEvent) - result.TableInfo = newTableInfo(b.msg) + result.TableInfo = b.queryTableInfo(msg) result.CommitTs = msg.getCommitTs() mysqlType := msg.getMySQLType() From 981de3a6a7655de0ea101dff7c679c1369f71d92 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 17:05:58 +0800 Subject: [PATCH 10/16] refactor the signature --- pkg/sink/codec/canal/canal_json_decoder.go | 2 +- pkg/sink/codec/canal/canal_json_message.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 1a1784ff94b..0db71081dbd 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -339,7 +339,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } } - result, err := b.canalJSONMessage2RowChange(b.msg) + result, err := b.canalJSONMessage2RowChange() if err != nil { return nil, err } diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 181db668790..d55b9a2c3be 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -181,9 +181,8 @@ func newTableInfo(msg canalJSONMessageInterface) *model.TableInfo { return model.WrapTableInfo(100, schema, 1000, tidbTableInfo) } -func (b *batchDecoder) canalJSONMessage2RowChange( - msg canalJSONMessageInterface, -) (*model.RowChangedEvent, error) { +func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) { + msg := b.msg result := new(model.RowChangedEvent) result.TableInfo = b.queryTableInfo(msg) result.CommitTs = msg.getCommitTs() From 8e6a322e7e071bdf763930574c5c0c727848fc57 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 17:34:24 +0800 Subject: [PATCH 
11/16] fix the make fmt --- pkg/sink/codec/canal/canal_json_decoder.go | 6 +++--- pkg/sink/codec/canal/canal_json_message.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 0db71081dbd..1419dfdd70d 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -17,9 +17,6 @@ import ( "bytes" "context" "database/sql" - timodel "github.com/pingcap/tidb/pkg/meta/model" - pmodel "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" "path/filepath" "strconv" "strings" @@ -29,6 +26,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" + timodel "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec" diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index d55b9a2c3be..afafb80c5a9 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -14,12 +14,12 @@ package canal import ( - pmodel "github.com/pingcap/tidb/pkg/parser/model" "strconv" "strings" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tiflow/cdc/model" cerrors "github.com/pingcap/tiflow/pkg/errors" From 1cf6afef2f6769b94d24d32c6a91231fdb1b3dd8 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 17:50:36 +0800 Subject: [PATCH 12/16] fix the make fmt --- pkg/sink/codec/canal/canal_json_decoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 1419dfdd70d..bc381e16986 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -281,7 +281,7 @@ func setColumnInfos( pkNames map[string]struct{}, ) { mockColumnID := int64(100) - for name, _ := range rawColumns { + for name := range rawColumns { columnInfo := new(timodel.ColumnInfo) columnInfo.ID = mockColumnID columnInfo.Name = pmodel.NewCIStr(name) From 2683725b78ba62403a08eb2a9b90f35b2fc9f9fb Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 18:14:11 +0800 Subject: [PATCH 13/16] reduce memory cost --- pkg/sink/codec/simple/message.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index e0d13abe0dd..ab915ad1029 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -441,7 +441,7 @@ func decodeColumns( if rawData == nil { return nil } - var result []*model.ColumnData + result := make([]*model.ColumnData, 0, len(tableInfo.Columns)) for _, info := range tableInfo.Columns { value, ok := rawData[info.Name.O] if !ok { From df4d5cfbc8f3bf9419c6bae096485a51295c703e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 19:09:18 +0800 Subject: [PATCH 14/16] fix the case --- pkg/sink/codec/canal/canal_json_message.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 
afafb80c5a9..74234875ed8 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -241,18 +241,16 @@ func canalJSONColumnMap2RowChangeColumns( result := make([]*model.ColumnData, 0, len(cols)) for _, columnInfo := range tableInfo.Columns { name := columnInfo.Name.O + value, ok := cols[name] + if !ok { + continue + } mysqlTypeStr, ok := mysqlType[name] if !ok { // this should not happen, else we have to check encoding for mysqlType. return nil, cerrors.ErrCanalDecodeFailed.GenWithStack( "mysql type does not found, column: %+v, mysqlType: %+v", name, mysqlType) } - value, ok := cols[name] - if !ok { - // this should not happen, else we have to check encoding for cols. - return nil, cerrors.ErrCanalDecodeFailed.GenWithStack( - "column value does not found, column: %+v, cols: %+v", name, cols) - } col := canalJSONFormatColumn(columnInfo.ID, value, mysqlTypeStr) result = append(result, col) } From 10e21dca14b7804ff95edeede788bd39212b0a58 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 19:39:58 +0800 Subject: [PATCH 15/16] fix the unit test --- .../canal_json_row_event_encoder_test.go | 86 ++++++++++++------- pkg/sink/codec/common/config.go | 2 +- pkg/sink/codec/encoder.go | 8 ++ 3 files changed, 65 insertions(+), 31 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index e93ffabed49..c75700d2d79 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -640,49 +640,75 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = true codecConfig.ContentCompatible = true + codecConfig.OnlyOutputUpdatedColumns = true builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) require.NoError(t, err) encoder := builder.Build() - _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) - err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) require.NoError(t, err) - message := encoder.Build()[0] + _, insertEvent, updateEvent, deleteEvent := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) + events := []*model.RowChangedEvent{ + insertEvent, + updateEvent, + deleteEvent, + } - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) + for _, event := range events { + err = encoder.AppendRowChangedEvent(ctx, "", event, func() {}) + require.NoError(t, err) - err = decoder.AddKeyValue(message.Key, message.Value) - require.NoError(t, err) + message := encoder.Build()[0] - messageType, hasNext, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, messageType, model.MessageTypeRow) + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) - decodedEvent, err := decoder.NextRowChangedEvent() - require.NoError(t, err) - require.Equal(t, decodedEvent.CommitTs, insertEvent.CommitTs) - require.Equal(t, decodedEvent.TableInfo.GetSchemaName(), insertEvent.TableInfo.GetSchemaName()) - require.Equal(t, decodedEvent.TableInfo.GetTableName(), insertEvent.TableInfo.GetTableName()) + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeRow) - obtainedColumns := 
make(map[string]*model.ColumnData, len(decodedEvent.Columns)) - for _, column := range decodedEvent.Columns { - colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) - obtainedColumns[colName] = column - } - for _, col := range insertEvent.Columns { - colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) - decoded, ok := obtainedColumns[colName] - require.True(t, ok) - switch v := col.Value.(type) { - case types.VectorFloat32: - require.EqualValues(t, v.String(), decoded.Value) - default: - require.EqualValues(t, v, decoded.Value) + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.Equal(t, decodedEvent.CommitTs, event.CommitTs) + require.Equal(t, decodedEvent.TableInfo.GetSchemaName(), event.TableInfo.GetSchemaName()) + require.Equal(t, decodedEvent.TableInfo.GetTableName(), event.TableInfo.GetTableName()) + + obtainedColumns := make(map[string]*model.ColumnData, len(decodedEvent.Columns)) + for _, column := range decodedEvent.Columns { + colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) + obtainedColumns[colName] = column + } + for _, col := range event.Columns { + colName := event.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := obtainedColumns[colName] + require.True(t, ok) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } + } + + obtainedPreColumns := make(map[string]*model.ColumnData, len(decodedEvent.PreColumns)) + for _, column := range decodedEvent.PreColumns { + colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID) + obtainedPreColumns[colName] = column + } + for _, col := range event.PreColumns { + colName := event.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := obtainedPreColumns[colName] + require.True(t, ok) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } } diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index be97c6a20c5..ea5ba1cbbcd 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -74,7 +74,7 @@ type Config struct { OutputOldValue bool OutputHandleKey bool - // for open protocol + // for open protocol, and canal-json OnlyOutputUpdatedColumns bool // Whether old value should be excluded in the output. 
OpenOutputOldValue bool diff --git a/pkg/sink/codec/encoder.go b/pkg/sink/codec/encoder.go index 748644e9015..5f8d938f612 100644 --- a/pkg/sink/codec/encoder.go +++ b/pkg/sink/codec/encoder.go @@ -16,6 +16,7 @@ package codec import ( "bytes" "context" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -82,6 +83,13 @@ func IsColumnValueEqual(preValue, updatedValue interface{}) bool { if ok1 && ok2 { return bytes.Equal(preValueBytes, updatedValueBytes) } + + preValueVector, ok1 := preValue.(types.VectorFloat32) + updatedValueVector, ok2 := updatedValue.(types.VectorFloat32) + if ok1 && ok2 { + return preValueVector.Compare(updatedValueVector) == 0 + } + // mounter use the same table info to parse the value, // the value type should be the same return preValue == updatedValue From 1e536cc1d541c491b799da33c35315b710660eba Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Dec 2024 20:43:03 +0800 Subject: [PATCH 16/16] fix the unit test --- pkg/sink/codec/encoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sink/codec/encoder.go b/pkg/sink/codec/encoder.go index 5f8d938f612..e3b232c9f0c 100644 --- a/pkg/sink/codec/encoder.go +++ b/pkg/sink/codec/encoder.go @@ -16,8 +16,8 @@ package codec import ( "bytes" "context" - "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/codec/common" )
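
For illustration, a minimal, self-contained Go sketch of the caching pattern this patch series introduces: a table info is built once per (schema, table), reused for every subsequent DML message, and dropped when a table-level DDL arrives so the next DML rebuilds it. The tableKey, tableInfoCache, and queryTableInfo names follow the patches above; the TableInfo struct and the standalone main package are simplified placeholders, not the real tiflow types.

package main

import "fmt"

// TableInfo stands in for tiflow's *model.TableInfo; building the real one is
// comparatively expensive, which is why the decoder caches it.
type TableInfo struct {
	Schema string
	Table  string
}

// tableKey mirrors the cache key introduced in PATCH 01.
type tableKey struct {
	schema string
	table  string
}

type decoder struct {
	tableInfoCache map[tableKey]*TableInfo
}

// queryTableInfo returns the cached table info for (schema, table), building
// and caching a new one only for the first message seen for that table.
func (d *decoder) queryTableInfo(schema, table string) *TableInfo {
	key := tableKey{schema: schema, table: table}
	info, ok := d.tableInfoCache[key]
	if !ok {
		info = &TableInfo{Schema: schema, Table: table}
		d.tableInfoCache[key] = info
	}
	return info
}

// onDDL drops the cached entry so the next DML message rebuilds the table info.
func (d *decoder) onDDL(schema, table string) {
	if schema != "" && table != "" {
		delete(d.tableInfoCache, tableKey{schema: schema, table: table})
	}
}

func main() {
	d := &decoder{tableInfoCache: make(map[tableKey]*TableInfo)}
	first := d.queryTableInfo("test", "t1")
	second := d.queryTableInfo("test", "t1")
	fmt.Println(first == second) // true: the same *TableInfo is reused across DML messages
	d.onDDL("test", "t1")
	third := d.queryTableInfo("test", "t1")
	fmt.Println(first == third) // false: the DDL invalidated the cache, so a new table info was built
}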