diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 1757c26725f..08fd9f4b27f 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -103,24 +103,6 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error { return nil } -// GetSchemaVersion returns the schema version of the meta. -func GetSchemaVersion(meta *timeta.Meta) (int64, error) { - // After we get the schema version at startTs, if the diff corresponding to that version does not exist, - // it means that the job is not committed yet, so we should subtract one from the version, i.e., version--. - version, err := meta.GetSchemaVersion() - if err != nil { - return 0, errors.Trace(err) - } - diff, err := meta.GetSchemaDiff(version) - if err != nil { - return 0, errors.Trace(err) - } - if diff == nil { - version-- - } - return version, nil -} - // NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta func NewSingleSnapshotFromMeta( meta *timeta.Meta, diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index f0433939b02..844eaeb338e 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -54,11 +54,10 @@ type SchemaStorage interface { } type schemaStorageImpl struct { - snaps []*schema.Snapshot - snapsMu sync.RWMutex - gcTs uint64 - resolvedTs uint64 - schemaVersion int64 + snaps []*schema.Snapshot + snapsMu sync.RWMutex + gcTs uint64 + resolvedTs uint64 forceReplicate bool @@ -73,9 +72,8 @@ func NewSchemaStorage( role util.Role, filter filter.Filter, ) (SchemaStorage, error) { var ( - snap *schema.Snapshot - err error - version int64 + snap *schema.Snapshot + err error ) if meta == nil { snap = schema.NewEmptySnapshot(forceReplicate) @@ -84,7 +82,6 @@ func NewSchemaStorage( if err != nil { return nil, errors.Trace(err) } - version, err = schema.GetSchemaVersion(meta) if err != nil { return nil, errors.Trace(err) } @@ -98,7 +95,6 @@ func NewSchemaStorage( resolvedTs: startTs, forceReplicate: forceReplicate, id: id, - schemaVersion: version, role: role, } return schema, nil @@ -177,7 +173,6 @@ func (s *schemaStorageImpl) GetLastSnapshot() *schema.Snapshot { // HandleDDLJob creates a new snapshot in storage and handles the ddl job func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { if s.skipJob(job) { - s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) return nil } @@ -186,16 +181,13 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] - // We use schemaVersion to check if an already-executed DDL job is processed for a second time. - // Unexecuted DDL jobs should have largest schemaVersions. - if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion { - log.Info("ignore foregone DDL", + if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { + log.Info("schemaStorage: ignore foregone DDL", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), zap.String("DDL", job.Query), zap.Int64("jobID", job.ID), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), - zap.Int64("schemaVersion", s.schemaVersion), zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion), zap.String("role", s.role.String())) return nil @@ -223,7 +215,6 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { zap.String("role", s.role.String())) s.snaps = append(s.snaps, snap) - s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) return nil }