From db46721715d4d0867cb6124b2a233e6717a75537 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Sat, 21 Dec 2024 12:57:27 +0800 Subject: [PATCH] This is an automated cherry-pick of #11856 Signed-off-by: ti-chi-bot --- cdc/entry/schema_storage.go | 6 ++ cdc/puller/ddl_puller.go | 18 ++++ .../ddl_with_exists/conf/diff_config.toml | 28 +++++ .../integration_tests/ddl_with_exists/run.sh | 57 ++++++++++ .../integration_tests/ddl_with_exists/test.go | 102 ++++++++++++++++++ tests/integration_tests/run_group.sh | 39 +++++++ 6 files changed, 250 insertions(+) create mode 100644 tests/integration_tests/ddl_with_exists/conf/diff_config.toml create mode 100755 tests/integration_tests/ddl_with_exists/run.sh create mode 100644 tests/integration_tests/ddl_with_exists/test.go diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 86dde5f1141..8db4b393906 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -202,13 +202,19 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] +<<<<<<< HEAD // We use schemaVersion to check if an already-executed DDL job is processed for a second time. // Unexecuted DDL jobs should have largest schemaVersions. if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion { +======= + // already-executed DDL could be filtered by finishedTs.
+ if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { +>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856)) log.Info("schemaStorage: ignore foregone DDL", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), zap.String("DDL", job.Query), + zap.String("state", job.State.String()), zap.Int64("jobID", job.ID), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), zap.Int64("schemaVersion", s.schemaVersion), diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 4134ed53466..06089cb1c67 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -374,6 +374,24 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { return false, nil } +<<<<<<< HEAD +======= + if job.BinlogInfo.FinishedTS <= p.getResolvedTs() || + job.BinlogInfo.SchemaVersion == 0 /* means the ddl is ignored in upstream */ { + log.Info("ddl job finishedTs less than puller resolvedTs,"+ + "discard the ddl job", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.String("query", job.Query), + zap.Uint64("pullerResolvedTs", p.getResolvedTs())) + return true, nil + } + +>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856)) defer func() { if skip && err == nil { log.Info("ddl job schema or table does not match, discard it", diff --git a/tests/integration_tests/ddl_with_exists/conf/diff_config.toml b/tests/integration_tests/ddl_with_exists/conf/diff_config.toml new file mode 100644 index 00000000000..b13ade0d46e --- /dev/null +++ b/tests/integration_tests/ddl_with_exists/conf/diff_config.toml @@ -0,0 +1,28 @@ +# diff Configuration. 
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+output-dir = "/tmp/tidb_cdc_test/ddl_with_exists/sync_diff/output"
+
+source-instances = ["mysql1"]
+
+target-instance = "tidb0"
+
+target-check-tables = ["ddl_with_exists.*"]
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 4000
+user = "root"
+password = ""
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = ""
diff --git a/tests/integration_tests/ddl_with_exists/run.sh b/tests/integration_tests/ddl_with_exists/run.sh
new file mode 100755
index 00000000000..b7810ac2e8f
--- /dev/null
+++ b/tests/integration_tests/ddl_with_exists/run.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+function run() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
+
+	# this test contains `recover table`, which requires super privilege, so we
+	# can't use the normal user
+	TOPIC_NAME="ticdc-ddl-mamager-test-$RANDOM"
+	case $SINK_TYPE in
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
+	pulsar)
+		run_pulsar_cluster $WORK_DIR normal
+		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
+		;;
+	*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
+	esac
+	changefeed_id="ddl-with-exists"
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}
+
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
+	esac
+
+	run_sql "CREATE DATABASE ddl_with_exists"
+
+	cd $CUR
+	GO111MODULE=on go run test.go
+
+	run_sql "CREATE TABLE ddl_with_exists.finish_mark (a int primary key);"
+	check_table_exists ddl_with_exists.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	# make sure all tables are equal in upstream and downstream
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $* # execute the scenario; without this call the case would pass without testing anything
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/ddl_with_exists/test.go b/tests/integration_tests/ddl_with_exists/test.go
new file mode 100644
index 00000000000..e7c018e49de
--- /dev/null
+++ b/tests/integration_tests/ddl_with_exists/test.go
@@ -0,0 +1,102 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"database/sql"
+	"fmt"
+	"log"
+	"math/rand"
+	"os"
+	"sync"
+	"time"
+
+	_ "github.com/go-sql-driver/mysql"
+)
+
+// main creates maxTableCnt tables in ddl_with_exists, then runs concurrent workers issuing random `if [not] exists` DDLs.
+func main() {
+	upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1")
+	upPort := GetEnvDefault("UP_TIDB_PORT", "4000")
+	dsn := fmt.Sprintf("root@tcp(%s:%s)/", upHost, upPort)
+	db, err := sql.Open("mysql", dsn)
+	if err != nil {
+		log.Fatal("open db failed:", dsn, ", err: ", err)
+	}
+	defer db.Close()
+
+	if err = db.Ping(); err != nil {
+		log.Fatal("ping db failed:", dsn, ", err: ", err)
+	}
+	log.Println("connect to tidb success, dsn: ", dsn)
+
+	createTable := `create table if not exists ddl_with_exists.t%d (
+		id int primary key auto_increment,
+		name varchar(255)
+	);`
+	addColumn := "alter table ddl_with_exists.t%d add column if not exists age int;"
+	dropColumn := "alter table ddl_with_exists.t%d drop column if exists age;"
+	addIndex := "alter table ddl_with_exists.t%d add index if not exists idx1(id);"
+	dropIndex := "alter table ddl_with_exists.t%d drop index if exists idx1;"
+
+	concurrency := 16
+	maxTableCnt := 20
+	db.SetMaxOpenConns(concurrency)
+
+	start := time.Now()
+	for i := 0; i < maxTableCnt; i++ {
+		if _, err := db.Exec(fmt.Sprintf(createTable, i)); err != nil {
+			log.Fatal("create table failed:", i, ", err: ", err)
+		}
+	}
+	log.Println("create table cost:", time.Since(start).Seconds(), "s")
+
+	var wg sync.WaitGroup
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		// Pass i by value: before Go 1.22 all closures share one loop variable (racy, wrong ids in the logs).
+		go func(i int) {
+			defer wg.Done()
+			log.Println("worker start:", i)
+			for j := 0; j < 20; j++ {
+				idx := rand.Intn(maxTableCnt)
+				ddl := fmt.Sprintf(createTable, idx)
+				switch rand.Intn(5) {
+				case 0:
+					ddl = fmt.Sprintf(addColumn, idx)
+				case 1:
+					ddl = fmt.Sprintf(dropColumn, idx)
+				case 2:
+					ddl = fmt.Sprintf(addIndex, idx)
+				case 3:
+					ddl = fmt.Sprintf(dropIndex, idx)
+				default:
+				}
+				if _, err := db.Exec(ddl); err != nil {
+					log.Println(err)
+				}
+			}
+			log.Println("worker exit:", i)
+		}(i)
+	}
+	wg.Wait()
+}
+
+// GetEnvDefault returns the value of the environment variable key, or defaultV when the variable is not set.
+func GetEnvDefault(key, defaultV string) string {
+	if val, ok := os.LookupEnv(key); ok {
+		return val
+	}
+	return defaultV
+}
diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh
index d7fb39a03e1..dda90fa22d2 100755
--- a/tests/integration_tests/run_group.sh
+++ b/tests/integration_tests/run_group.sh
@@ -31,6 +31,7 @@ storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_t
 declare -A groups
 groups=(
 	# Note: only the tests in the first three groups are running in storage sink pipeline.
+<<<<<<< HEAD
 	["G00"]="$mysql_only $kafka_only $storage_only"
 	["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
 	["G02"]="$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
@@ -50,6 +51,44 @@ groups=(
 	# currently G16 is not running in kafka pipeline
 	["G16"]='owner_resign processor_etcd_worker_delay sink_hang'
 	["G17"]='clustered_index processor_resolved_ts_fallback'
+=======
+	# G00
+	"$mysql_only $kafka_only $storage_only"
+	# G01
+	"$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
+	# G02
+	"$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
+	# G03
+	'row_format drop_many_tables processor_stop_delay partition_table ddl_with_exists'
+	# G04
+	'foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop'
+	# G05
+	'charset_gbk ddl_manager multi_source vector'
+	# G06
+	'sink_retry changefeed_error ddl_sequence resourcecontrol'
+	# G07 pulsar oauth2 authentication enabled
+	'kv_client_stream_reconnect cdc split_region'
+	# G08
+	'processor_err_chan changefeed_reconstruct multi_capture synced_status_with_redo'
+	# G09
+	'gc_safepoint changefeed_pause_resume cli_with_auth savepoint synced_status'
+	# G10
+	'default_value simple cdc_server_tips event_filter sql_mode'
+	# G11
+	'resolve_lock move_table autorandom generate_column'
+	# G12
+	'many_pk_or_uk capture_session_done_during_task ddl_attributes'
+	# G13 pulsar mtls authentication enabled
+	'tiflash region_merge common_1'
+	# G14
+	'changefeed_finish force_replicate_table'
+	# G15
+	'new_ci_collation batch_add_table multi_rocks ci_collation_compatibility'
+	# G16, currently G16 is not running in kafka pipeline
+	'owner_resign processor_etcd_worker_delay sink_hang'
+	# G17
+	'clustered_index processor_resolved_ts_fallback'
+>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856))
 	# only run the following tests in mysql pipeline
 	["G18"]='availability http_proxies sequence'
 	["G19"]='changefeed_fast_fail batch_update_to_no_batch changefeed_resume_with_checkpoint_ts'