diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index bc381e16986..e25b2651e95 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -217,18 +217,15 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( table = message.Table eventType = message.EventType ) - - handleKeyData := message.getData() - pkNames := make([]string, 0, len(handleKeyData)) - for name := range handleKeyData { - pkNames = append(pkNames, name) + conditions := make(map[string]interface{}, len(message.pkNameSet())) + for name := range message.pkNameSet() { + conditions[name] = message.getData()[name] } - result := &canalJSONMessageWithTiDBExtension{ JSONMessage: &JSONMessage{ Schema: schema, Table: table, - PKNames: pkNames, + PKNames: message.PKNames, EventType: eventType, }, @@ -238,7 +235,7 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( } switch eventType { case "INSERT": - holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) + holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions) data, mysqlType, err := b.buildData(holder) if err != nil { return nil, err @@ -246,7 +243,7 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( result.MySQLType = mysqlType result.Data = []map[string]interface{}{data} case "UPDATE": - holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) + holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions) data, mysqlType, err := b.buildData(holder) if err != nil { return nil, err @@ -254,14 +251,18 @@ func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( result.MySQLType = mysqlType result.Data = []map[string]interface{}{data} - holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, message.getOld()) + oldConditions := make(map[string]interface{}, len(message.pkNameSet())) + for name := range message.pkNameSet() { + conditions[name] = message.getOld()[name] + } + holder = common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, oldConditions) old, _, err := b.buildData(holder) if err != nil { return nil, err } result.Old = []map[string]interface{}{old} case "DELETE": - holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, handleKeyData) + holder := common.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions) data, mysqlType, err := b.buildData(holder) if err != nil { return nil, err