Skip to content

Commit

Permalink
ddl (ticdc): support replicate ddl in BDR mode (#10299)
Browse files Browse the repository at this point in the history
close #10301
  • Loading branch information
asddongmen authored Jan 5, 2024
1 parent 368d20f commit f80995c
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 101 deletions.
5 changes: 4 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,9 @@ type DDLEvent struct {
Charset string `msg:"-"`
Collate string `msg:"-"`
IsBootstrap bool `msg:"-"`
// BDRRole is the role of the TiDB cluster, it is used to determine whether
// the DDL is executed by the primary cluster.
BDRRole string `msg:"-"`
}

// FromJob fills the values with DDLEvent from DDL job
Expand All @@ -710,7 +713,7 @@ func (d *DDLEvent) FromJobWithArgs(
d.TableInfo = tableInfo
d.Charset = job.Charset
d.Collate = job.Collate

d.BDRRole = job.BDRRole
switch d.Type {
// The query for "DROP TABLE" and "DROP VIEW" statements need
// to be rebuilt. The reason is elaborated as follows:
Expand Down
23 changes: 10 additions & 13 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser/ast"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -332,12 +333,15 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
return nil
}

// If changefeed is in BDRMode, skip ddl.
if m.BDRMode {
log.Info("changefeed is in BDRMode, skip a ddl event",
// In a BDR mode cluster, TiCDC can receive DDLs from all roles of TiDB.
// However, CDC only executes the DDLs from the TiDB that has BDRRolePrimary role.
if m.BDRMode && m.executingDDL.BDRRole != string(ast.BDRRolePrimary) {
log.Info("changefeed is in BDRMode and "+
"the DDL is not executed by Primary Cluster, skip it",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.Any("ddlEvent", m.executingDDL))
zap.Any("ddlEvent", m.executingDDL),
zap.String("bdrRole", m.executingDDL.BDRRole))
tableName := m.executingDDL.TableInfo.TableName
// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
Expand Down Expand Up @@ -373,6 +377,7 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
tableName := m.executingDDL.TableInfo.TableName
log.Info("execute a ddl event successfully",
zap.String("ddl", m.executingDDL.Query),
zap.String("namespace", m.executingDDL.BDRRole),
zap.Uint64("commitTs", m.executingDDL.CommitTs),
zap.Stringer("table", tableName),
)
Expand Down Expand Up @@ -526,12 +531,8 @@ func (m *ddlManager) allPhysicalTables(ctx context.Context) ([]model.TableID, er

// getSnapshotTs returns the ts that we should use
// to get the snapshot of the schema, the rules are:
// 1. If the changefeed is just started, we use the startTs,
// If the changefeed is just started, we use the startTs,
// otherwise we use the checkpointTs.
// 2. If the changefeed is in BDRMode, we use the ddlManager.ddlResolvedTs.
// Since TiCDC ignore the DDLs in BDRMode, we don't need to care about whether
// the DDLs are executed or not. We should use the ddlResolvedTs to get the up-to-date
// schema.
func (m *ddlManager) getSnapshotTs() (ts uint64) {
ts = m.checkpointTs

Expand All @@ -549,10 +550,6 @@ func (m *ddlManager) getSnapshotTs() (ts uint64) {
return
}

if m.BDRMode {
ts = m.ddlResolvedTs
}

log.Debug("snapshotTs", zap.Uint64("ts", ts))
return ts
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,18 @@ func TestGetSnapshotTs(t *testing.T) {
dm := createDDLManagerForTest(t)
dm.startTs = 0
dm.checkpointTs = 1
require.Equal(t, dm.getSnapshotTs(), dm.startTs)
require.Equal(t, dm.startTs, dm.getSnapshotTs())

dm.startTs = 1
dm.checkpointTs = 10
dm.BDRMode = true
dm.ddlResolvedTs = 15
require.Equal(t, dm.getSnapshotTs(), dm.ddlResolvedTs)
require.Equal(t, dm.checkpointTs, dm.getSnapshotTs())

dm.startTs = 1
dm.checkpointTs = 10
dm.BDRMode = false
require.Equal(t, dm.getSnapshotTs(), dm.checkpointTs)
require.Equal(t, dm.checkpointTs, dm.getSnapshotTs())
}

func TestExecRenameTablesDDL(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestAllTables(t *testing.T) {
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 102,
TableID: 104,
}, tableName)
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
Expand All @@ -127,7 +127,7 @@ func TestAllTables(t *testing.T) {
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 102,
TableID: 104,
}, tableName)
}

Expand Down
18 changes: 18 additions & 0 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/quotes"
Expand Down Expand Up @@ -89,6 +90,11 @@ func NewDDLSink(
return nil, err
}

cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
if err != nil {
return nil, err
}

m := &DDLSink{
id: changefeedID,
db: db,
Expand Down Expand Up @@ -201,6 +207,18 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
}
}

// we try to set cdc write source for the ddl
if err = pmysql.SetWriteSource(pctx, m.cfg, tx); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Error("Failed to rollback",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
}
}
return err
}

if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", ddl.Query),
Expand Down
11 changes: 11 additions & 0 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,19 @@ func TestWriteDDLEvent(t *testing.T) {
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 1").WillReturnResult(sqlmock.NewResult(1, 0))

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 0").WillReturnResult(sqlmock.NewResult(1, 0))
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 0").WillReturnResult(sqlmock.NewResult(1, 0))
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").
WillReturnError(&dmysql.MySQLError{
Number: uint16(infoschema.ErrColumnExists.Code()),
Expand Down Expand Up @@ -163,6 +170,10 @@ func TestAsyncExecAddIndex(t *testing.T) {
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").
WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down
23 changes: 1 addition & 22 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
// Set session variables first and then execute the transaction.
// we try to set write source for each txn,
// so we can use it to trace the data source
if err = s.setWriteSource(pctx, tx); err != nil {
if err = pmysql.SetWriteSource(pctx, s.cfg, tx); err != nil {
err := logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed,
Expand Down Expand Up @@ -872,24 +872,3 @@ func getSQLErrCode(err error) (errors.ErrCode, bool) {
func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) {
s.dmlMaxRetry = maxRetry
}

// setWriteSource sets write source for the transaction.
func (s *mysqlBackend) setWriteSource(ctx context.Context, txn *sql.Tx) error {
// we only set write source when donwstream is TiDB and write source is existed.
if !s.cfg.IsWriteSourceExisted {
return nil
}
// downstream is TiDB, set system variables.
// We should always try to set this variable, and ignore the error if
// downstream does not support this variable, it is by design.
query := fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source", s.cfg.SourceID)
_, err := txn.ExecContext(ctx, query)
if err != nil {
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok &&
mysqlErr.Number == mysql.ErrUnknownSystemVariable {
return nil
}
return err
}
return nil
}
27 changes: 14 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ require (
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pierrec/lz4/v4 v4.1.18
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20231204093812-96c40585233f
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20231212043317-b478056bbf73
github.com/pingcap/tidb v1.1.0-beta.0.20240105042433-54d8a1416ab0
github.com/pingcap/tidb-tools v0.0.0-20231228035519-c4bdf178b3d6
github.com/pingcap/tidb/pkg/parser v0.0.0-20231212043317-b478056bbf73
github.com/prometheus/client_golang v1.17.0
github.com/pingcap/tidb/pkg/parser v0.0.0-20231229060758-e19e06e1bc19
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -88,9 +88,9 @@ require (
github.com/swaggo/swag v1.16.2
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0
github.com/tikv/client-go/v2 v2.0.8-0.20231227070846-61c486af13a5
github.com/tikv/pd v1.1.0-beta.0.20231212061647-ab97b9a267f3
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e
github.com/tikv/pd/client v0.0.0-20240103101103-a4d2f1ca365a
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -105,21 +105,21 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.13.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.3.0
go.uber.org/mock v0.4.0
go.uber.org/multierr v1.11.0
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
golang.org/x/net v0.19.0
golang.org/x/oauth2 v0.15.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0
google.golang.org/genproto/googleapis/rpc v0.0.0-20231211222908-989df2bf70f3
google.golang.org/grpc v1.60.0
google.golang.org/protobuf v1.31.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.32.0
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/mysql v1.4.5
gorm.io/gorm v1.24.5
Expand Down Expand Up @@ -149,6 +149,7 @@ require (
github.com/go-ldap/ldap/v3 v3.4.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-resty/resty/v2 v2.7.0 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand Down Expand Up @@ -363,7 +364,7 @@ require (
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/tools v0.16.1 // indirect
Expand Down
Loading

0 comments on commit f80995c

Please sign in to comment.