Skip to content

Commit

Permalink
sink(ticdc): fix incorrect table info in delete event (#11975)
Browse files Browse the repository at this point in the history
ref #11879
  • Loading branch information
wk989898 authored Jan 7, 2025
1 parent 80f49c6 commit 99e00c0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
15 changes: 8 additions & 7 deletions cmd/pulsar-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,14 +524,15 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
// todo: mark the offset after the DDL is fully synced to the downstream mysql.
continue
}
var partitionID int64
if row.TableInfo.IsPartitionTable() {
partitionID = row.GetTableID()
}
tableID := row.GetTableID()
// use schema, table and tableID to identify a table
tableID := c.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), partitionID)
row.TableInfo.TableName.TableID = tableID
switch c.option.protocol {
case config.ProtocolCanalJSON:
default:
tableID := c.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID)
row.PhysicalTableID = tableID
}

group, ok := c.eventGroups[tableID]
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, err
result.CommitTs = msg.getCommitTs()
result.PhysicalTableID = msg.getPhysicalTableID()
mysqlType := msg.getMySQLType()
result.TableInfo.TableName.IsPartition = msg.isPartition()
result.TableInfo.TableName.TableID = msg.getTableID()

var err error
if msg.eventType() == canal.EventType_DELETE {
Expand All @@ -236,8 +238,6 @@ func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, err
if err != nil {
return nil, err
}
result.TableInfo.TableName.IsPartition = msg.isPartition()
result.TableInfo.TableName.TableID = msg.getTableID()

// for `UPDATE`, `old` contain old data, set it as the `PreColumns`
if msg.eventType() == canal.EventType_UPDATE {
Expand Down

0 comments on commit 99e00c0

Please sign in to comment.