diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index d3ee0ecd2ab..784a1affe00 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -198,11 +198,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] + // already-executed DDL could filted by finishedTs. if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { log.Info("schemaStorage: ignore foregone DDL", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), zap.String("DDL", job.Query), + zap.String("state", job.State.String()), zap.Int64("jobID", job.ID), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion), diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index daf8eb2c936..6a5691cfcf8 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -301,7 +301,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { return false, nil } - if job.BinlogInfo.FinishedTS <= p.getResolvedTs() { + if job.BinlogInfo.FinishedTS <= p.getResolvedTs() || + job.BinlogInfo.SchemaVersion == 0 /* means the ddl is ignored in upstream */ { log.Info("ddl job finishedTs less than puller resolvedTs,"+ "discard the ddl job", zap.String("namespace", p.changefeedID.Namespace), diff --git a/tests/integration_tests/ddl_with_exists/conf/diff_config.toml b/tests/integration_tests/ddl_with_exists/conf/diff_config.toml new file mode 100644 index 00000000000..b13ade0d46e --- /dev/null +++ b/tests/integration_tests/ddl_with_exists/conf/diff_config.toml @@ -0,0 +1,28 @@ +# diff Configuration. +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/ddl_with_exists/sync_diff/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["ddl_with_exists.*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/ddl_with_exists/run.sh b/tests/integration_tests/ddl_with_exists/run.sh new file mode 100755 index 00000000000..8115ce4ed1a --- /dev/null +++ b/tests/integration_tests/ddl_with_exists/run.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') + + # this test contains `recover table`, which requires super privilege, so we + # can't use the normal user + TOPIC_NAME="ticdc-ddl-mamager-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; + pulsar) + run_pulsar_cluster $WORK_DIR normal + SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" + ;; + *) SINK_URI="mysql://root@127.0.0.1:3306/" ;; + esac + changefeed_id="ddl-with-exists" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id} + + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;; + esac + + run_sql "CREATE DATABASE ddl_with_exists" + + cd $CUR + GO111MODULE=on go run test.go + + run_sql "CREATE TABLE ddl_with_exists.finish_mark (a int primary key);" + check_table_exists ddl_with_exists.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + # make sure all tables are equal in upstream and downstream + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180 + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/ddl_with_exists/test.go b/tests/integration_tests/ddl_with_exists/test.go new file mode 100644 index 00000000000..e7c018e49de --- /dev/null +++ b/tests/integration_tests/ddl_with_exists/test.go @@ -0,0 +1,102 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "database/sql" + "fmt" + "log" + "math/rand" + "os" + "sync" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +func main() { + upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1") + upPort := GetEnvDefault("UP_TIDB_PORT", "4000") + dsn := fmt.Sprintf("root@tcp(%s:%s)/", upHost, upPort) + db, err := sql.Open("mysql", dsn) + if err != nil { + log.Fatal("open db failed:", dsn, ", err: ", err) + } + defer db.Close() + + if err = db.Ping(); err != nil { + log.Fatal("ping db failed:", dsn, ", err: ", err) + } + log.Println("connect to tidb success, dsn: ", dsn) + + createTable := `create table if not exists ddl_with_exists.t%d ( + id int primary key auto_increment, + name varchar(255) + );` + addColumn := "alter table ddl_with_exists.t%d add column if not exists age int;" + dropColumn := "alter table ddl_with_exists.t%d drop column if exists age;" + addIndex := "alter table ddl_with_exists.t%d add index if not exists idx1(id);" + dropIndex := "alter table ddl_with_exists.t%d drop index if exists idx1;" + + concurrency := 16 + maxTableCnt := 20 + db.SetMaxOpenConns(concurrency) + + start := time.Now() + for i := 0; i < maxTableCnt; i++ { + _, err := db.Exec(fmt.Sprintf(createTable, i)) + if err != nil { + log.Fatal("create table failed:", i, ", err: ", err) + } + } + log.Println("create table cost:", time.Since(start).Seconds(), "s") + + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + log.Println("worker start:", i) + for j := 0; j < 20; j++ { + idx := rand.Intn(maxTableCnt) + ddl := fmt.Sprintf(createTable, idx) + switch rand.Intn(5) { + case 0: + ddl = fmt.Sprintf(addColumn, idx) + case 1: + ddl = fmt.Sprintf(dropColumn, idx) + case 2: + ddl = fmt.Sprintf(addIndex, idx) + case 3: + ddl = fmt.Sprintf(dropIndex, idx) + default: + } + _, err := db.Exec(ddl) + if err != nil { + log.Println(err) + } + } + log.Println("worker exit:", i) + }() + } + wg.Wait() +} + +func GetEnvDefault(key, defaultV string) string { + val, ok := os.LookupEnv(key) + if !ok { + return defaultV + } + return val +} diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 9a248f2a2a8..78f86f40935 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -38,7 +38,7 @@ groups=( # G02 "$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv" # G03 - 'row_format drop_many_tables processor_stop_delay partition_table' + 'row_format drop_many_tables processor_stop_delay partition_table ddl_with_exists' # G04 'foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop' # G05