Skip to content

Commit

Permalink
fix the case
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 20, 2024
1 parent de98de0 commit d07e445
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,15 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent(
table = message.Table
eventType = message.EventType
)

handleKeyData := message.getData()
pkNames := make([]string, 0, len(handleKeyData))
for name := range handleKeyData {
pkNames = append(pkNames, name)
conditions := make(map[string]interface{}, len(message.pkNameSet()))
for name := range message.pkNameSet() {
conditions[name] = message.getData()[name]
}

result := &canalJSONMessageWithTiDBExtension{
JSONMessage: &JSONMessage{
Schema: schema,
Table: table,
PKNames: pkNames,
PKNames: message.PKNames,

EventType: eventType,
},
Expand All @@ -238,30 +235,34 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent(
}
switch eventType {
case "INSERT":
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData)
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions)
data, mysqlType, err := b.buildData(holder)
if err != nil {
return nil, err
}
result.MySQLType = mysqlType
result.Data = []map[string]interface{}{data}
case "UPDATE":
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData)
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions)
data, mysqlType, err := b.buildData(holder)
if err != nil {
return nil, err
}
result.MySQLType = mysqlType
result.Data = []map[string]interface{}{data}

holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, message.getOld())
oldConditions := make(map[string]interface{}, len(message.pkNameSet()))
for name := range message.pkNameSet() {
conditions[name] = message.getOld()[name]
}
holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, oldConditions)
old, _, err := b.buildData(holder)
if err != nil {
return nil, err
}
result.Old = []map[string]interface{}{old}
case "DELETE":
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, handleKeyData)
holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions)
data, mysqlType, err := b.buildData(holder)
if err != nil {
return nil, err
Expand Down

0 comments on commit d07e445

Please sign in to comment.