Skip to content

Commit

Permalink
consumer(ticdc): canal-json decoder cache table info for each message…
Browse files Browse the repository at this point in the history
… to avoid allocating memory on each new message (#11895)

close #11894, close #11907
  • Loading branch information
3AceShowHand authored Dec 19, 2024
1 parent 8ba21cc commit fa598ba
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 73 deletions.
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func newWriter(ctx context.Context, o *option) *writer {
zap.String("dsn", o.upstreamTiDBDSN))
}
}
decoder, err := NewDecoder(ctx, o, db)
for i := 0; i < int(o.partitionNum); i++ {
decoder, err := NewDecoder(ctx, o, db)
if err != nil {
log.Panic("cannot create the decoder", zap.Error(err))
}
Expand Down
78 changes: 73 additions & 5 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
Expand All @@ -37,6 +40,11 @@ import (
"golang.org/x/text/encoding/charmap"
)

type tableKey struct {
schema string
table string
}

// batchDecoder decodes the byte into the original message.
type batchDecoder struct {
data []byte
Expand All @@ -48,6 +56,8 @@ type batchDecoder struct {

upstreamTiDB *sql.DB
bytesDecoder *encoding.Decoder

tableInfoCache map[tableKey]*model.TableInfo
}

// NewBatchDecoder return a decoder for canal-json
Expand All @@ -72,10 +82,11 @@ func NewBatchDecoder(
}

return &batchDecoder{
config: codecConfig,
storage: externalStorage,
upstreamTiDB: db,
bytesDecoder: charmap.ISO8859_1.NewDecoder(),
config: codecConfig,
storage: externalStorage,
upstreamTiDB: db,
bytesDecoder: charmap.ISO8859_1.NewDecoder(),
tableInfoCache: make(map[tableKey]*model.TableInfo),
}, nil
}

Expand Down Expand Up @@ -263,6 +274,52 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent(
return b.NextRowChangedEvent()
}

// setColumnInfos populates tableInfo.Columns with one mocked ColumnInfo per
// entry in rawColumns. The binary flag is derived from the column's mysql
// type string and the primary-key flag from membership in pkNames. Column
// IDs are mock values starting at 100.
//
// NOTE(review): rawColumns is a map, so the resulting column order follows
// Go's randomized map iteration — confirm no caller relies on a stable
// cross-process ordering.
func setColumnInfos(
	tableInfo *timodel.TableInfo,
	rawColumns map[string]interface{},
	mysqlType map[string]string,
	pkNames map[string]struct{},
) {
	nextColumnID := int64(100)
	for colName := range rawColumns {
		info := &timodel.ColumnInfo{
			ID:   nextColumnID,
			Name: pmodel.NewCIStr(colName),
		}
		if utils.IsBinaryMySQLType(mysqlType[colName]) {
			info.AddFlag(mysql.BinaryFlag)
		}
		if _, isPrimaryKey := pkNames[colName]; isPrimaryKey {
			info.AddFlag(mysql.PriKeyFlag)
		}
		tableInfo.Columns = append(tableInfo.Columns, info)
		nextColumnID++
	}
}

// setIndexes appends one mocked primary-key index to tableInfo, covering
// every column whose name appears in pkNames. Index offsets refer to
// positions within tableInfo.Columns, so this must run after the columns
// have been filled in (see setColumnInfos).
func setIndexes(
	tableInfo *timodel.TableInfo,
	pkNames map[string]struct{},
) {
	columns := make([]*timodel.IndexColumn, 0, len(pkNames))
	for offset, column := range tableInfo.Columns {
		if _, isPK := pkNames[column.Name.O]; !isPK {
			continue
		}
		columns = append(columns, &timodel.IndexColumn{
			Name:   pmodel.NewCIStr(column.Name.O),
			Offset: offset,
		})
	}
	tableInfo.Indices = append(tableInfo.Indices, &timodel.IndexInfo{
		ID:      1,
		Name:    pmodel.NewCIStr("primary"),
		Columns: columns,
		Unique:  true,
		Primary: true,
	})
}

// NextRowChangedEvent implements the RowEventDecoder interface
// `HasNext` should be called before this.
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
Expand All @@ -282,7 +339,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
}
}

result, err := canalJSONMessage2RowChange(b.msg)
result, err := b.canalJSONMessage2RowChange()
if err != nil {
return nil, err
}
Expand All @@ -299,6 +356,17 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
}

result := canalJSONMessage2DDLEvent(b.msg)

schema := *b.msg.getSchema()
table := *b.msg.getTable()
// if a table-level DDL is received, drop the cached table info so a fresh one is built for the next row event.
if schema != "" && table != "" {
cacheKey := tableKey{
schema: schema,
table: table,
}
delete(b.tableInfoCache, cacheKey)
}
b.msg = nil
return result, nil
}
Expand Down
102 changes: 67 additions & 35 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package canal

import (
"sort"
"strconv"
"strings"

"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
cerrors "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -154,36 +154,68 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 {
return c.Extensions.CommitTs
}

func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChangedEvent, error) {
// queryTableInfo returns the cached table info for the message's
// schema/table pair, building and caching a new one on a cache miss.
func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
	key := tableKey{
		schema: *msg.getSchema(),
		table:  *msg.getTable(),
	}
	if cached, hit := b.tableInfoCache[key]; hit {
		return cached
	}
	tableInfo := newTableInfo(msg)
	b.tableInfoCache[key] = tableInfo
	return tableInfo
}

// newTableInfo builds a mocked model.TableInfo from the message payload:
// column infos come from the raw data and mysql-type maps, and a
// primary-key index is synthesized from the message's pk-name set.
// The schema ID (100) and table-info version (1000) are mock values.
func newTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
	tidbTableInfo := &timodel.TableInfo{
		Name: pmodel.NewCIStr(*msg.getTable()),
	}
	pkNames := msg.pkNameSet()
	setColumnInfos(tidbTableInfo, msg.getData(), msg.getMySQLType(), pkNames)
	setIndexes(tidbTableInfo, pkNames)
	return model.WrapTableInfo(100, *msg.getSchema(), 1000, tidbTableInfo)
}

func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) {
msg := b.msg
result := new(model.RowChangedEvent)
result.TableInfo = b.queryTableInfo(msg)
result.CommitTs = msg.getCommitTs()

mysqlType := msg.getMySQLType()
var err error
if msg.eventType() == canal.EventType_DELETE {
// for `DELETE` event, `data` contain the old data, set it as the `PreColumns`
preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType)
result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet())
result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo)
return result, err
result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo)
if err != nil {
return nil, err
}
return result, nil
}

// for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns`
cols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType)
result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet())
result.Columns = model.Columns2ColumnDatas(cols, result.TableInfo)
result.Columns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo)
if err != nil {
return nil, err
}

// for `UPDATE`, `old` contain old data, set it as the `PreColumns`
if msg.eventType() == canal.EventType_UPDATE {
preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType)
if len(preCols) < len(cols) {
newPreCols := make([]*model.Column, 0, len(preCols))
preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType, result.TableInfo)
if err != nil {
return nil, err
}
if len(preCols) < len(result.Columns) {
newPreCols := make([]*model.ColumnData, 0, len(preCols))
j := 0
// Both slices follow the table info's column order, so they can be aligned with a single linear merge.
for _, col := range cols {
if j < len(preCols) && col.Name == preCols[j].Name {
for _, col := range result.Columns {
if j < len(preCols) && col.ColumnID == preCols[j].ColumnID {
newPreCols = append(newPreCols, preCols[j])
j += 1
} else {
Expand All @@ -192,45 +224,45 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange
}
preCols = newPreCols
}
if len(preCols) != len(cols) {
log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", cols))
}
result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo)
if err != nil {
return nil, err
result.PreColumns = preCols
if len(preCols) != len(result.Columns) {
log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", result.Columns))
}
}

return result, nil
}

func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType map[string]string) ([]*model.Column, error) {
result := make([]*model.Column, 0, len(cols))
for name, value := range cols {
func canalJSONColumnMap2RowChangeColumns(
cols map[string]interface{},
mysqlType map[string]string,
tableInfo *model.TableInfo,
) ([]*model.ColumnData, error) {
result := make([]*model.ColumnData, 0, len(cols))
for _, columnInfo := range tableInfo.Columns {
name := columnInfo.Name.O
value, ok := cols[name]
if !ok {
continue
}
mysqlTypeStr, ok := mysqlType[name]
if !ok {
// this should not happen, else we have to check encoding for mysqlType.
return nil, cerrors.ErrCanalDecodeFailed.GenWithStack(
"mysql type does not found, column: %+v, mysqlType: %+v", name, mysqlType)
}
col := canalJSONFormatColumn(value, name, mysqlTypeStr)
col := canalJSONFormatColumn(columnInfo.ID, value, mysqlTypeStr)
result = append(result, col)
}
if len(result) == 0 {
return nil, nil
}
sort.Slice(result, func(i, j int) bool {
return strings.Compare(result[i].Name, result[j].Name) > 0
})

return result, nil
}

func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) *model.Column {
func canalJSONFormatColumn(columnID int64, value interface{}, mysqlTypeStr string) *model.ColumnData {
mysqlType := utils.ExtractBasicMySQLType(mysqlTypeStr)
result := &model.Column{
Type: mysqlType,
Name: name,
Value: value,
result := &model.ColumnData{
ColumnID: columnID,
Value: value,
}
if result.Value == nil {
return result
Expand Down
Loading

0 comments on commit fa598ba

Please sign in to comment.