Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer(ticdc): canal-json decoder caches table info to avoid allocating memory for each new message #11895

Merged
merged 16 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func newWriter(ctx context.Context, o *option) *writer {
zap.String("dsn", o.upstreamTiDBDSN))
}
}
decoder, err := NewDecoder(ctx, o, db)
for i := 0; i < int(o.partitionNum); i++ {
decoder, err := NewDecoder(ctx, o, db)
if err != nil {
log.Panic("cannot create the decoder", zap.Error(err))
}
Expand Down
78 changes: 73 additions & 5 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
Expand All @@ -37,6 +40,11 @@ import (
"golang.org/x/text/encoding/charmap"
)

// tableKey identifies a table by its schema name and table name.
// It is comparable, so it can be used as the key of batchDecoder.tableInfoCache.
type tableKey struct {
	schema, table string
}

// batchDecoder decodes the byte into the original message.
type batchDecoder struct {
data []byte
Expand All @@ -48,6 +56,8 @@ type batchDecoder struct {

upstreamTiDB *sql.DB
bytesDecoder *encoding.Decoder

tableInfoCache map[tableKey]*model.TableInfo
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cached item is deleted when a table-level DDL is received, and a new table info is created the next time a row event for that table is decoded.

}

// NewBatchDecoder return a decoder for canal-json
Expand All @@ -72,10 +82,11 @@ func NewBatchDecoder(
}

return &batchDecoder{
config: codecConfig,
storage: externalStorage,
upstreamTiDB: db,
bytesDecoder: charmap.ISO8859_1.NewDecoder(),
config: codecConfig,
storage: externalStorage,
upstreamTiDB: db,
bytesDecoder: charmap.ISO8859_1.NewDecoder(),
tableInfoCache: make(map[tableKey]*model.TableInfo),
}, nil
}

Expand Down Expand Up @@ -263,6 +274,52 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent(
return b.NextRowChangedEvent()
}

// setColumnInfos appends one mock column to tableInfo.Columns for every key
// of rawColumns.
//
// Column IDs are assigned sequentially starting at 100. A column gets
// mysql.BinaryFlag when utils.IsBinaryMySQLType reports its entry in mysqlType
// as binary, and mysql.PriKeyFlag when its name is present in pkNames.
//
// NOTE: rawColumns is a map, so the order in which columns are appended (and
// therefore which ID each column receives) follows Go's unspecified map
// iteration order and may differ between calls.
func setColumnInfos(
	tableInfo *timodel.TableInfo,
	rawColumns map[string]interface{},
	mysqlType map[string]string,
	pkNames map[string]struct{},
) {
	nextID := int64(100)
	for colName := range rawColumns {
		info := &timodel.ColumnInfo{}
		info.ID = nextID
		nextID++
		info.Name = pmodel.NewCIStr(colName)

		if utils.IsBinaryMySQLType(mysqlType[colName]) {
			info.AddFlag(mysql.BinaryFlag)
		}
		if _, found := pkNames[colName]; found {
			info.AddFlag(mysql.PriKeyFlag)
		}

		tableInfo.Columns = append(tableInfo.Columns, info)
	}
}

func setIndexes(
tableInfo *timodel.TableInfo,
pkNames map[string]struct{},
) {
indexColumns := make([]*timodel.IndexColumn, 0, len(pkNames))
for idx, col := range tableInfo.Columns {
name := col.Name.O
if _, ok := pkNames[name]; ok {
indexColumns = append(indexColumns, &timodel.IndexColumn{
Name: pmodel.NewCIStr(name),
Offset: idx,
})
}
}
indexInfo := &timodel.IndexInfo{
ID: 1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only the primary key, so the ID should be 1; this matches the previous behavior.

Name: pmodel.NewCIStr("primary"),
Columns: indexColumns,
Unique: true,
Primary: true,
}
tableInfo.Indices = append(tableInfo.Indices, indexInfo)
}

// NextRowChangedEvent implements the RowEventDecoder interface
// `HasNext` should be called before this.
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
Expand All @@ -282,7 +339,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
}
}

result, err := canalJSONMessage2RowChange(b.msg)
result, err := b.canalJSONMessage2RowChange()
if err != nil {
return nil, err
}
Expand All @@ -299,6 +356,17 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
}

result := canalJSONMessage2DDLEvent(b.msg)

schema := *b.msg.getSchema()
table := *b.msg.getTable()
// On a table-level DDL, remove the cached table info so that a new one is created.
if schema != "" && table != "" {
cacheKey := tableKey{
schema: schema,
table: table,
}
delete(b.tableInfoCache, cacheKey)
}
b.msg = nil
return result, nil
}
Expand Down
102 changes: 67 additions & 35 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package canal

import (
"sort"
"strconv"
"strings"

"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
cerrors "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -154,36 +154,68 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 {
return c.Extensions.CommitTs
}

func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChangedEvent, error) {
// queryTableInfo returns the table info for the schema and table named in msg.
//
// The result is looked up in b.tableInfoCache first; on a miss a new table
// info is built from msg with newTableInfo, stored in the cache, and returned.
func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
	key := tableKey{schema: *msg.getSchema(), table: *msg.getTable()}
	if cached, found := b.tableInfoCache[key]; found {
		return cached
	}

	created := newTableInfo(msg)
	b.tableInfoCache[key] = created
	return created
}

// newTableInfo builds a table info from a single canal-json message.
//
// The table name comes from msg.getTable(); columns are derived from the
// keys of msg.getData() together with msg.getMySQLType(), and the primary
// key index is derived from msg.pkNameSet(). The result is wrapped with
// model.WrapTableInfo using the fixed arguments 100 and 1000.
// NOTE(review): 100 and 1000 look like mock schema ID / version values —
// confirm against model.WrapTableInfo.
func newTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
	schemaName, tableName := *msg.getSchema(), *msg.getTable()

	info := new(timodel.TableInfo)
	info.Name = pmodel.NewCIStr(tableName)

	data := msg.getData()
	primaryKeys := msg.pkNameSet()
	columnTypes := msg.getMySQLType()

	// Columns must be populated before the index, because setIndexes reads
	// tableInfo.Columns to compute the index column offsets.
	setColumnInfos(info, data, columnTypes, primaryKeys)
	setIndexes(info, primaryKeys)

	return model.WrapTableInfo(100, schemaName, 1000, info)
}

func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) {
msg := b.msg
result := new(model.RowChangedEvent)
result.TableInfo = b.queryTableInfo(msg)
result.CommitTs = msg.getCommitTs()

mysqlType := msg.getMySQLType()
var err error
if msg.eventType() == canal.EventType_DELETE {
// for a `DELETE` event, `data` contains the old data, set it as the `PreColumns`
preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType)
result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet())
result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo)
return result, err
result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo)
if err != nil {
return nil, err
}
return result, nil
}

// for `INSERT` and `UPDATE`, `data` contains the fresh data, set it as the `Columns`
cols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType)
result.TableInfo = model.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet())
result.Columns = model.Columns2ColumnDatas(cols, result.TableInfo)
result.Columns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType, result.TableInfo)
if err != nil {
return nil, err
}

// for `UPDATE`, `old` contains the old data, set it as the `PreColumns`
if msg.eventType() == canal.EventType_UPDATE {
preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType)
if len(preCols) < len(cols) {
newPreCols := make([]*model.Column, 0, len(preCols))
preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType, result.TableInfo)
if err != nil {
return nil, err
}
if len(preCols) < len(result.Columns) {
newPreCols := make([]*model.ColumnData, 0, len(preCols))
j := 0
// Columns are ordered by name
for _, col := range cols {
if j < len(preCols) && col.Name == preCols[j].Name {
for _, col := range result.Columns {
if j < len(preCols) && col.ColumnID == preCols[j].ColumnID {
newPreCols = append(newPreCols, preCols[j])
j += 1
} else {
Expand All @@ -192,45 +224,45 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange
}
preCols = newPreCols
}
if len(preCols) != len(cols) {
log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", cols))
}
result.PreColumns = model.Columns2ColumnDatas(preCols, result.TableInfo)
if err != nil {
return nil, err
result.PreColumns = preCols
if len(preCols) != len(result.Columns) {
log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", result.Columns))
}
}

return result, nil
}

func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType map[string]string) ([]*model.Column, error) {
result := make([]*model.Column, 0, len(cols))
for name, value := range cols {
func canalJSONColumnMap2RowChangeColumns(
cols map[string]interface{},
mysqlType map[string]string,
tableInfo *model.TableInfo,
) ([]*model.ColumnData, error) {
result := make([]*model.ColumnData, 0, len(cols))
for _, columnInfo := range tableInfo.Columns {
name := columnInfo.Name.O
value, ok := cols[name]
if !ok {
continue
}
mysqlTypeStr, ok := mysqlType[name]
if !ok {
// this should not happen, else we have to check encoding for mysqlType.
return nil, cerrors.ErrCanalDecodeFailed.GenWithStack(
"mysql type does not found, column: %+v, mysqlType: %+v", name, mysqlType)
}
col := canalJSONFormatColumn(value, name, mysqlTypeStr)
col := canalJSONFormatColumn(columnInfo.ID, value, mysqlTypeStr)
result = append(result, col)
}
if len(result) == 0 {
return nil, nil
}
sort.Slice(result, func(i, j int) bool {
return strings.Compare(result[i].Name, result[j].Name) > 0
})

return result, nil
}

func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) *model.Column {
func canalJSONFormatColumn(columnID int64, value interface{}, mysqlTypeStr string) *model.ColumnData {
mysqlType := utils.ExtractBasicMySQLType(mysqlTypeStr)
result := &model.Column{
Type: mysqlType,
Name: name,
Value: value,
result := &model.ColumnData{
ColumnID: columnID,
Value: value,
}
if result.Value == nil {
return result
Expand Down
Loading
Loading