Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): reduce canal-json decode memory consumption by using buffered json decoder #11911

Merged
merged 15 commits into from
Dec 24, 2024
97 changes: 55 additions & 42 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
"bytes"
"context"
"database/sql"
"encoding/json"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/goccy/go-json"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
Expand All @@ -45,10 +45,45 @@ type tableKey struct {
table string
}

// bufferedJSONDecoder couples a growable byte buffer with a streaming JSON
// decoder that consumes it, so raw canal-json payloads can be appended once
// and then decoded message by message without intermediate slice copies.
type bufferedJSONDecoder struct {
	buf     *bytes.Buffer
	decoder *json.Decoder
}

// newBufferedJSONDecoder builds an empty bufferedJSONDecoder whose JSON
// decoder streams directly from its internal buffer.
func newBufferedJSONDecoder() *bufferedJSONDecoder {
	var d bufferedJSONDecoder
	d.buf = new(bytes.Buffer)
	d.decoder = json.NewDecoder(d.buf)
	return &d
}

// Write appends raw encoded data to the buffer; it satisfies io.Writer.
func (b *bufferedJSONDecoder) Write(data []byte) (n int, err error) {
	n, err = b.buf.Write(data)
	return n, err
}

// Decode reads the next JSON-encoded message from the buffer into v.
func (b *bufferedJSONDecoder) Decode(v interface{}) error {
	err := b.decoder.Decode(v)
	return err
}

// Len returns the number of bytes currently held in the buffer.
func (b *bufferedJSONDecoder) Len() int {
	length := b.buf.Len()
	return length
}

// Bytes returns the content currently held in the buffer.
func (b *bufferedJSONDecoder) Bytes() []byte {
	content := b.buf.Bytes()
	return content
}

// batchDecoder decodes the byte into the original message.
type batchDecoder struct {
data []byte
msg canalJSONMessageInterface
// data []byte
msg canalJSONMessageInterface
decoder *bufferedJSONDecoder

config *common.Config

Expand Down Expand Up @@ -81,8 +116,18 @@ func NewBatchDecoder(
GenWithStack("handle-key-only is enabled, but upstream TiDB is not provided")
}

var msg canalJSONMessageInterface = &JSONMessage{}
if codecConfig.EnableTiDBExtension {
msg = &canalJSONMessageWithTiDBExtension{
JSONMessage: &JSONMessage{},
Extensions: &tidbExtension{},
}
}

return &batchDecoder{
config: codecConfig,
msg: msg,
decoder: newBufferedJSONDecoder(),
storage: externalStorage,
upstreamTiDB: db,
bytesDecoder: charmap.ISO8859_1.NewDecoder(),
Expand All @@ -100,51 +145,23 @@ func (b *batchDecoder) AddKeyValue(_, value []byte) error {

return errors.Trace(err)
}
b.data = value
if _, err = b.decoder.Write(value); err != nil {
return errors.Trace(err)
}
return nil
}

// HasNext implements the RowEventDecoder interface
func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
if b.data == nil {
return model.MessageTypeUnknown, false, nil
}
var (
msg canalJSONMessageInterface = &JSONMessage{}
encodedData []byte
)

if b.config.EnableTiDBExtension {
msg = &canalJSONMessageWithTiDBExtension{
JSONMessage: &JSONMessage{},
Extensions: &tidbExtension{},
}
}

if len(b.config.Terminator) > 0 {
idx := bytes.IndexAny(b.data, b.config.Terminator)
if idx >= 0 {
encodedData = b.data[:idx]
b.data = b.data[idx+len(b.config.Terminator):]
} else {
encodedData = b.data
b.data = nil
}
} else {
encodedData = b.data
b.data = nil
}

if len(encodedData) == 0 {
if b.decoder.Len() == 0 {
return model.MessageTypeUnknown, false, nil
}

if err := json.Unmarshal(encodedData, msg); err != nil {
log.Error("canal-json decoder unmarshal data failed",
zap.Error(err), zap.ByteString("data", encodedData))
if err := b.decoder.Decode(b.msg); err != nil {
log.Error("canal-json decoder decode failed",
zap.Error(err), zap.ByteString("data", b.decoder.Bytes()))
return model.MessageTypeUnknown, false, err
}
b.msg = msg
return b.msg.messageType(), true, nil
}

Expand Down Expand Up @@ -343,7 +360,6 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if err != nil {
return nil, err
}
b.msg = nil
return result, nil
}

Expand All @@ -356,7 +372,6 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
}

result := canalJSONMessage2DDLEvent(b.msg)

schema := *b.msg.getSchema()
table := *b.msg.getTable()
// if receive a table level DDL, just remove the table info to trigger create a new one.
Expand All @@ -367,7 +382,6 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
}
delete(b.tableInfoCache, cacheKey)
}
b.msg = nil
return result, nil
}

Expand All @@ -386,6 +400,5 @@ func (b *batchDecoder) NextResolvedEvent() (uint64, error) {
return 0, cerror.ErrCanalDecodeFailed.
GenWithStack("MessageTypeResolved tidb extension not found")
}
b.msg = nil
return withExtensionEvent.Extensions.WatermarkTs, nil
}
156 changes: 0 additions & 156 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,159 +764,3 @@ func TestE2EPartitionTable(t *testing.T) {
require.Equal(t, decodedEvent.GetTableID(), int64(0))
}
}

// TestNewCanalJSONBatchDecoder4RowMessage round-trips a row-changed event
// through the canal-json encoder and batch decoder for every combination of
// the enable-tidb-extension flag on the encode and decode sides, and checks
// that schema, table, and all column values survive the round trip.
func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
	_, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig())
	ctx := context.Background()

	for _, encodeEnable := range []bool{false, true} {
		encodeConfig := common.NewConfig(config.ProtocolCanalJSON)
		encodeConfig.EnableTiDBExtension = encodeEnable
		encodeConfig.Terminator = config.CRLF

		builder, err := NewJSONRowEventEncoderBuilder(ctx, encodeConfig)
		require.NoError(t, err)
		encoder := builder.Build()

		err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
		require.NoError(t, err)

		// A single appended event must produce exactly one message.
		messages := encoder.Build()
		require.Equal(t, 1, len(messages))
		msg := messages[0]

		for _, decodeEnable := range []bool{false, true} {
			decodeConfig := common.NewConfig(config.ProtocolCanalJSON)
			decodeConfig.EnableTiDBExtension = decodeEnable
			decoder, err := NewBatchDecoder(ctx, decodeConfig, nil)
			require.NoError(t, err)
			err = decoder.AddKeyValue(msg.Key, msg.Value)
			require.NoError(t, err)

			ty, hasNext, err := decoder.HasNext()
			require.NoError(t, err)
			require.True(t, hasNext)
			require.Equal(t, model.MessageTypeRow, ty)

			decodedEvent, err := decoder.NextRowChangedEvent()
			require.NoError(t, err)

			// The commit-ts only survives when both sides speak the TiDB
			// extension; otherwise it is not carried in the message.
			if encodeEnable && decodeEnable {
				require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs)
			}
			require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName())
			require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName())

			// Index decoded columns by name so each original column can be
			// compared regardless of column ordering.
			decodedColumns := make(map[string]*model.ColumnData, len(decodedEvent.Columns))
			for _, column := range decodedEvent.Columns {
				colName := decodedEvent.TableInfo.ForceGetColumnName(column.ColumnID)
				decodedColumns[colName] = column
			}
			for _, col := range insertEvent.Columns {
				colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID)
				decoded, ok := decodedColumns[colName]
				require.True(t, ok)
				switch v := col.Value.(type) {
				case types.VectorFloat32:
					// Vector columns are encoded as their string form.
					require.EqualValues(t, v.String(), decoded.Value)
				default:
					require.EqualValues(t, v, decoded.Value)
				}
			}

			// The stream held exactly one message; further reads must report
			// exhaustion and then error.
			_, hasNext, _ = decoder.HasNext()
			require.False(t, hasNext)

			decodedEvent, err = decoder.NextRowChangedEvent()
			require.Error(t, err)
			require.Nil(t, decodedEvent)
		}
	}
}

// TestNewCanalJSONBatchDecoder4DDLMessage round-trips a DDL event through the
// canal-json encoder and batch decoder for every combination of the
// enable-tidb-extension flag on the encode and decode sides.
func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) {
	helper := entry.NewSchemaTestHelper(t)
	defer helper.Close()

	sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))`
	ddlEvent := helper.DDL2Event(sql)

	ctx := context.Background()
	for _, encodeEnable := range []bool{false, true} {
		encCfg := common.NewConfig(config.ProtocolCanalJSON)
		encCfg.EnableTiDBExtension = encodeEnable

		encBuilder, err := NewJSONRowEventEncoderBuilder(ctx, encCfg)
		require.NoError(t, err)
		encoder := encBuilder.Build()

		encoded, err := encoder.EncodeDDLEvent(ddlEvent)
		require.NoError(t, err)
		require.NotNil(t, encoded)

		for _, decodeEnable := range []bool{false, true} {
			decCfg := common.NewConfig(config.ProtocolCanalJSON)
			decCfg.EnableTiDBExtension = decodeEnable
			decoder, err := NewBatchDecoder(ctx, decCfg, nil)
			require.NoError(t, err)
			err = decoder.AddKeyValue(nil, encoded.Value)
			require.NoError(t, err)

			msgType, ok, err := decoder.HasNext()
			require.Nil(t, err)
			require.True(t, ok)
			require.Equal(t, model.MessageTypeDDL, msgType)

			decodedDDL, err := decoder.NextDDLEvent()
			require.Nil(t, err)

			// The commit-ts survives only when both sides speak the TiDB
			// extension.
			if encodeEnable && decodeEnable {
				require.Equal(t, ddlEvent.CommitTs, decodedDDL.CommitTs)
			} else {
				require.Equal(t, uint64(0), decodedDDL.CommitTs)
			}

			require.Equal(t, ddlEvent.TableInfo.TableName.Schema, decodedDDL.TableInfo.TableName.Schema)
			require.Equal(t, ddlEvent.TableInfo.TableName.Table, decodedDDL.TableInfo.TableName.Table)
			require.Equal(t, ddlEvent.Query, decodedDDL.Query)

			// Exactly one message was buffered; further reads must report
			// exhaustion and then error.
			msgType, ok, err = decoder.HasNext()
			require.Nil(t, err)
			require.False(t, ok)
			require.Equal(t, model.MessageTypeUnknown, msgType)

			decodedDDL, err = decoder.NextDDLEvent()
			require.NotNil(t, err)
			require.Nil(t, decodedDDL)
		}
	}
}

// TestCanalJSONBatchDecoderWithTerminator feeds three newline-separated
// canal-json row messages (INSERT, UPDATE, DELETE) into the batch decoder in
// a single AddKeyValue call and checks that all three are decoded.
func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) {
	encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}`
	ctx := context.Background()
	codecConfig := common.NewConfig(config.ProtocolCanalJSON)
	// The three messages above are separated by "\n".
	codecConfig.Terminator = "\n"
	decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
	require.NoError(t, err)

	err = decoder.AddKeyValue(nil, []byte(encodedValue))
	require.NoError(t, err)

	// Drain the decoder, counting decoded row events.
	cnt := 0
	for {
		tp, hasNext, err := decoder.HasNext()
		// NOTE(review): breaking before the err check means an error from the
		// final HasNext call is silently dropped — confirm this is intended.
		if !hasNext {
			break
		}
		require.NoError(t, err)
		require.Equal(t, model.MessageTypeRow, tp)
		cnt++
		event, err := decoder.NextRowChangedEvent()
		require.NoError(t, err)
		require.NotNil(t, event)
	}
	require.Equal(t, 3, cnt)
}
Loading
Loading