Skip to content

Commit

Permalink
This is an automated cherry-pick of #11869
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
wlwilliamx authored and ti-chi-bot committed Dec 20, 2024
1 parent 9f0a11c commit c5111c7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 15 deletions.
3 changes: 3 additions & 0 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error {
return nil
}

<<<<<<< HEAD

Check failure on line 112 in cdc/entry/schema/snapshot.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 112 in cdc/entry/schema/snapshot.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body
// GetSchemaVersion returns the schema version of the meta.
func GetSchemaVersion(meta *timeta.Meta) (int64, error) {
// After we get the schema version at startTs, if the diff corresponding to that version does not exist,
Expand All @@ -127,6 +128,8 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) {
return version, nil
}

=======

Check failure on line 131 in cdc/entry/schema/snapshot.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 131 in cdc/entry/schema/snapshot.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body
>>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869))

Check failure on line 132 in cdc/entry/schema/snapshot.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 132 in cdc/entry/schema/snapshot.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
// NewSnapshotFromMeta creates a schema snapshot from meta.
func NewSnapshotFromMeta(
id model.ChangeFeedID,
Expand Down
20 changes: 5 additions & 15 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ type schemaStorage struct {
snaps []*schema.Snapshot
snapsMu sync.RWMutex

gcTs uint64
resolvedTs uint64
schemaVersion int64
gcTs uint64
resolvedTs uint64

filter filter.Filter

Expand All @@ -91,9 +90,8 @@ func NewSchemaStorage(
role util.Role, filter filter.Filter,
) (SchemaStorage, error) {
var (
snap *schema.Snapshot
version int64
err error
snap *schema.Snapshot
err error
)
// storage may be nil in some unit test cases.
if storage == nil {
Expand All @@ -104,7 +102,6 @@ func NewSchemaStorage(
if err != nil {
return nil, errors.Trace(err)
}
version, err = schema.GetSchemaVersion(meta)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -115,7 +112,6 @@ func NewSchemaStorage(
forceReplicate: forceReplicate,
filter: filter,
id: id,
schemaVersion: version,
role: role,
}, nil
}
Expand Down Expand Up @@ -193,7 +189,6 @@ func (s *schemaStorage) GetLastSnapshot() *schema.Snapshot {
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
if s.skipJob(job) {
s.schemaVersion = job.BinlogInfo.SchemaVersion
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
return nil
}
Expand All @@ -202,16 +197,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
var snap *schema.Snapshot
if len(s.snaps) > 0 {
lastSnap := s.snaps[len(s.snaps)-1]
// We use schemaVersion to check if an already-executed DDL job is processed for a second time.
// Unexecuted DDL jobs should have largest schemaVersions.
if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() {
log.Info("schemaStorage: ignore foregone DDL",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("DDL", job.Query),
zap.Int64("jobID", job.ID),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("schemaVersion", s.schemaVersion),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
zap.String("role", s.role.String()))
return nil
Expand All @@ -233,7 +225,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
return errors.Trace(err)
}
s.snaps = append(s.snaps, snap)
s.schemaVersion = job.BinlogInfo.SchemaVersion
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
log.Info("schemaStorage: update snapshot by the DDL job",
zap.String("namespace", s.id.Namespace),
Expand All @@ -242,7 +233,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.Uint64("schemaVersion", uint64(s.schemaVersion)),
zap.String("role", s.role.String()))
return nil
}
Expand Down

0 comments on commit c5111c7

Please sign in to comment.