Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encoder (CDC) shrink the bytes.Buffer to reduce memory usage #10417

Merged
merged 8 commits into from
Jan 5, 2024
6 changes: 3 additions & 3 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ func IsColCDCVisible(col *model.ColumnInfo) bool {
return true
}

// ExistTableUniqueColumn returns whether the table has a unique column
func (ti *TableInfo) ExistTableUniqueColumn() bool {
// HasUniqueColumn returns whether the table has a unique column
func (ti *TableInfo) HasUniqueColumn() bool {
return ti.hasUniqueColumn
}

Expand All @@ -299,7 +299,7 @@ func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
if ti.IsView() {
return true
}
return ti.ExistTableUniqueColumn()
return ti.HasUniqueColumn()
}

// IsIndexUnique returns whether the index is unique
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestTableInfoGetterFuncs(t *testing.T) {
require.Equal(t, 3, len(fts))
require.Equal(t, 3, len(colInfos))

require.True(t, info.ExistTableUniqueColumn())
require.True(t, info.HasUniqueColumn())

// check IsEligible
require.True(t, info.IsEligible(false))
Expand Down
6 changes: 5 additions & 1 deletion pkg/sink/codec/canal/canal_json_txn_event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ func (j *JSONTxnEventEncoder) Build() []*common.Message {
j.valueBuf.Bytes(), j.txnCommitTs, model.MessageTypeRow, j.txnSchema, j.txnTable)
ret.SetRowsCount(j.batchSize)
ret.Callback = j.callback
j.valueBuf.Reset()
if j.valueBuf.Cap() > codec.MemBufShrinkThreshold {
j.valueBuf = &bytes.Buffer{}
} else {
j.valueBuf.Reset()
}
j.callback = nil
j.batchSize = 0
j.txnCommitTs = 0
Expand Down
6 changes: 5 additions & 1 deletion pkg/sink/codec/csv/csv_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
b.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil)
ret.SetRowsCount(b.batchSize)
ret.Callback = b.callback
b.valueBuf.Reset()
if b.valueBuf.Cap() > codec.MemBufShrinkThreshold {
b.valueBuf = &bytes.Buffer{}
} else {
b.valueBuf.Reset()
}
b.callback = nil
b.batchSize = 0

Expand Down
3 changes: 3 additions & 0 deletions pkg/sink/codec/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
const (
// BatchVersion1 represents the version of batch format
BatchVersion1 uint64 = 1

// MemBufShrinkThreshold represents the threshold of shrinking the buffer.
MemBufShrinkThreshold = 1024 * 1024
)

// DDLEventBatchEncoder is an abstraction for DDL event encoder.
Expand Down
Loading