Commit

This is an automated cherry-pick of #11856
Signed-off-by: ti-chi-bot <[email protected]>
CharlesCheung96 authored and ti-chi-bot committed Dec 21, 2024
1 parent 9f0a11c commit db46721
Showing 6 changed files with 250 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cdc/entry/schema_storage.go
@@ -202,13 +202,19 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
	var snap *schema.Snapshot
	if len(s.snaps) > 0 {
		lastSnap := s.snaps[len(s.snaps)-1]
<<<<<<< HEAD

Check failure on line 205 in cdc/entry/schema_storage.go (GitHub Actions: Mac OS Build, Arm Build (ARM64)): syntax error: unexpected <<, expected }
		// We use schemaVersion to check if an already-executed DDL job is processed for a second time.
		// Unexecuted DDL jobs should have the largest schemaVersions.
		if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
=======

Check failure on line 209 in cdc/entry/schema_storage.go (GitHub Actions: Mac OS Build, Arm Build (ARM64)): syntax error: unexpected ==, expected }
		// already-executed DDLs can be filtered out by finishedTs.
		if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() {
>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856))

Check failure on line 212 in cdc/entry/schema_storage.go (GitHub Actions: Mac OS Build, Arm Build (ARM64)): syntax error: unexpected >>, expected }
Check failure on line 212 in cdc/entry/schema_storage.go (GitHub Actions: Mac OS Build, Arm Build (ARM64)): invalid character U+0023 '#'
log.Info("schemaStorage: ignore foregone DDL",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("DDL", job.Query),
zap.String("state", job.State.String()),
zap.Int64("jobID", job.ID),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Int64("schemaVersion", s.schemaVersion),
…
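
The conflict above pits HEAD's two-part guard (finishedTs plus schemaVersion) against the incoming branch's finishedTs-only check. As a standalone illustration, a minimal sketch of the HEAD-side predicate; the type below is a simplified stand-in for the fields of timodel.Job.BinlogInfo, not the real definition:

package main

import "fmt"

// binlogInfo is a simplified stand-in for the fields of timodel.Job's
// BinlogInfo that the guard inspects; illustrative only.
type binlogInfo struct {
	FinishedTS    uint64
	SchemaVersion int64
}

// isForegoneDDL sketches the HEAD-side guard: a job is stale if it finished
// at or before the latest snapshot, or if its schema version does not advance
// the storage's current schema version.
func isForegoneDDL(info binlogInfo, lastSnapTs uint64, storageSchemaVersion int64) bool {
	return info.FinishedTS <= lastSnapTs || info.SchemaVersion <= storageSchemaVersion
}

func main() {
	// Per #11856, a DDL that upstream TiDB ignored is reported with
	// SchemaVersion == 0, so it is treated as foregone here as well.
	fmt.Println(isForegoneDDL(binlogInfo{FinishedTS: 100, SchemaVersion: 0}, 50, 7)) // true
	fmt.Println(isForegoneDDL(binlogInfo{FinishedTS: 200, SchemaVersion: 8}, 50, 7)) // false
}
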
18 changes: 18 additions & 0 deletions cdc/puller/ddl_puller.go
@@ -374,6 +374,24 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
		return false, nil
	}

<<<<<<< HEAD
=======
	if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
		job.BinlogInfo.SchemaVersion == 0 /* means the ddl is ignored in upstream */ {
		log.Info("ddl job finishedTs less than puller resolvedTs, "+
			"discard the ddl job",
			zap.String("namespace", p.changefeedID.Namespace),
			zap.String("changefeed", p.changefeedID.ID),
			zap.String("schema", job.SchemaName),
			zap.String("table", job.TableName),
			zap.Uint64("startTs", job.StartTS),
			zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
			zap.String("query", job.Query),
			zap.Uint64("pullerResolvedTs", p.getResolvedTs()))
		return true, nil
	}

>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856))
	defer func() {
		if skip && err == nil {
			log.Info("ddl job schema or table does not match, discard it",
…
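
Read in isolation, the incoming branch's new early return is a small predicate. A hedged sketch, again using a simplified stand-in type rather than the real timodel.Job:

package main

import "fmt"

// ddlJob is a hypothetical stand-in for the two timodel.Job fields the
// incoming branch inspects.
type ddlJob struct {
	FinishedTS    uint64
	SchemaVersion int64
}

// shouldDiscard mirrors the incoming branch's logic: drop a job the puller
// has already passed (FinishedTS <= resolvedTs) or one upstream TiDB ignored
// entirely, which it signals with SchemaVersion == 0.
func shouldDiscard(job ddlJob, resolvedTs uint64) bool {
	return job.FinishedTS <= resolvedTs || job.SchemaVersion == 0
}

func main() {
	fmt.Println(shouldDiscard(ddlJob{FinishedTS: 200, SchemaVersion: 0}, 100))  // true: ignored upstream
	fmt.Println(shouldDiscard(ddlJob{FinishedTS: 200, SchemaVersion: 42}, 100)) // false: real DDL past resolvedTs
}
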
28 changes: 28 additions & 0 deletions tests/integration_tests/ddl_with_exists/conf/diff_config.toml
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_with_exists/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["ddl_with_exists.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
57 changes: 57 additions & 0 deletions tests/integration_tests/ddl_with_exists/run.sh
@@ -0,0 +1,57 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	start_tidb_cluster --workdir $WORK_DIR

	cd $WORK_DIR

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

	TOPIC_NAME="ticdc-ddl-with-exists-test-$RANDOM"
	case $SINK_TYPE in
	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
	pulsar)
		run_pulsar_cluster $WORK_DIR normal
		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
		;;
	*) SINK_URI="mysql://[email protected]:3306/" ;;
	esac
	changefeed_id="ddl-with-exists"
	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}

	case $SINK_TYPE in
	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
	pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
	esac

	run_sql "CREATE DATABASE ddl_with_exists"

	cd $CUR
	GO111MODULE=on go run test.go

	run_sql "CREATE TABLE ddl_with_exists.finish_mark (a int primary key);"
	check_table_exists ddl_with_exists.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
	# make sure all tables are equal in upstream and downstream
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
	cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
102 changes: 102 additions & 0 deletions tests/integration_tests/ddl_with_exists/test.go
@@ -0,0 +1,102 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"math/rand"
	"os"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1")
	upPort := GetEnvDefault("UP_TIDB_PORT", "4000")
	dsn := fmt.Sprintf("root@tcp(%s:%s)/", upHost, upPort)
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatal("open db failed:", dsn, ", err: ", err)
	}
	defer db.Close()

	if err = db.Ping(); err != nil {
		log.Fatal("ping db failed:", dsn, ", err: ", err)
	}
	log.Println("connect to tidb success, dsn: ", dsn)

	createTable := `create table if not exists ddl_with_exists.t%d (
		id int primary key auto_increment,
		name varchar(255)
	);`
	addColumn := "alter table ddl_with_exists.t%d add column if not exists age int;"
	dropColumn := "alter table ddl_with_exists.t%d drop column if exists age;"
	addIndex := "alter table ddl_with_exists.t%d add index if not exists idx1(id);"
	dropIndex := "alter table ddl_with_exists.t%d drop index if exists idx1;"

	concurrency := 16
	maxTableCnt := 20
	db.SetMaxOpenConns(concurrency)

	start := time.Now()
	for i := 0; i < maxTableCnt; i++ {
		_, err := db.Exec(fmt.Sprintf(createTable, i))
		if err != nil {
			log.Fatal("create table failed:", i, ", err: ", err)
		}
	}
	log.Println("create table cost:", time.Since(start).Seconds(), "s")

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		// Pass i explicitly so every goroutine logs its own worker id;
		// before Go 1.22 the loop variable would be shared otherwise.
		go func(i int) {
			defer wg.Done()
			log.Println("worker start:", i)
			for j := 0; j < 20; j++ {
				idx := rand.Intn(maxTableCnt)
				ddl := fmt.Sprintf(createTable, idx)
				switch rand.Intn(5) {
				case 0:
					ddl = fmt.Sprintf(addColumn, idx)
				case 1:
					ddl = fmt.Sprintf(dropColumn, idx)
				case 2:
					ddl = fmt.Sprintf(addIndex, idx)
				case 3:
					ddl = fmt.Sprintf(dropIndex, idx)
				default:
					// case 4: keep the CREATE TABLE IF NOT EXISTS statement.
				}
				_, err := db.Exec(ddl)
				if err != nil {
					log.Println(err)
				}
			}
			log.Println("worker exit:", i)
		}(i)
	}
	wg.Wait()
}

func GetEnvDefault(key, defaultV string) string {
	val, ok := os.LookupEnv(key)
	if !ok {
		return defaultV
	}
	return val
}
39 changes: 39 additions & 0 deletions tests/integration_tests/run_group.sh
@@ -31,6 +31,7 @@ storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_t
declare -A groups
groups=(
	# Note: only the tests in the first three groups are running in storage sink pipeline.
<<<<<<< HEAD
["G00"]="$mysql_only $kafka_only $storage_only"
["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
["G02"]="$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
@@ -50,6 +51,44 @@ groups=(
	# currently G16 is not running in kafka pipeline
	["G16"]='owner_resign processor_etcd_worker_delay sink_hang'
	["G17"]='clustered_index processor_resolved_ts_fallback'
=======
	# G00
	"$mysql_only $kafka_only $storage_only"
	# G01
	"$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
	# G02
	"$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
	# G03
	'row_format drop_many_tables processor_stop_delay partition_table ddl_with_exists'
	# G04
	'foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop'
	# G05
	'charset_gbk ddl_manager multi_source vector'
	# G06
	'sink_retry changefeed_error ddl_sequence resourcecontrol'
	# G07 pulsar oauth2 authentication enabled
	'kv_client_stream_reconnect cdc split_region'
	# G08
	'processor_err_chan changefeed_reconstruct multi_capture synced_status_with_redo'
	# G09
	'gc_safepoint changefeed_pause_resume cli_with_auth savepoint synced_status'
	# G10
	'default_value simple cdc_server_tips event_filter sql_mode'
	# G11
	'resolve_lock move_table autorandom generate_column'
	# G12
	'many_pk_or_uk capture_session_done_during_task ddl_attributes'
	# G13 pulsar mtls authentication enabled
	'tiflash region_merge common_1'
	# G14
	'changefeed_finish force_replicate_table'
	# G15
	'new_ci_collation batch_add_table multi_rocks ci_collation_compatibility'
	# G16, currently G16 is not running in kafka pipeline
	'owner_resign processor_etcd_worker_delay sink_hang'
	# G17
	'clustered_index processor_resolved_ts_fallback'
>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856))
	# only run the following tests in mysql pipeline
	["G18"]='availability http_proxies sequence'
	["G19"]='changefeed_fast_fail batch_update_to_no_batch changefeed_resume_with_checkpoint_ts'
…
