Skip to content

Commit

Permalink
codec(ticdc): reduce canal-json decode memory consumption by using buffered json decoder (#11911)
Browse files Browse the repository at this point in the history

ref #11894
  • Loading branch information
3AceShowHand authored Dec 24, 2024
1 parent 4624acb commit 3f1ab7e
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 214 deletions.
7 changes: 2 additions & 5 deletions cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,8 @@ func (c *consumer) emitDMLEvents(
// Always enable tidb extension for canal-json protocol
// because we need to get the commit ts from the extension field.
c.codecCfg.EnableTiDBExtension = true
decoder, err = canal.NewBatchDecoder(ctx, c.codecCfg, nil)
if err != nil {
return errors.Trace(err)
}
err := decoder.AddKeyValue(nil, content)
decoder = canal.NewCanalJSONTxnEventDecoder(c.codecCfg)
err = decoder.AddKeyValue(nil, content)
if err != nil {
return errors.Trace(err)
}
Expand Down
115 changes: 62 additions & 53 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
"bytes"
"context"
"database/sql"
"encoding/json"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/goccy/go-json"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
Expand All @@ -45,10 +45,44 @@ type tableKey struct {
table string
}

// bufferedJSONDecoder couples an in-memory byte buffer with a streaming JSON
// decoder that reads from that buffer: bytes are appended with Write and
// decoded values are pulled out with Decode.
type bufferedJSONDecoder struct {
	// buf receives the bytes handed to Write and is the decoder's input.
	buf *bytes.Buffer
	// decoder is the JSON stream decoder created on top of buf.
	decoder *json.Decoder
}

// newBufferedJSONDecoder builds a bufferedJSONDecoder whose JSON decoder is
// wired to a fresh, empty buffer.
func newBufferedJSONDecoder() *bufferedJSONDecoder {
	d := &bufferedJSONDecoder{buf: &bytes.Buffer{}}
	d.decoder = json.NewDecoder(d.buf)
	return d
}

// Write appends data to the underlying buffer and returns whatever the
// buffer's own Write returns.
func (d *bufferedJSONDecoder) Write(data []byte) (n int, err error) {
	n, err = d.buf.Write(data)
	return n, err
}

// Decode asks the JSON decoder for its next value and stores it into v.
func (d *bufferedJSONDecoder) Decode(v interface{}) error {
	err := d.decoder.Decode(v)
	return err
}

// Len reports the length of the underlying buffer.
//
// NOTE(review): only buf is inspected here; bytes the JSON decoder has
// already read out of buf are presumably not counted — confirm against the
// json package in use before relying on Len to detect pending messages.
func (d *bufferedJSONDecoder) Len() int {
	pending := d.buf.Len()
	return pending
}

// Bytes exposes the current content of the underlying buffer.
func (d *bufferedJSONDecoder) Bytes() []byte {
	content := d.buf.Bytes()
	return content
}

// batchDecoder decodes the byte into the original message.
type batchDecoder struct {
data []byte
msg canalJSONMessageInterface
msg canalJSONMessageInterface
decoder *bufferedJSONDecoder

config *common.Config

Expand Down Expand Up @@ -81,8 +115,18 @@ func NewBatchDecoder(
GenWithStack("handle-key-only is enabled, but upstream TiDB is not provided")
}

var msg canalJSONMessageInterface = &JSONMessage{}
if codecConfig.EnableTiDBExtension {
msg = &canalJSONMessageWithTiDBExtension{
JSONMessage: &JSONMessage{},
Extensions: &tidbExtension{},
}
}

return &batchDecoder{
config: codecConfig,
msg: msg,
decoder: newBufferedJSONDecoder(),
storage: externalStorage,
upstreamTiDB: db,
bytesDecoder: charmap.ISO8859_1.NewDecoder(),
Expand All @@ -100,51 +144,23 @@ func (b *batchDecoder) AddKeyValue(_, value []byte) error {

return errors.Trace(err)
}
b.data = value
if _, err = b.decoder.Write(value); err != nil {
return errors.Trace(err)
}
return nil
}

// HasNext implements the RowEventDecoder interface
func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
if b.data == nil {
if b.decoder.Len() == 0 {
return model.MessageTypeUnknown, false, nil
}
var (
msg canalJSONMessageInterface = &JSONMessage{}
encodedData []byte
)

if b.config.EnableTiDBExtension {
msg = &canalJSONMessageWithTiDBExtension{
JSONMessage: &JSONMessage{},
Extensions: &tidbExtension{},
}
}

if len(b.config.Terminator) > 0 {
idx := bytes.IndexAny(b.data, b.config.Terminator)
if idx >= 0 {
encodedData = b.data[:idx]
b.data = b.data[idx+len(b.config.Terminator):]
} else {
encodedData = b.data
b.data = nil
}
} else {
encodedData = b.data
b.data = nil
}

if len(encodedData) == 0 {
return model.MessageTypeUnknown, false, nil
}

if err := json.Unmarshal(encodedData, msg); err != nil {
log.Error("canal-json decoder unmarshal data failed",
zap.Error(err), zap.ByteString("data", encodedData))
if err := b.decoder.Decode(b.msg); err != nil {
log.Error("canal-json decoder decode failed",
zap.Error(err), zap.ByteString("data", b.decoder.Bytes()))
return model.MessageTypeUnknown, false, err
}
b.msg = msg
return b.msg.messageType(), true, nil
}

Expand Down Expand Up @@ -217,18 +233,15 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent(
table = message.Table
eventType = message.EventType
)

handleKeyData := message.getData()
pkNames := make([]string, 0, len(handleKeyData))
for name := range handleKeyData {
pkNames = append(pkNames, name)
conditions := make(map[string]interface{}, len(message.pkNameSet()))
for name := range message.pkNameSet() {
conditions[name] = message.getData()[name]
}

result := &canalJSONMessageWithTiDBExtension{
JSONMessage: &JSONMessage{
Schema: schema,
Table: table,
PKNames: pkNames,
PKNames: message.PKNames,

EventType: eventType,
},
Expand All @@ -238,30 +251,30 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent(
}
switch eventType {
case "INSERT":
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData)
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions)
data, mysqlType, err := b.buildData(holder)
if err != nil {
return nil, err
}
result.MySQLType = mysqlType
result.Data = []map[string]interface{}{data}
case "UPDATE":
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData)
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions)
data, mysqlType, err := b.buildData(holder)
if err != nil {
return nil, err
}
result.MySQLType = mysqlType
result.Data = []map[string]interface{}{data}

holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, message.getOld())
holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions)
old, _, err := b.buildData(holder)
if err != nil {
return nil, err
}
result.Old = []map[string]interface{}{old}
case "DELETE":
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, handleKeyData)
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions)
data, mysqlType, err := b.buildData(holder)
if err != nil {
return nil, err
Expand Down Expand Up @@ -343,7 +356,6 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if err != nil {
return nil, err
}
b.msg = nil
return result, nil
}

Expand All @@ -356,7 +368,6 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
}

result := canalJSONMessage2DDLEvent(b.msg)

schema := *b.msg.getSchema()
table := *b.msg.getTable()
// if receive a table level DDL, just remove the table info to trigger create a new one.
Expand All @@ -367,7 +378,6 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
}
delete(b.tableInfoCache, cacheKey)
}
b.msg = nil
return result, nil
}

Expand All @@ -386,6 +396,5 @@ func (b *batchDecoder) NextResolvedEvent() (uint64, error) {
return 0, cerror.ErrCanalDecodeFailed.
GenWithStack("MessageTypeResolved tidb extension not found")
}
b.msg = nil
return withExtensionEvent.Extensions.WatermarkTs, nil
}
Loading

0 comments on commit 3f1ab7e

Please sign in to comment.