Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests(ticdc): fix unstable cdc daily tests #11892

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions tests/integration_tests/cdc/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,5 @@ func main() {
}
}()

// TODO(dongmen): remove the useless code
sourceDBs, err := util.CreateSourceDBs()
if err != nil {
log.S().Fatal(err)
}
defer func() {
if err := util.CloseDBs(sourceDBs); err != nil {
log.S().Errorf("Failed to close source databases: %s\n", err)
}
}()

dailytest.Run(sourceDB, targetDB, cfg.SourceDBCfg[0].Name, cfg.WorkerCount, cfg.JobCount, cfg.Batch)
}
5 changes: 4 additions & 1 deletion tests/integration_tests/cdc/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) {
"insert into eligible_table (uk, ncol) values (2,2);",
"ALTER TABLE eligible_table ADD COLUMN c1 INT NOT NULL;",
"insert into eligible_table (uk, ncol, c1) values (3,4,5);",

"CREATE TABLE finish_mark(id int primary key);",
}
// execute SQL but don't check
for _, sql := range sqls {
Expand All @@ -216,7 +218,7 @@ TestLoop:
if synced {
break TestLoop
}
if tableName == "eligible_table" {
if tableName == "finish_mark" {
synced = true
}
}
Expand All @@ -227,6 +229,7 @@ TestLoop:
"DROP TABLE ineligible_table1;",
"DROP TABLE ineligible_table2;",
"DROP TABLE eligible_table;",
"DROP TABLE finish_mark;",
}
tr.execSQLs(sqls)
}
Expand Down
24 changes: 11 additions & 13 deletions tests/integration_tests/default_value/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,21 @@ func main() {
log.S().Infof("DefaultValue integration tests take %v", time.Since(start))
}()
wg.Add(2)
go testMultiDDLs([]*sql.DB{sourceDB0, sourceDB1}, &wg)
go testGetDefaultValue([]*sql.DB{sourceDB0, sourceDB1}, &wg)

go func() {
defer wg.Done()
testMultiDDLs([]*sql.DB{sourceDB0, sourceDB1})
}()
go func() {
defer wg.Done()
testGetDefaultValue([]*sql.DB{sourceDB0, sourceDB1})
}()
wg.Wait()
util.MustExec(sourceDB0, "create table mark.finish_mark(a int primary key);")
}

// for every DDL, run the DDL continuously, and one goroutine for one TiDB instance to do some DML op
func testGetDefaultValue(srcs []*sql.DB, wg *sync.WaitGroup) {
defer wg.Done()
func testGetDefaultValue(srcs []*sql.DB) {
start := time.Now()
defer func() {
log.S().Infof("testGetDefaultValue take %v", time.Since(start))
Expand Down Expand Up @@ -342,9 +348,7 @@ func ddlDefaultValueFunc(ctx context.Context, db *sql.DB, format string, table s
}
}

func testMultiDDLs(srcs []*sql.DB, wg *sync.WaitGroup) {
defer wg.Done()

func testMultiDDLs(srcs []*sql.DB) {
type Unit struct {
Fmt string
DefaultValue interface{}
Expand Down Expand Up @@ -886,9 +890,3 @@ func mustCreateTable(db *sql.DB, tableName string) {
sql := fmt.Sprintf(createTableSQL, tableName)
util.MustExec(db, sql)
}

func mustCreateTableWithConn(ctx context.Context, conn *sql.Conn, tableName string) {
util.MustExecWithConn(ctx, conn, createDatabaseSQL)
sql := fmt.Sprintf(createTableSQL, tableName)
util.MustExecWithConn(ctx, conn, sql)
}
52 changes: 0 additions & 52 deletions tests/integration_tests/util/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,55 +156,3 @@ func MustExecWithConn(ctx context.Context, conn *sql.Conn, sql string, args ...i
log.S().Fatal(err)
}
}

// CreateSourceDBs return source sql.DB for test
// we create two TiDB instance now in tests/integration_tests/run.sh, change it if needed
func CreateSourceDBs() (dbs []*sql.DB, err error) {
cfg := DBConfig{
Host: "127.0.0.1",
User: "root",
Password: "",
Name: "test",
Port: 4000,
}

src1, err := CreateDB(cfg)
if err != nil {
return nil, errors.Trace(err)
}

cfg.Port = 4001
src2, err := CreateDB(cfg)
if err != nil {
return nil, errors.Trace(err)
}

dbs = append(dbs, src1, src2)
return
}

// CreateSourceDB return source sql.DB for test
func CreateSourceDB() (db *sql.DB, err error) {
cfg := DBConfig{
Host: "127.0.0.1",
User: "root",
Password: "",
Name: "test",
Port: 4000,
}

return CreateDB(cfg)
}

// CreateSinkDB return sink sql.DB for test
func CreateSinkDB() (db *sql.DB, err error) {
cfg := DBConfig{
Host: "127.0.0.1",
User: "root",
Password: "",
Name: "test",
Port: 3306,
}

return CreateDB(cfg)
}
Loading