Skip to content

Commit

Permalink
query table info
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 18, 2024
1 parent 9a7d6c8 commit 74cc7ea
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
ti.rowColInfos[i] = rowcodec.ColInfo{
ID: col.ID,
IsPKHandle: pkIsHandle,
Ft: &col.FieldType,
Ft: col.FieldType.Clone(),
VirtualGenCol: col.IsGenerated(),
}
ti.rowColFieldTps[col.ID] = ti.rowColInfos[i].Ft
Expand Down
10 changes: 5 additions & 5 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 {
return c.Extensions.CommitTs
}

func (b *batchDecoder) queryTableInfo(schema, table string, columns []*model.Column, pkNames map[string]struct{}) *model.TableInfo {
func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
cacheKey := tableKey{
schema: schema,
table: table,
schema: *msg.getSchema(),
table: *msg.getTable(),
}
tableInfo, ok := b.tableInfoCache[cacheKey]
if !ok {
tableInfo = model.BuildTableInfoWithPKNames4Test(schema, table, columns, pkNames)
tableInfo = newTableInfo(msg)
b.tableInfoCache[cacheKey] = tableInfo
}
return tableInfo
Expand All @@ -185,7 +185,7 @@ func (b *batchDecoder) canalJSONMessage2RowChange(
msg canalJSONMessageInterface,
) (*model.RowChangedEvent, error) {
result := new(model.RowChangedEvent)
result.TableInfo = newTableInfo(b.msg)
result.TableInfo = b.queryTableInfo(msg)
result.CommitTs = msg.getCommitTs()

mysqlType := msg.getMySQLType()
Expand Down

0 comments on commit 74cc7ea

Please sign in to comment.