From ceed123a50b719599c2b9b754b14c78bc68e803b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Dec 2024 17:40:33 +0800 Subject: [PATCH 1/4] make the physical table id private --- cdc/entry/mounter.go | 21 +- cdc/model/sink.go | 13 +- cdc/model/sink_test.go | 2 +- .../sinkmanager/table_sink_worker_test.go | 17 +- cdc/redo/manager_test.go | 66 ++-- cdc/redo/writer/file/file_log_writer_test.go | 15 +- cdc/redo/writer/memory/mem_log_writer_test.go | 25 +- .../dmlsink/cloudstorage/defragmenter_test.go | 4 +- .../dmlsink/cloudstorage/dml_worker_test.go | 19 +- .../cloudstorage/encoding_worker_test.go | 54 ++-- cdc/sink/dmlsink/txn/event_test.go | 153 ++++------ cdc/sink/dmlsink/txn/mysql/mysql_test.go | 284 +++++++++--------- cmd/kafka-consumer/writer.go | 12 +- cmd/storage-consumer/main.go | 2 +- pkg/sink/codec/bootstraper_test.go | 16 +- pkg/sink/codec/builder/codec_test.go | 4 +- pkg/sink/codec/craft/craft_decoder.go | 2 +- .../codec/open/open_protocol_encoder_test.go | 2 +- pkg/sink/codec/open/open_protocol_message.go | 10 +- pkg/sink/codec/simple/encoder_test.go | 2 +- pkg/sink/codec/simple/message.go | 10 +- 21 files changed, 355 insertions(+), 378 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index d7ed71229a5..5fc0774b4f5 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -792,20 +792,21 @@ func (m *mounter) mountRowKVEntry( } } - return &model.RowChangedEvent{ - StartTs: row.StartTs, - CommitTs: row.CRTs, - RowID: intRowID, - HandleKey: row.RecordID, - PhysicalTableID: row.PhysicalTableID, - TableInfo: tableInfo, - Columns: cols, - PreColumns: preCols, + event := &model.RowChangedEvent{ + StartTs: row.StartTs, + CommitTs: row.CRTs, + RowID: intRowID, + HandleKey: row.RecordID, + TableInfo: tableInfo, + Columns: cols, + PreColumns: preCols, Checksum: checksum, ApproximateDataSize: dataSize, - }, rawRow, nil + } + event.SetTableID(row.PhysicalTableID) + return event, rawRow, nil } var emptyBytes = make([]byte, 0) diff 
--git a/cdc/model/sink.go b/cdc/model/sink.go index cc33d2b1008..b0efbb10dbf 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -329,7 +329,7 @@ type RowChangedEvent struct { RowID int64 // Deprecated. It is empty when the RowID comes from clustered index table. - PhysicalTableID int64 + physicalTableID int64 // NOTICE: We probably store the logical ID inside TableInfo's TableName, // not the physical ID. @@ -399,7 +399,7 @@ func (r *RowChangedEventInRedoLog) ToRowChangedEvent() *RowChangedEvent { row := &RowChangedEvent{ StartTs: r.StartTs, CommitTs: r.CommitTs, - PhysicalTableID: r.Table.TableID, + physicalTableID: r.Table.TableID, TableInfo: tableInfo, Columns: Columns2ColumnDatas(r.Columns, tableInfo), PreColumns: Columns2ColumnDatas(r.PreColumns, tableInfo), @@ -436,7 +436,12 @@ func (e txnRows) Swap(i, j int) { // GetTableID returns the table ID of the event. func (r *RowChangedEvent) GetTableID() int64 { - return r.PhysicalTableID + return r.physicalTableID +} + +// SetTableID sets the physical table ID of the row event. +func (r *RowChangedEvent) SetTableID(tableID int64) { + r.physicalTableID = tableID } // GetCommitTs returns the commit timestamp of this event. 
@@ -701,7 +706,7 @@ func NewIncrementalColumnIDAllocator() *IncrementalColumnIDAllocator { } // GetColumnID return the next mock column id -func (d *IncrementalColumnIDAllocator) GetColumnID(name string) int64 { +func (d *IncrementalColumnIDAllocator) GetColumnID(_ string) int64 { result := d.nextColID d.nextColID += 1 return result diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 49761869dbc..3086987d0a9 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -650,7 +650,7 @@ func TestToRedoLog(t *testing.T) { event := &RowChangedEvent{ StartTs: 100, CommitTs: 1000, - PhysicalTableID: 1, + physicalTableID: 1, TableInfo: tableInfo, Columns: Columns2ColumnDatas([]*Column{ { diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index f1dcfaed257..a99e095c484 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -86,14 +86,15 @@ func genRowChangedEvent(startTs, commitTs uint64, span tablepb.Span) *model.RowC {Name: "a", Value: 1}, } tableInfo := model.BuildTableInfo("table", "table", columns, nil) - return &model.RowChangedEvent{ - StartTs: startTs, - CommitTs: commitTs, - PhysicalTableID: span.TableID, - TableInfo: tableInfo, - Columns: model.Columns2ColumnDatas(columns, tableInfo), - PreColumns: model.Columns2ColumnDatas(preColumns, tableInfo), - } + result := &model.RowChangedEvent{ + StartTs: startTs, + CommitTs: commitTs, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas(columns, tableInfo), + PreColumns: model.Columns2ColumnDatas(preColumns, tableInfo), + } + result.SetTableID(span.TableID) + return result } type tableSinkWorkerSuite struct { diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index d1302da01c8..de05db92e39 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -149,37 +149,58 @@ func TestLogManagerInProcessor(t *testing.T) { tableInfo := 
&model.TableInfo{ TableName: model.TableName{Schema: "test", Table: "t"}, } + span1 := spanz.TableIDToComparableSpan(53) + event1 := &model.RowChangedEvent{CommitTs: 120, TableInfo: tableInfo} + event1.SetTableID(span1.TableID) + event2 := &model.RowChangedEvent{CommitTs: 125, TableInfo: tableInfo} + event2.SetTableID(span1.TableID) + event3 := &model.RowChangedEvent{CommitTs: 130, TableInfo: tableInfo} + event3.SetTableID(span1.TableID) + + span2 := spanz.TableIDToComparableSpan(55) + event4 := &model.RowChangedEvent{CommitTs: 130, TableInfo: tableInfo} + event4.SetTableID(span2.TableID) + event5 := &model.RowChangedEvent{CommitTs: 135, TableInfo: tableInfo} + event5.SetTableID(span2.TableID) + + span3 := spanz.TableIDToComparableSpan(57) + event6 := &model.RowChangedEvent{CommitTs: 130, TableInfo: tableInfo} + event6.SetTableID(span3.TableID) + + span4 := spanz.TableIDToComparableSpan(59) + event7 := &model.RowChangedEvent{CommitTs: 128, TableInfo: tableInfo} + event7.SetTableID(span4.TableID) + event8 := &model.RowChangedEvent{CommitTs: 130, TableInfo: tableInfo} + event8.SetTableID(span4.TableID) + event9 := &model.RowChangedEvent{CommitTs: 133, TableInfo: tableInfo} + event9.SetTableID(span4.TableID) + testCases := []struct { span tablepb.Span rows []*model.RowChangedEvent }{ { - span: spanz.TableIDToComparableSpan(53), + span: span1, rows: []*model.RowChangedEvent{ - {CommitTs: 120, PhysicalTableID: 53, TableInfo: tableInfo}, - {CommitTs: 125, PhysicalTableID: 53, TableInfo: tableInfo}, - {CommitTs: 130, PhysicalTableID: 53, TableInfo: tableInfo}, + event1, event2, event3, }, }, { - span: spanz.TableIDToComparableSpan(55), + span: span2, rows: []*model.RowChangedEvent{ - {CommitTs: 130, PhysicalTableID: 55, TableInfo: tableInfo}, - {CommitTs: 135, PhysicalTableID: 55, TableInfo: tableInfo}, + event4, event5, }, }, { - span: spanz.TableIDToComparableSpan(57), + span: span3, rows: []*model.RowChangedEvent{ - {CommitTs: 130, PhysicalTableID: 57, TableInfo: 
tableInfo}, + event6, }, }, { - span: spanz.TableIDToComparableSpan(59), + span: span4, rows: []*model.RowChangedEvent{ - {CommitTs: 128, PhysicalTableID: 59, TableInfo: tableInfo}, - {CommitTs: 130, PhysicalTableID: 59, TableInfo: tableInfo}, - {CommitTs: 133, PhysicalTableID: 59, TableInfo: tableInfo}, + event7, event8, event9, }, }, } @@ -298,16 +319,21 @@ func TestLogManagerError(t *testing.T) { tableInfo := &model.TableInfo{ TableName: model.TableName{Schema: "test", Table: "t"}, } + span := spanz.TableIDToComparableSpan(53) + event1 := &model.RowChangedEvent{CommitTs: 120, TableInfo: tableInfo} + event1.SetTableID(span.TableID) + event2 := &model.RowChangedEvent{CommitTs: 125, TableInfo: tableInfo} + event2.SetTableID(span.TableID) + event3 := &model.RowChangedEvent{CommitTs: 130, TableInfo: tableInfo} + event3.SetTableID(span.TableID) testCases := []struct { span tablepb.Span rows []writer.RedoEvent }{ { - span: spanz.TableIDToComparableSpan(53), + span: span, rows: []writer.RedoEvent{ - &model.RowChangedEvent{CommitTs: 120, PhysicalTableID: 53, TableInfo: tableInfo}, - &model.RowChangedEvent{CommitTs: 125, PhysicalTableID: 53, TableInfo: tableInfo}, - &model.RowChangedEvent{CommitTs: 130, PhysicalTableID: 53, TableInfo: tableInfo}, + event1, event2, event3, }, }, } @@ -385,10 +411,10 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { // prepare new row change events b.StopTimer() *maxCommitTs += rand.Uint64() % 10 + event := &model.RowChangedEvent{CommitTs: *maxCommitTs, TableInfo: tableInfo} + event.SetTableID(span.TableID) rows = []*model.RowChangedEvent{ - {CommitTs: *maxCommitTs, PhysicalTableID: span.TableID, TableInfo: tableInfo}, - {CommitTs: *maxCommitTs, PhysicalTableID: span.TableID, TableInfo: tableInfo}, - {CommitTs: *maxCommitTs, PhysicalTableID: span.TableID, TableInfo: tableInfo}, + event, event, event, } b.StartTimer() diff --git a/cdc/redo/writer/file/file_log_writer_test.go 
b/cdc/redo/writer/file/file_log_writer_test.go index b007be30917..625fc402d79 100644 --- a/cdc/redo/writer/file/file_log_writer_test.go +++ b/cdc/redo/writer/file/file_log_writer_test.go @@ -39,6 +39,8 @@ func TestLogWriterWriteLog(t *testing.T) { Table: "t", }, } + event := &model.RowChangedEvent{CommitTs: 1, TableInfo: tableInfo} + event.SetTableID(111) tests := []struct { name string args arg @@ -50,10 +52,8 @@ func TestLogWriterWriteLog(t *testing.T) { { name: "happy", args: arg{ - ctx: context.Background(), - rows: []writer.RedoEvent{ - &model.RowChangedEvent{CommitTs: 1, PhysicalTableID: 111, TableInfo: tableInfo}, - }, + ctx: context.Background(), + rows: []writer.RedoEvent{event}, }, isRunning: true, writerErr: nil, @@ -61,11 +61,8 @@ func TestLogWriterWriteLog(t *testing.T) { { name: "writer err", args: arg{ - ctx: context.Background(), - rows: []writer.RedoEvent{ - nil, - &model.RowChangedEvent{CommitTs: 1, PhysicalTableID: 11, TableInfo: tableInfo}, - }, + ctx: context.Background(), + rows: []writer.RedoEvent{nil, event}, }, writerErr: errors.New("err"), wantErr: errors.New("err"), diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 12f34ce11e8..b7d39d6162c 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -31,23 +31,16 @@ import ( func TestWriteDDL(t *testing.T) { t.Parallel() + event1 := &model.RowChangedEvent{CommitTs: 11, TableInfo: &model.TableInfo{TableName: model.TableName{Schema: "test", Table: "t1"}}} + event1.SetTableID(11) + + event2 := &model.RowChangedEvent{CommitTs: 15, TableInfo: &model.TableInfo{TableName: model.TableName{Schema: "test", Table: "t2"}}} + event2.SetTableID(12) + + event3 := &model.RowChangedEvent{CommitTs: 8, TableInfo: &model.TableInfo{TableName: model.TableName{Schema: "test", Table: "t3"}}} + event3.SetTableID(12) rows := []writer.RedoEvent{ - nil, - &model.RowChangedEvent{ - PhysicalTableID: 11, 
- CommitTs: 11, - TableInfo: &model.TableInfo{TableName: model.TableName{Schema: "test", Table: "t1"}}, - }, - &model.RowChangedEvent{ - PhysicalTableID: 12, - CommitTs: 15, - TableInfo: &model.TableInfo{TableName: model.TableName{Schema: "test", Table: "t2"}}, - }, - &model.RowChangedEvent{ - PhysicalTableID: 12, - CommitTs: 8, - TableInfo: &model.TableInfo{TableName: model.TableName{Schema: "test", Table: "t2"}}, - }, + nil, event1, event2, event3, } testWriteEvents(t, rows) } diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go index b6ea7da97be..d66c5df108c 100644 --- a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go +++ b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go @@ -99,13 +99,13 @@ func TestDeframenter(t *testing.T) { n := 1 + rand.Intn(1000) for j := 0; j < n; j++ { row := &model.RowChangedEvent{ - PhysicalTableID: 100, - TableInfo: tableInfo, + TableInfo: tableInfo, Columns: []*model.ColumnData{ {ColumnID: 1, Value: j + 1}, {ColumnID: 2, Value: "hello world"}, }, } + row.SetTableID(100) frag.event.Event.Rows = append(frag.event.Event.Rows, row) } err := encoder.AppendTxnEvent(frag.event.Event, nil) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go index 879581f54f0..154aacc9da3 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go @@ -83,6 +83,14 @@ func TestDMLWorkerRun(t *testing.T) { } tableInfo := model.WrapTableInfo(100, "test", 99, tidbTableInfo) for i := 0; i < 5; i++ { + row := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: []*model.ColumnData{ + {ColumnID: 1, Value: 100}, + {ColumnID: 2, Value: "hello world"}, + }, + } + row.SetTableID(100) frag := eventFragment{ seqNumber: uint64(i), versionedTable: cloudstorage.VersionedTableName{ @@ -92,16 +100,7 @@ func TestDMLWorkerRun(t *testing.T) { event: &dmlsink.TxnCallbackableEvent{ Event: 
&model.SingleTableTxn{ TableInfo: tableInfo, - Rows: []*model.RowChangedEvent{ - { - PhysicalTableID: 100, - TableInfo: tableInfo, - Columns: []*model.ColumnData{ - {ColumnID: 1, Value: 100}, - {ColumnID: 2, Value: "hello world"}, - }, - }, - }, + Rows: []*model.RowChangedEvent{row}, }, }, encodedMsgs: []*common.Message{ diff --git a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go index 84e1aa688f8..a70907651d6 100644 --- a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go @@ -76,6 +76,24 @@ func TestEncodeEvents(t *testing.T) { } tableInfo := model.WrapTableInfo(100, "test", 33, tidbTableInfo) + event1 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: []*model.ColumnData{ + {ColumnID: 1, Value: 100}, + {ColumnID: 2, Value: "hello world"}, + }, + } + event1.SetTableID(100) + + event2 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: []*model.ColumnData{ + {ColumnID: 1, Value: 200}, + {ColumnID: 2, Value: "你好,世界"}, + }, + } + event2.SetTableID(100) + err := encodingWorker.encodeEvents(eventFragment{ versionedTable: cloudstorage.VersionedTableName{ TableNameWithPhysicTableID: model.TableName{ @@ -89,22 +107,7 @@ func TestEncodeEvents(t *testing.T) { Event: &model.SingleTableTxn{ TableInfo: tableInfo, Rows: []*model.RowChangedEvent{ - { - PhysicalTableID: 100, - TableInfo: tableInfo, - Columns: []*model.ColumnData{ - {ColumnID: 1, Value: 100}, - {ColumnID: 2, Value: "hello world"}, - }, - }, - { - PhysicalTableID: 100, - TableInfo: tableInfo, - Columns: []*model.ColumnData{ - {ColumnID: 1, Value: 200}, - {ColumnID: 2, Value: "你好,世界"}, - }, - }, + event1, event2, }, }, }, @@ -140,19 +143,18 @@ func TestEncodingWorkerRun(t *testing.T) { }, } tableInfo := model.WrapTableInfo(100, "test", 33, tidbTableInfo) - event := &model.SingleTableTxn{ + row := &model.RowChangedEvent{ TableInfo: tableInfo, - Rows: 
[]*model.RowChangedEvent{ - { - PhysicalTableID: 100, - TableInfo: tableInfo, - Columns: []*model.ColumnData{ - {ColumnID: 1, Value: 100}, - {ColumnID: 2, Value: "hello world"}, - }, - }, + Columns: []*model.ColumnData{ + {ColumnID: 1, Value: 100}, + {ColumnID: 2, Value: "hello world"}, }, } + row.SetTableID(100) + event := &model.SingleTableTxn{ + TableInfo: tableInfo, + Rows: []*model.RowChangedEvent{row}, + } for i := 0; i < 3; i++ { frag := eventFragment{ diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go index 29d27e13759..34fd2ce5319 100644 --- a/cdc/sink/dmlsink/txn/event_test.go +++ b/cdc/sink/dmlsink/txn/event_test.go @@ -75,6 +75,57 @@ func TestGenKeys(t *testing.T) { }, }, [][]int{{0}, {1}}) + event1 := &model.RowChangedEvent{ + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + TableInfo: tableInfoWithOneCompositeUniqueKey, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a1", + Value: 12, + }, + { + Name: "a3", + Value: 1, + }, + }, tableInfoWithOneCompositeUniqueKey), + } + event1.SetTableID(47) + + event2 := &model.RowChangedEvent{ + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + TableInfo: tableInfoWithOneCompositeUniqueKey, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a1", + Value: 1, + }, + { + Name: "a3", + Value: 21, + }, + }, tableInfoWithOneCompositeUniqueKey), + } + event2.SetTableID(47) + + event3 := &model.RowChangedEvent{ + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + TableInfo: tableInfoWithTwoUniqueKeys, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a1", + Value: nil, + }, + { + Name: "a3", + Value: nil, + }, + }, tableInfoWithTwoUniqueKeys), + } + event3.SetTableID(47) + testCases := []struct { txn *model.SingleTableTxn expected []uint64 @@ -83,113 +134,17 @@ func TestGenKeys(t *testing.T) { expected: nil, }, { txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{ - { - StartTs: 
418658114257813514, - CommitTs: 418658114257813515, - PhysicalTableID: 47, - TableInfo: tableInfoWithOneCompositeUniqueKey, - PreColumns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a1", - Value: 12, - }, - { - Name: "a3", - Value: 1, - }, - }, tableInfoWithOneCompositeUniqueKey), - }, { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - PhysicalTableID: 47, - TableInfo: tableInfoWithOneCompositeUniqueKey, - PreColumns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a1", - Value: 1, - }, - { - Name: "a3", - Value: 21, - }, - }, tableInfoWithOneCompositeUniqueKey), - }, - }, + Rows: []*model.RowChangedEvent{event1, event2}, }, expected: []uint64{2072713494, 3710968706}, }, { txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{ - { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - PhysicalTableID: 47, - TableInfo: tableInfoWithTwoUniqueKeys, - PreColumns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a1", - Value: 12, - }, - { - Name: "a3", - Value: 1, - }, - }, tableInfoWithTwoUniqueKeys), - }, { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - TableInfo: tableInfoWithTwoUniqueKeys, - PhysicalTableID: 47, - PreColumns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a1", - Value: 1, - }, - { - Name: "a3", - Value: 21, - }, - }, tableInfoWithTwoUniqueKeys), - }, - }, + Rows: []*model.RowChangedEvent{event1, event2}, }, expected: []uint64{318190470, 2109733718, 2658640457, 2989258527}, }, { txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{ - { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - PhysicalTableID: 47, - TableInfo: tableInfoWithTwoUniqueKeys, - PreColumns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a1", - Value: nil, - }, - { - Name: "a3", - Value: nil, - }, - }, tableInfoWithTwoUniqueKeys), - }, { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - TableInfo: tableInfoWithTwoUniqueKeys, - PhysicalTableID: 47, - 
PreColumns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a1", - Value: 1, - }, - { - Name: "a3", - Value: 21, - }, - }, tableInfoWithTwoUniqueKeys), - }, - }, + Rows: []*model.RowChangedEvent{event3, event2}, }, expected: []uint64{318190470, 2095136920, 2658640457}, }} diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index 61e120ac51f..177c4e0f9e6 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -361,40 +361,40 @@ func TestNewMySQLBackendExecDML(t *testing.T) { Flag: 0, }, }, [][]int{{0}}) - rows := []*model.RowChangedEvent{ - { - StartTs: 1, - CommitTs: 2, - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 1, - }, - { - Name: "b", - Value: "test", - }, - }, tableInfo), - }, - { - StartTs: 5, - CommitTs: 6, - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 2, - }, - { - Name: "b", - Value: "test", - }, - }, tableInfo), - }, + event1 := &model.RowChangedEvent{ + StartTs: 1, + CommitTs: 2, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 1, + }, + { + Name: "b", + Value: "test", + }, + }, tableInfo), } + event1.SetTableID(1) + + event2 := &model.RowChangedEvent{ + StartTs: 5, + CommitTs: 6, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 2, + }, + { + Name: "b", + Value: "test", + }, + }, tableInfo), + } + event2.SetTableID(1) + rows := []*model.RowChangedEvent{event1, event2} var flushedTs uint64 = 0 _ = sink.OnTxnEvent(&dmlsink.TxnCallbackableEvent{ @@ -423,28 +423,28 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, }, }, [][]int{{0}}) - rows := []*model.RowChangedEvent{ - { - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: 
model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 1, - }, - }, tableInfo), - }, - { - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 2, - }, - }, tableInfo), - }, + event1 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 1, + }, + }, tableInfo), + } + event1.SetTableID(1) + + event2 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 2, + }, + }, tableInfo), } + event2.SetTableID(1) + rows := []*model.RowChangedEvent{event1, event2} errDatabaseNotExists := &dmysql.MySQLError{ Number: uint16(infoschema.ErrDatabaseNotExists.Code()), @@ -490,28 +490,28 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { Value: 1, }, }, [][]int{{0}}) - rows := []*model.RowChangedEvent{ - { - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 1, - }, - }, tableInfo), - }, - { - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 2, - }, - }, tableInfo), - }, + event1 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 1, + }, + }, tableInfo), + } + event1.SetTableID(1) + + event2 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 2, + }, + }, tableInfo), } + event2.SetTableID(1) + rows := []*model.RowChangedEvent{event1, event2} errTableNotExists := &dmysql.MySQLError{ Number: uint16(infoschema.ErrTableNotExists.Code()), @@ -557,28 +557,27 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { Value: 1, }, }, [][]int{{0}}) - rows := []*model.RowChangedEvent{ - { - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: 
model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 1, - }, - }, tableInfo), - }, - { - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 2, - }, - }, tableInfo), - }, + event1 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 1, + }, + }, tableInfo), + } + event1.SetTableID(1) + event2 := &model.RowChangedEvent{ + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 2, + }, + }, tableInfo), } + event2.SetTableID(1) + rows := []*model.RowChangedEvent{event1, event2} errLockDeadlock := &dmysql.MySQLError{ Number: mysql.ErrLockDeadlock, @@ -627,21 +626,20 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) { Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, }, }, [][]int{{0}}) - rows := []*model.RowChangedEvent{ - { - StartTs: 2, - CommitTs: 3, - ReplicatingTs: 1, - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 1, - }, - }, tableInfo), - }, + event := &model.RowChangedEvent{ + StartTs: 2, + CommitTs: 3, + ReplicatingTs: 1, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 1, + }, + }, tableInfo), } + event.SetTableID(1) + rows := []*model.RowChangedEvent{event} dbConnFactory := pmysql.NewDBConnectionFactoryForTest() dbConnFactory.SetStandardConnectionFactory(func(ctx context.Context, dsnStr string) (*sql.DB, error) { @@ -812,40 +810,40 @@ func TestMySQLSinkExecDMLError(t *testing.T) { Flag: 0, }, }, [][]int{{0}}) - rows := []*model.RowChangedEvent{ - { - StartTs: 1, - CommitTs: 2, - TableInfo: tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 1, - }, - { - Name: "b", - Value: "test", - }, - }, tableInfo), - }, - { - StartTs: 2, - CommitTs: 3, - TableInfo: 
tableInfo, - PhysicalTableID: 1, - Columns: model.Columns2ColumnDatas([]*model.Column{ - { - Name: "a", - Value: 2, - }, - { - Name: "b", - Value: "test", - }, - }, tableInfo), - }, + event1 := &model.RowChangedEvent{ + StartTs: 1, + CommitTs: 2, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 1, + }, + { + Name: "b", + Value: "test", + }, + }, tableInfo), + } + event1.SetTableID(1) + + event2 := &model.RowChangedEvent{ + StartTs: 2, + CommitTs: 3, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 2, + }, + { + Name: "b", + Value: "test", + }, + }, tableInfo), } + event2.SetTableID(1) + rows := []*model.RowChangedEvent{event1, event2} _ = sink.OnTxnEvent(&dmlsink.TxnCallbackableEvent{ Event: &model.SingleTableTxn{Rows: rows}, diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index a277348605e..028b7322635 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -325,15 +325,16 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool if simple, ok := decoder.(*simple.Decoder); ok { cachedEvents := simple.GetCachedEvents() for _, row := range cachedEvents { - row.TableInfo.TableName.TableID = row.PhysicalTableID + tableID := row.GetTableID() + row.TableInfo.TableName.TableID = tableID w.checkPartition(row, partition, message) if w.checkOldMessage(progress, row.CommitTs, row, partition, message) { continue } - group, ok := eventGroup[row.PhysicalTableID] + group, ok := eventGroup[tableID] if !ok { group = NewEventsGroup() - eventGroup[row.PhysicalTableID] = group + eventGroup[tableID] = group } group.Append(row) } @@ -363,11 +364,11 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool continue } - tableID := row.PhysicalTableID + tableID := row.GetTableID() // simple protocol decoder should have set the table id already. 
if w.option.protocol != config.ProtocolSimple { tableID = w.fakeTableIDGenerator. - generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), row.PhysicalTableID) + generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID) row.TableInfo.TableName.TableID = tableID } @@ -387,7 +388,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset), zap.Uint64("commitTs", row.CommitTs), - zap.Int64("physicalTableID", row.PhysicalTableID), zap.Int64("tableID", tableID), zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName())) diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 19af984164c..40b320bd917 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -368,7 +368,7 @@ func (c *consumer) emitDMLEvents( ) continue } - row.PhysicalTableID = tableID + row.SetTableID(tableID) c.tableSinkMap[tableID].AppendRowChangedEvents(row) filteredCnt++ } diff --git a/pkg/sink/codec/bootstraper_test.go b/pkg/sink/codec/bootstraper_test.go index ae11bdc1e1d..f755a4e9754 100644 --- a/pkg/sink/codec/bootstraper_test.go +++ b/pkg/sink/codec/bootstraper_test.go @@ -48,9 +48,9 @@ func getMockTableStatus(tableName string, TotalPartition: totalPartition, } row := &model.RowChangedEvent{ - PhysicalTableID: tableID, - TableInfo: tableInfo, + TableInfo: tableInfo, } + row.SetTableID(tableID) tb := newTableStatistic(key, row) return key, row, tb } @@ -192,9 +192,9 @@ func TestUpdateTableStatistic(t *testing.T) { );` tableInfo1 := helper.DDL2Event(sql).TableInfo row1 := &model.RowChangedEvent{ - PhysicalTableID: tableInfo1.ID, - TableInfo: tableInfo1, + TableInfo: tableInfo1, } + row1.SetTableID(tableInfo1.ID) tableStatistic := newTableStatistic(model.TopicPartitionKey{}, row1) // case 1: The tableStatistic should not be updated if the 
tableInfo is the same @@ -205,9 +205,9 @@ func TestUpdateTableStatistic(t *testing.T) { sql = `alter table test.t1 add column address varchar(255) not null;` tableInfo2 := helper.DDL2Event(sql).TableInfo row2 := &model.RowChangedEvent{ - PhysicalTableID: tableInfo2.ID, - TableInfo: tableInfo2, + TableInfo: tableInfo2, } + row2.SetTableID(tableInfo2.ID) tableStatistic.update(row2, 1) require.Equal(t, tableInfo2, tableStatistic.tableInfo.Load().(*model.TableInfo)) @@ -215,9 +215,9 @@ func TestUpdateTableStatistic(t *testing.T) { sql = `alter table test.t1 rename to test.t2;` tableInfo3 := helper.DDL2Event(sql).TableInfo row3 := &model.RowChangedEvent{ - PhysicalTableID: tableInfo3.ID, - TableInfo: tableInfo3, + TableInfo: tableInfo3, } + row3.SetTableID(tableInfo3.ID) tableStatistic.update(row3, 1) require.Equal(t, tableInfo3, tableStatistic.tableInfo.Load().(*model.TableInfo)) } diff --git a/pkg/sink/codec/builder/codec_test.go b/pkg/sink/codec/builder/codec_test.go index ce9b28fce7c..ae4fe915787 100644 --- a/pkg/sink/codec/builder/codec_test.go +++ b/pkg/sink/codec/builder/codec_test.go @@ -373,7 +373,7 @@ func benchmarkProtobuf1Decoding(b *testing.B) []*model.RowChangedEvent { ev.Columns = model.Columns2ColumnDatas(codecDecodeRowChangedPB1(value.NewValue), ev.TableInfo) ev.CommitTs = key.Ts if key.Partition >= 0 { - ev.PhysicalTableID = key.Partition + ev.SetTableID(key.Partition) ev.TableInfo.TableName.IsPartition = true } result = append(result, ev) @@ -431,7 +431,7 @@ func benchmarkProtobuf2Decoding(b *testing.B) []*model.RowChangedEvent { } ev.CommitTs = ts if keys.Partition[i] >= 0 { - ev.PhysicalTableID = keys.Partition[i] + ev.SetTableID(keys.Partition[i]) ev.TableInfo.TableName.IsPartition = true } result = append(result, ev) diff --git a/pkg/sink/codec/craft/craft_decoder.go b/pkg/sink/codec/craft/craft_decoder.go index 0d008bdc8a1..7426b3e468c 100644 --- a/pkg/sink/codec/craft/craft_decoder.go +++ b/pkg/sink/codec/craft/craft_decoder.go @@ -92,7 +92,7 @@ 
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } partition := b.headers.GetPartition(b.index) if partition >= 0 { - ev.PhysicalTableID = partition + ev.SetTableID(partition) ev.TableInfo.TableName.IsPartition = true } b.index++ diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index e781819134e..5c7c05410ee 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -310,7 +310,7 @@ func TestE2EPartitionTable(t *testing.T) { decodedEvent, err := decoder.NextRowChangedEvent() require.NoError(t, err) - // table id should be set to the partition table id, the PhysicalTableID + // table id should be set to the partition table id, the physicalTableID require.Equal(t, decodedEvent.GetTableID(), event.GetTableID()) } } diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index d9124c2063e..5ad94afe1fe 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -191,7 +191,7 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang // TODO: we lost the tableID from kafka message if key.Partition != nil { - e.PhysicalTableID = *key.Partition + e.SetTableID(*key.Partition) e.TableInfo.TableName.IsPartition = true } @@ -201,10 +201,10 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang func rowChangeColumns2CodecColumns(cols []*model.ColumnData, tb *model.TableInfo, onlyHandleKeyColumns bool) map[string]internal.Column { jsonCols := make(map[string]internal.Column, len(cols)) for _, col := range cols { - colx := model.GetColumnDataX(col, tb) - if colx.ColumnData == nil || onlyHandleKeyColumns && !colx.GetFlag().IsHandleKey() { - continue - } + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || onlyHandleKeyColumns && 
!colx.GetFlag().IsHandleKey() { + continue + } c := internal.Column{} c.FromRowChangeColumn(colx) jsonCols[colx.GetName()] = c diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 9fad6dae03b..a78239bcf7e 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -269,7 +269,7 @@ func TestE2EPartitionTable(t *testing.T) { decodedEvent, err := decoder.NextRowChangedEvent() require.NoError(t, err) - // table id should be set to the partition table id, the PhysicalTableID + // table id should be set to the partition table id, the physicalTableID require.Equal(t, decodedEvent.GetTableID(), event.GetTableID()) } } diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index e0d13abe0dd..1639c40df96 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -385,12 +385,12 @@ func buildRowChangedEvent( msg *message, tableInfo *model.TableInfo, enableRowChecksum bool, db *sql.DB, ) (*model.RowChangedEvent, error) { result := &model.RowChangedEvent{ - CommitTs: msg.CommitTs, - PhysicalTableID: msg.TableID, - TableInfo: tableInfo, - Columns: decodeColumns(msg.Data, tableInfo), - PreColumns: decodeColumns(msg.Old, tableInfo), + CommitTs: msg.CommitTs, + TableInfo: tableInfo, + Columns: decodeColumns(msg.Data, tableInfo), + PreColumns: decodeColumns(msg.Old, tableInfo), } + result.SetTableID(msg.TableID) if enableRowChecksum && msg.Checksum != nil { result.Checksum = &integrity.Checksum{ From d54fb81fc905ec8d1b3c65bacfb824ad77761ad2 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Dec 2024 17:58:07 +0800 Subject: [PATCH 2/4] make fmt --- cdc/sink/dmlsink/txn/event_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go index 34fd2ce5319..4f9a51c562c 100644 --- a/cdc/sink/dmlsink/txn/event_test.go +++ b/cdc/sink/dmlsink/txn/event_test.go @@ 
-108,7 +108,7 @@ func TestGenKeys(t *testing.T) { }, tableInfoWithOneCompositeUniqueKey), } event2.SetTableID(47) - + event3 := &model.RowChangedEvent{ StartTs: 418658114257813514, CommitTs: 418658114257813515, From 44db34f5837bf2fd035af8e4da5cc60e83e51a54 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Dec 2024 18:00:00 +0800 Subject: [PATCH 3/4] make fmt --- cdc/model/sink.go | 12 ++++++------ cdc/model/sink_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index b0efbb10dbf..007f80180fb 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -397,13 +397,13 @@ func (r *RowChangedEventInRedoLog) ToRowChangedEvent() *RowChangedEvent { tableInfo.TableName.TableID = r.Table.TableID tableInfo.TableName.IsPartition = r.Table.IsPartition row := &RowChangedEvent{ - StartTs: r.StartTs, - CommitTs: r.CommitTs, - physicalTableID: r.Table.TableID, - TableInfo: tableInfo, - Columns: Columns2ColumnDatas(r.Columns, tableInfo), - PreColumns: Columns2ColumnDatas(r.PreColumns, tableInfo), + StartTs: r.StartTs, + CommitTs: r.CommitTs, + TableInfo: tableInfo, + Columns: Columns2ColumnDatas(r.Columns, tableInfo), + PreColumns: Columns2ColumnDatas(r.PreColumns, tableInfo), } + row.SetTableID(r.Table.TableID) return row } diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 3086987d0a9..479d033f4df 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -648,10 +648,9 @@ func TestToRedoLog(t *testing.T) { } tableInfo := BuildTableInfo("test", "t", cols, [][]int{{1}}) event := &RowChangedEvent{ - StartTs: 100, - CommitTs: 1000, - physicalTableID: 1, - TableInfo: tableInfo, + StartTs: 100, + CommitTs: 1000, + TableInfo: tableInfo, Columns: Columns2ColumnDatas([]*Column{ { Name: "col1", @@ -663,6 +662,7 @@ func TestToRedoLog(t *testing.T) { }, }, tableInfo), } + event.SetTableID(1) eventInRedoLog := event.ToRedoLog() require.Equal(t, event.StartTs, 
eventInRedoLog.RedoRow.Row.StartTs) require.Equal(t, event.CommitTs, eventInRedoLog.RedoRow.Row.CommitTs) From 7dbc7de93b905dd5a14a5da84a2fa62b10a433ac Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 10 Dec 2024 18:41:05 +0800 Subject: [PATCH 4/4] fix the mysql gen key unit test --- cdc/sink/dmlsink/txn/event_test.go | 38 ++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go index 4f9a51c562c..d6fe5f18981 100644 --- a/cdc/sink/dmlsink/txn/event_test.go +++ b/cdc/sink/dmlsink/txn/event_test.go @@ -126,6 +126,40 @@ func TestGenKeys(t *testing.T) { } event3.SetTableID(47) + event4 := &model.RowChangedEvent{ + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + TableInfo: tableInfoWithTwoUniqueKeys, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a1", + Value: 12, + }, + { + Name: "a3", + Value: 1, + }, + }, tableInfoWithTwoUniqueKeys), + } + event4.SetTableID(47) + + event5 := &model.RowChangedEvent{ + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + TableInfo: tableInfoWithTwoUniqueKeys, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a1", + Value: 1, + }, + { + Name: "a3", + Value: 21, + }, + }, tableInfoWithTwoUniqueKeys), + } + event5.SetTableID(47) + testCases := []struct { txn *model.SingleTableTxn expected []uint64 @@ -139,12 +173,12 @@ func TestGenKeys(t *testing.T) { expected: []uint64{2072713494, 3710968706}, }, { txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{event1, event2}, + Rows: []*model.RowChangedEvent{event4, event5}, }, expected: []uint64{318190470, 2109733718, 2658640457, 2989258527}, }, { txn: &model.SingleTableTxn{ - Rows: []*model.RowChangedEvent{event3, event2}, + Rows: []*model.RowChangedEvent{event3, event5}, }, expected: []uint64{318190470, 2095136920, 2658640457}, }}