diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 678fda6cbd621..9dc4c3744d92c 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1493,6 +1493,12 @@ func restoreStream( return errors.Annotate(err, "failed to restore kv files") } + failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(10 * time.Second) + } + }) + if err = client.CleanUpKVFiles(ctx); err != nil { return errors.Annotate(err, "failed to clean up") } diff --git a/br/tests/br_pitr_long_running_schema_loading/run.sh b/br/tests/br_pitr_long_running_schema_loading/run.sh new file mode 100644 index 0000000000000..3555d5c710d51 --- /dev/null +++ b/br/tests/br_pitr_long_running_schema_loading/run.sh @@ -0,0 +1,79 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +. run_services +CUR=$(cd `dirname $0`; pwd) + +TASK_NAME="pitr_long_running_schema_loading" +res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" +DB="$TEST_NAME" + +restart_services + +run_sql "CREATE SCHEMA $DB;" + +# start the log backup +run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" + +run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));" +run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');" + + +# do a full backup +run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full" + +run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');" +run_sql "USE $DB; DROP TABLE t1;" +run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);" +run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');" + +# wait checkpoint advance +sleep 10 +current_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)") +echo "current ts: $current_ts" +i=0 +while true; do + # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty + log_backup_status=$(unset BR_LOG_TO_TERM && run_br --skip-goleak --pd $PD_ADDR log status --task-name $TASK_NAME --json 2>br.log) + echo "log backup status: $log_backup_status" + checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end') + echo "checkpoint ts: $checkpoint_ts" + + # check whether the checkpoint ts is a number + if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then + if [ $checkpoint_ts -gt $current_ts ]; then + echo "the checkpoint has advanced" + break + fi + echo "the checkpoint hasn't advanced" + i=$((i+1)) + if [ "$i" -gt 50 ]; then + echo 'the checkpoint lag is too large' + exit 1 + fi + sleep 10 + else + echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" + exit 1 + fi +done + +restart_services + +export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)" +run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" +export GO_FAILPOINTS="" diff --git a/br/tests/config/tikv.toml b/br/tests/config/tikv.toml index 22126549ab848..07eba85bc268d 100644 --- a/br/tests/config/tikv.toml +++ b/br/tests/config/tikv.toml @@ -37,5 +37,5 @@ path = "/tmp/backup_restore_test/master-key-file" [log-backup] max-flush-interval = "50s" [gc] -ratio-threshold = 1.1 +ratio-threshold = -1.0 diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index ec72d51a636d9..36e9147addbfe 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -356,6 +356,16 @@ func (do *Domain) loadInfoSchema(startTS uint64, isSnapshot bool) (infoschema.In // We can fall back to full load, don't need to return the error. logutil.BgLogger().Error("failed to load schema diff", zap.Error(err)) } + + // add failpoint to simulate long-running schema load case + failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) { + if val.(bool) && neededSchemaVersion-currentSchemaVersion >= (LoadSchemaDiffVersionGapThreshold+64) { + // this will block until the failpoint is disabled + logutil.BgLogger().Info("###### blocking loading schema") + <-make(chan struct{}) + } + }) + // full load. schemas, err := do.fetchAllSchemasWithTables(m) if err != nil { diff --git a/tests/_utils/run_services b/tests/_utils/run_services index 617821cfc7577..8b9d3f89f9b08 100644 --- a/tests/_utils/run_services +++ b/tests/_utils/run_services @@ -261,7 +261,7 @@ start_tiflash() { i=0 while ! run_curl "https://$TIFLASH_HTTP/tiflash/store-status" 1>/dev/null 2>&1; do i=$((i+1)) - if [ "$i" -gt 20 ]; then + if [ "$i" -gt 1 ]; then echo "failed to start tiflash" return 1 fi