Skip to content

Commit

Permalink
Merge branch 'release-8.5' into purelind/test-ci-on-release-8.5
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 23, 2024
2 parents dd37e8e + 201a10d commit d1fbf28
Show file tree
Hide file tree
Showing 35 changed files with 888 additions and 193 deletions.
2 changes: 1 addition & 1 deletion cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {

// owner apis
ownerGroup := v2.Group("/owner")
unsafeGroup.Use(ownerMiddleware)
ownerGroup.Use(ownerMiddleware)
ownerGroup.POST("/resign", api.resignOwner)

// common APIs
Expand Down
80 changes: 41 additions & 39 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,20 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
if bytes.HasPrefix(key, recordPrefix) {
rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo)
recordID, err := tablecodec.DecodeRowKey(raw.Key)
if err != nil {
return nil, errors.Trace(err)
}
baseInfo.RecordID = recordID

rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, recordID, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,28 +237,21 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra

func (m *mounter) unmarshalRowKVEntry(
tableInfo *model.TableInfo,
rawKey []byte,
rawValue []byte,
rawOldValue []byte,
base baseKVEntry,
) (*rowKVEntry, error) {
recordID, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return nil, errors.Trace(err)
}
base.RecordID = recordID

var (
row, preRow map[int64]types.Datum
rowExist, preRowExist bool
)

row, rowExist, err = m.decodeRow(rawValue, recordID, tableInfo, false)
row, rowExist, err := m.decodeRow(rawValue, base.RecordID, tableInfo, false)
if err != nil {
return nil, errors.Trace(err)
}

preRow, preRowExist, err = m.decodeRow(rawOldValue, recordID, tableInfo, true)
preRow, preRowExist, err = m.decodeRow(rawOldValue, base.RecordID, tableInfo, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -490,33 +489,34 @@ func (m *mounter) verifyColumnChecksum(

checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
log.Error("failed to calculate the checksum",
zap.Uint32("first", first), zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns), zap.Error(err))
return 0, false, err
}

// the first checksum matched, it hits in the most case.
if checksum == first {
log.Debug("checksum matched", zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, true, nil
}

extra, ok := decoder.GetExtraChecksum()
if ok && checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL execution phase",
zap.Uint32("checksum", checksum), zap.Uint32("extra", extra))
return checksum, true, nil
}

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
m.lastSkipOldValueTime = time.Now()
}
return checksum, true, nil
Expand Down Expand Up @@ -602,7 +602,7 @@ func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) {

func verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
handle kv.Handle, key kv.Key, tz *time.Location,
) (uint32, bool, error) {
expected, ok := decoder.GetChecksum()
if !ok {
Expand All @@ -621,12 +621,14 @@ func verifyRawBytesChecksum(
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
log.Error("build datum for raw checksum calculation failed",
zap.Any("col", col), zap.Any("columnInfo", columnInfo), zap.Error(err))
return 0, false, errors.Trace(err)
}
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, handle, nil)
if err != nil {
return 0, false, errors.Trace(err)
}
Expand All @@ -635,7 +637,10 @@ func verifyRawBytesChecksum(
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))
zap.Int("version", decoder.ChecksumVersion()),
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained),
zap.Any("tableInfo", tableInfo), zap.Any("columns", columns),
zap.Any("handle", handle.String()), zap.Any("tz", tz))

return expected, false, nil
}
Expand All @@ -645,7 +650,7 @@ func verifyRawBytesChecksum(
func (m *mounter) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
handle kv.Handle, key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
Expand All @@ -665,17 +670,22 @@ func (m *mounter) verifyChecksum(
// Update / Delete event correctly, after Add Column / Drop column DDL,
// since the table schema does not contain complete column information.
return m.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
case 1, 2:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, handle, key, m.tz)
if err != nil {
log.Error("calculate raw checksum failed",
zap.Int("version", version), zap.Any("tz", m.tz), zap.Any("handle", handle.String()),
zap.Any("key", key), zap.Any("columns", columns), zap.Error(err))
return 0, false, errors.Trace(err)
}
if !matched {
return expected, matched, err
}
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
log.Error("failed to calculate column-level checksum, after raw checksum verification passed",
zap.Any("columnsInfo", columnInfos), zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz), zap.Error(err))
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
Expand All @@ -685,7 +695,7 @@ func (m *mounter) verifyChecksum(
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
tableInfo *model.TableInfo, row *rowKVEntry, handle kv.Handle, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
Expand Down Expand Up @@ -719,19 +729,15 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, handle, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
return nil, rawRow, errors.Trace(err)
}

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
zap.Uint32("checksum", preChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("preCols", preCols), zap.Any("rawCols", preRawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand All @@ -751,18 +757,14 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, handle, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
zap.Uint32("checksum", currentChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("cols", cols), zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand Down
10 changes: 6 additions & 4 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package entry
import (
"context"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/ddl"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/retry"
Expand Down Expand Up @@ -273,8 +273,7 @@ func (s *schemaStorage) AllPhysicalTables(ctx context.Context, ts model.Ts) ([]m
log.Debug("get new schema snapshot",
zap.Uint64("ts", ts),
zap.Uint64("snapTs", snap.CurrentTs()),
zap.Any("tables", res),
zap.String("snapshot", snap.DumpToString()))
zap.Any("tables", res))

return res, nil
}
Expand Down Expand Up @@ -410,7 +409,10 @@ func (s *schemaStorage) BuildDDLEvents(
}
case timodel.ActionCreateTables:
if job.BinlogInfo != nil && job.BinlogInfo.MultipleTableInfos != nil {
querys := strings.Split(job.Query, ";")
querys, err := ddl.SplitQueries(job.Query)
if err != nil {
return nil, errors.Trace(err)
}
multiTableInfos := job.BinlogInfo.MultipleTableInfos
for index, tableInfo := range multiTableInfos {
newTableInfo := model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, tableInfo)
Expand Down
14 changes: 12 additions & 2 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,23 @@ func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
// DML2Event execute the dml and return the corresponding row changed event.
// caution: it does not support `delete` since the key value cannot be found
// after the query executed.
func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent {
func (s *SchemaTestHelper) DML2Event(dml string, schema, table string, partitionID ...string) *model.RowChangedEvent {
s.tk.MustExec(dml)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table)
require.True(s.t, ok)

key, value := s.getLastKeyValue(tableInfo.ID)
tableID := tableInfo.ID

var partitionTableID int64 = -1
if len(partitionID) == 1 {
partitionTableID = tableInfo.TableInfo.GetPartitionInfo().GetPartitionIDByName(partitionID[0])
}
if partitionTableID != -1 {
tableID = partitionTableID
}

key, value := s.getLastKeyValue(tableID)
ts := s.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Expand Down
7 changes: 0 additions & 7 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,13 +714,6 @@ type NameBasedColumnIDAllocator struct {
nameToIDMap map[string]int64
}

// NewNameBasedColumnIDAllocator creates a new NameBasedColumnIDAllocator
func NewNameBasedColumnIDAllocator(nameToIDMap map[string]int64) *NameBasedColumnIDAllocator {
return &NameBasedColumnIDAllocator{
nameToIDMap: nameToIDMap,
}
}

// GetColumnID return the column id of the name
func (n *NameBasedColumnIDAllocator) GetColumnID(name string) int64 {
colID, ok := n.nameToIDMap[name]
Expand Down
12 changes: 12 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ func (m *ddlManager) tick(
}

for _, event := range events {
snap := m.schema.GetLastSnapshot()
if event.Type == timodel.ActionCreateTable ||
event.Type == timodel.ActionCreateTables {
if snap.IsIneligibleTableID(event.TableInfo.ID) {
log.Info("table is ineligible, skip the ddl",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.String("query", job.Query),
zap.Any("table", event.TableInfo))
continue
}
}
tableName := event.TableInfo.TableName
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
}
Expand Down
Loading

0 comments on commit d1fbf28

Please sign in to comment.