ddl(ticdc): ignore ddl with schemaversion 0 (#11856) #11922

Open · wants to merge 1 commit into base: release-8.1
4 changes: 2 additions & 2 deletions cdc/entry/schema_storage.go
@@ -202,13 +202,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
	var snap *schema.Snapshot
	if len(s.snaps) > 0 {
		lastSnap := s.snaps[len(s.snaps)-1]
		// We use schemaVersion to check if an already-executed DDL job is processed for a second time.
		// Unexecuted DDL jobs should have the largest schemaVersions.
		// Already-executed DDLs can be filtered out by finishedTs.
		if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
			log.Info("schemaStorage: ignore foregone DDL",
				zap.String("namespace", s.id.Namespace),
				zap.String("changefeed", s.id.ID),
				zap.String("DDL", job.Query),
				zap.String("state", job.State.String()),
				zap.Int64("jobID", job.ID),
				zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
				zap.Int64("schemaVersion", s.schemaVersion),
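For context, the guard in this hunk reduces to a single rule: a DDL job is "foregone" if its commit ts does not advance past the latest snapshot, or if its schema version does not advance the version already recorded. A minimal sketch of that rule (the function name and parameters are hypothetical; lastSnapTs and storageSchemaVersion stand in for lastSnap.CurrentTs() and s.schemaVersion):

// isForegoneDDL is a sketch of the guard above, not the actual implementation.
func isForegoneDDL(finishedTS uint64, schemaVersion int64,
	lastSnapTs uint64, storageSchemaVersion int64) bool {
	// An already-applied job commits at or before the latest snapshot,
	// or carries a schema version at or below the recorded one.
	return finishedTS <= lastSnapTs || schemaVersion <= storageSchemaVersion
}
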
3 changes: 2 additions & 1 deletion cdc/puller/ddl_puller.go
@@ -301,7 +301,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
		return false, nil
	}

	if job.BinlogInfo.FinishedTS <= p.getResolvedTs() {
	if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
		job.BinlogInfo.SchemaVersion == 0 /* means the ddl is ignored in upstream */ {
		log.Info("ddl job finishedTs less than puller resolvedTs,"+
			"discard the ddl job",
			zap.String("namespace", p.changefeedID.Namespace),
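Combined with the resolved-ts check, the discard rule after this change reads as one predicate. A sketch, under the assumption stated in the inline comment that upstream TiDB leaves SchemaVersion at 0 for DDL jobs it ignored (shouldDiscard is a hypothetical name, not a function in this PR):

// shouldDiscard mirrors the new condition in handleJob (sketch only).
func shouldDiscard(finishedTS, resolvedTs uint64, schemaVersion int64) bool {
	return finishedTS <= resolvedTs || // the puller already advanced past this job
		schemaVersion == 0 // a no-op DDL that upstream ignored
}
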
28 changes: 28 additions & 0 deletions tests/integration_tests/ddl_with_exists/conf/diff_config.toml
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_with_exists/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["ddl_with_exists.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
57 changes: 57 additions & 0 deletions tests/integration_tests/ddl_with_exists/run.sh
@@ -0,0 +1,57 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	start_tidb_cluster --workdir $WORK_DIR

	cd $WORK_DIR

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

	# this test concurrently runs idempotent DDLs (IF EXISTS / IF NOT EXISTS),
	# some of which become no-ops in the upstream
	TOPIC_NAME="ticdc-ddl-with-exists-test-$RANDOM"
	case $SINK_TYPE in
	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
	pulsar)
		run_pulsar_cluster $WORK_DIR normal
		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
		;;
	*) SINK_URI="mysql://[email protected]:3306/" ;;
	esac
	changefeed_id="ddl-with-exists"
	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}

	case $SINK_TYPE in
	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
	pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
	esac

	run_sql "CREATE DATABASE ddl_with_exists"

	cd $CUR
	GO111MODULE=on go run test.go

	run_sql "CREATE TABLE ddl_with_exists.finish_mark (a int primary key);"
	check_table_exists ddl_with_exists.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
	# make sure all tables are equal in upstream and downstream
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
	cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
102 changes: 102 additions & 0 deletions tests/integration_tests/ddl_with_exists/test.go
@@ -0,0 +1,102 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"math/rand"
	"os"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1")
	upPort := GetEnvDefault("UP_TIDB_PORT", "4000")
	dsn := fmt.Sprintf("root@tcp(%s:%s)/", upHost, upPort)
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatal("open db failed:", dsn, ", err: ", err)
	}
	defer db.Close()

	if err = db.Ping(); err != nil {
		log.Fatal("ping db failed:", dsn, ", err: ", err)
	}
	log.Println("connect to tidb success, dsn: ", dsn)

	createTable := `create table if not exists ddl_with_exists.t%d (
		id int primary key auto_increment,
		name varchar(255)
	);`
	addColumn := "alter table ddl_with_exists.t%d add column if not exists age int;"
	dropColumn := "alter table ddl_with_exists.t%d drop column if exists age;"
	addIndex := "alter table ddl_with_exists.t%d add index if not exists idx1(id);"
	dropIndex := "alter table ddl_with_exists.t%d drop index if exists idx1;"

	concurrency := 16
	maxTableCnt := 20
	db.SetMaxOpenConns(concurrency)

	start := time.Now()
	for i := 0; i < maxTableCnt; i++ {
		_, err := db.Exec(fmt.Sprintf(createTable, i))
		if err != nil {
			log.Fatal("create table failed:", i, ", err: ", err)
		}
	}
	log.Println("create table cost:", time.Since(start).Seconds(), "s")

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		// Pass the loop variable in as an argument: before Go 1.22 the
		// goroutines would otherwise all share (and race on) the same i.
		go func(i int) {
			defer wg.Done()
			log.Println("worker start:", i)
			for j := 0; j < 20; j++ {
				idx := rand.Intn(maxTableCnt)
				ddl := fmt.Sprintf(createTable, idx)
				switch rand.Intn(5) {
				case 0:
					ddl = fmt.Sprintf(addColumn, idx)
				case 1:
					ddl = fmt.Sprintf(dropColumn, idx)
				case 2:
					ddl = fmt.Sprintf(addIndex, idx)
				case 3:
					ddl = fmt.Sprintf(dropIndex, idx)
				default:
					// keep the create-table statement
				}
				_, err := db.Exec(ddl)
				if err != nil {
					log.Println(err)
				}
			}
			log.Println("worker exit:", i)
		}(i)
	}
	wg.Wait()
}

func GetEnvDefault(key, defaultV string) string {
	val, ok := os.LookupEnv(key)
	if !ok {
		return defaultV
	}
	return val
}
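The workload above deliberately races IF EXISTS / IF NOT EXISTS DDLs so that some of them become upstream no-ops, which is exactly the SchemaVersion == 0 case the puller change guards against. A minimal standalone reproduction of that scenario (a sketch, not part of the PR; assumes a local TiDB listening on 127.0.0.1:4000):

package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for _, stmt := range []string{
		"create database if not exists ddl_with_exists",
		"create table if not exists ddl_with_exists.t0 (id int primary key)",
		// The first execution changes the schema for real.
		"alter table ddl_with_exists.t0 add column if not exists age int",
		// The second execution is ignored by upstream TiDB; the DDL job it
		// produces is expected to carry BinlogInfo.SchemaVersion == 0.
		"alter table ddl_with_exists.t0 add column if not exists age int",
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(stmt, ": ", err)
		}
	}
}
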
2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
@@ -38,7 +38,7 @@ groups=(
	# G02
	"$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
	# G03
	'row_format drop_many_tables processor_stop_delay partition_table'
	'row_format drop_many_tables processor_stop_delay partition_table ddl_with_exists'
	# G04
	'foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop'
	# G05