Skip to content

Commit

Permalink
ddl(ticdc): ignore ddl with schemaversion 0 (#11856)
Browse files Browse the repository at this point in the history
close #11839
  • Loading branch information
CharlesCheung96 authored Dec 21, 2024
1 parent c5b8800 commit 7f57e1f
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
var snap *schema.Snapshot
if len(s.snaps) > 0 {
lastSnap := s.snaps[len(s.snaps)-1]
// already-executed DDL could filted by finishedTs.
if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() {
log.Info("schemaStorage: ignore foregone DDL",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("DDL", job.Query),
zap.String("state", job.State.String()),
zap.Int64("jobID", job.ID),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
Expand Down
3 changes: 2 additions & 1 deletion cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
return false, nil
}

if job.BinlogInfo.FinishedTS <= p.getResolvedTs() {
if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
job.BinlogInfo.SchemaVersion == 0 /* means the ddl is ignored in upstream */ {
log.Info("ddl job finishedTs less than puller resolvedTs,"+
"discard the ddl job",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down
28 changes: 28 additions & 0 deletions tests/integration_tests/ddl_with_exists/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_with_exists/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["ddl_with_exists.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
57 changes: 57 additions & 0 deletions tests/integration_tests/ddl_with_exists/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

# this test contains `recover table`, which requires super privilege, so we
# can't use the normal user
TOPIC_NAME="ticdc-ddl-mamager-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://[email protected]:3306/" ;;
esac
changefeed_id="ddl-with-exists"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}

case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

run_sql "CREATE DATABASE ddl_with_exists"

cd $CUR
GO111MODULE=on go run test.go

run_sql "CREATE TABLE ddl_with_exists.finish_mark (a int primary key);"
check_table_exists ddl_with_exists.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
# make sure all tables are equal in upstream and downstream
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
# run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
102 changes: 102 additions & 0 deletions tests/integration_tests/ddl_with_exists/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"database/sql"
"fmt"
"log"
"math/rand"
"os"
"sync"
"time"

_ "github.com/go-sql-driver/mysql"
)

func main() {
upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1")
upPort := GetEnvDefault("UP_TIDB_PORT", "4000")
dsn := fmt.Sprintf("root@tcp(%s:%s)/", upHost, upPort)
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Fatal("open db failed:", dsn, ", err: ", err)
}
defer db.Close()

if err = db.Ping(); err != nil {
log.Fatal("ping db failed:", dsn, ", err: ", err)
}
log.Println("connect to tidb success, dsn: ", dsn)

createTable := `create table if not exists ddl_with_exists.t%d (
id int primary key auto_increment,
name varchar(255)
);`
addColumn := "alter table ddl_with_exists.t%d add column if not exists age int;"
dropColumn := "alter table ddl_with_exists.t%d drop column if exists age;"
addIndex := "alter table ddl_with_exists.t%d add index if not exists idx1(id);"
dropIndex := "alter table ddl_with_exists.t%d drop index if exists idx1;"

concurrency := 16
maxTableCnt := 20
db.SetMaxOpenConns(concurrency)

start := time.Now()
for i := 0; i < maxTableCnt; i++ {
_, err := db.Exec(fmt.Sprintf(createTable, i))
if err != nil {
log.Fatal("create table failed:", i, ", err: ", err)
}
}
log.Println("create table cost:", time.Since(start).Seconds(), "s")

var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
log.Println("worker start:", i)
for j := 0; j < 20; j++ {
idx := rand.Intn(maxTableCnt)
ddl := fmt.Sprintf(createTable, idx)
switch rand.Intn(5) {
case 0:
ddl = fmt.Sprintf(addColumn, idx)
case 1:
ddl = fmt.Sprintf(dropColumn, idx)
case 2:
ddl = fmt.Sprintf(addIndex, idx)
case 3:
ddl = fmt.Sprintf(dropIndex, idx)
default:
}
_, err := db.Exec(ddl)
if err != nil {
log.Println(err)
}
}
log.Println("worker exit:", i)
}()
}
wg.Wait()
}

func GetEnvDefault(key, defaultV string) string {
val, ok := os.LookupEnv(key)
if !ok {
return defaultV
}
return val
}
2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ groups=(
# G02
"$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
# G03
'row_format drop_many_tables processor_stop_delay partition_table'
'row_format drop_many_tables processor_stop_delay partition_table ddl_with_exists'
# G04
'foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop'
# G05
Expand Down

0 comments on commit 7f57e1f

Please sign in to comment.