From c58d75e08e6bc5336f570143690fc3877669486a Mon Sep 17 00:00:00 2001 From: w41ter Date: Wed, 18 Dec 2024 20:10:31 +0800 Subject: [PATCH] Filter all binlogs of the dropped table during partial snapshot --- pkg/ccr/job.go | 21 ++- .../drop/alter/test_cds_tbl_alter_drop.groovy | 29 +++- .../test_cds_tbl_idx_inverted_drop.groovy | 149 ++++++++++++++++++ 3 files changed, 187 insertions(+), 12 deletions(-) create mode 100644 regression-test/suites/cross_ds/table/drop/idx_inverted/test_cds_tbl_idx_inverted_drop.groovy diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index c4a38b76..af054d4e 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -359,15 +359,22 @@ func (j *Job) handlePartialSyncTableNotFound() error { if dropped, err := j.isTableDropped(tableId); err != nil { return err + } else if dropped && j.SyncType == TableSync { + return xerror.Errorf(xerror.Normal, "table sync but table %s has been dropped, table id %d", + table, tableId) } else if dropped { // skip this partial sync because table has been dropped log.Warnf("skip this partial sync because table %s has been dropped, table id: %d", table, tableId) nextCommitSeq := j.progress.CommitSeq - if j.SyncType == DBSync { - j.progress.NextWithPersist(nextCommitSeq, DBTablesIncrementalSync, Done, "") - } else { - j.progress.NextWithPersist(nextCommitSeq, TableIncrementalSync, Done, "") + // Since we don't know the commit seq of the drop table binlog, we set it to the max value to + // skip all binlogs. + // + // FIXME: it will skip drop table binlog too. + if len(j.progress.TableCommitSeqMap) == 0 { + j.progress.TableCommitSeqMap = make(map[int64]int64) } + j.progress.TableCommitSeqMap[tableId] = math.MaxInt64 + j.progress.NextWithPersist(nextCommitSeq, DBTablesIncrementalSync, Done, "") return nil } else if newTableName, err := j.srcMeta.GetTableNameById(tableId); err != nil { return err @@ -1931,11 +1938,17 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error { if _, ok := j.progress.TableMapping[dropTable.TableId]; !ok { log.Warnf("the dest table is not found, skip drop table binlog, src table id: %d, commit seq: %d", dropTable.TableId, binlog.GetCommitSeq()) + // So that the sync state would convert to DBIncrementalSync, + // see handlePartialSyncTableNotFound for details. + delete(j.progress.TableCommitSeqMap, dropTable.TableId) return nil } } if j.isBinlogCommitted(dropTable.TableId, binlog.GetCommitSeq()) { + // So that the sync state would convert to DBIncrementalSync, + // see handlePartialSyncTableNotFound for details. + delete(j.progress.TableCommitSeqMap, dropTable.TableId) return nil } diff --git a/regression-test/suites/cross_ds/table/drop/alter/test_cds_tbl_alter_drop.groovy b/regression-test/suites/cross_ds/table/drop/alter/test_cds_tbl_alter_drop.groovy index 32d6509e..3e981a0a 100644 --- a/regression-test/suites/cross_ds/table/drop/alter/test_cds_tbl_alter_drop.groovy +++ b/regression-test/suites/cross_ds/table/drop/alter/test_cds_tbl_alter_drop.groovy @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -suite("test_cds_tbl_alter_drop") { +suite('test_cds_tbl_alter_drop') { def helper = new GroovyShell(new Binding(['suite': delegate])) - .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + .evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy')) if (!helper.is_version_supported([30003, 20108, 20016])) { // at least doris 3.0.3, 2.1.8 and doris 2.0.16 @@ -26,8 +26,8 @@ suite("test_cds_tbl_alter_drop") { return } - def oldTableName = "tbl_old_" + helper.randomSuffix() - def newTableName = "tbl_new_" + helper.randomSuffix() + def oldTableName = 'tbl_old_' + helper.randomSuffix() + def newTableName = 'tbl_new_' + helper.randomSuffix() def exist = { res -> Boolean return res.size() != 0 @@ -35,8 +35,11 @@ suite("test_cds_tbl_alter_drop") { def notExist = { res -> Boolean return res.size() == 0 } + def has_count = { count -> { res -> Boolean + return res.size() == count + } } - logger.info("=== Create a fake table ===") + logger.info('=== Create a fake table ===') sql """ CREATE TABLE if NOT EXISTS ${oldTableName}_fake ( @@ -66,7 +69,7 @@ suite("test_cds_tbl_alter_drop") { assertTrue(helper.checkRestoreFinishTimesOf("${oldTableName}_fake", 60)) - logger.info(" ==== create table and drop ==== ") + logger.info(' ==== create table and drop ==== ') def first_job_progress = helper.get_job_progress() @@ -105,14 +108,24 @@ suite("test_cds_tbl_alter_drop") { """, exist, 30)) - sql "INSERT INTO ${oldTableName} VALUES (5, 500, 1)" + // All binlogs of the dropped table should be ignored. + sql "ALTER TABLE ${oldTableName} ADD COLUMN `value_col` INT DEFAULT \"0\"" + + assertTrue(helper.checkShowTimesOf(""" + SHOW ALTER TABLE COLUMN + FROM ${context.dbName} + WHERE TableName = "${oldTableName}" AND State = "FINISHED" + """, + has_count(2), 30)) + + sql "INSERT INTO ${oldTableName} VALUES (5, 500, 1, 2)" sql "DROP TABLE ${oldTableName} FORCE" sql "INSERT INTO ${oldTableName}_fake VALUES (5, 500)" helper.ccrJobResume() assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}_fake", 1, 60)) - assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${oldTableName}\"", notExist, 60, "target")) + assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${oldTableName}\"", notExist, 60, 'target')) // no fullsync are triggered def last_job_progress = helper.get_job_progress() diff --git a/regression-test/suites/cross_ds/table/drop/idx_inverted/test_cds_tbl_idx_inverted_drop.groovy b/regression-test/suites/cross_ds/table/drop/idx_inverted/test_cds_tbl_idx_inverted_drop.groovy new file mode 100644 index 00000000..e5e7b1d1 --- /dev/null +++ b/regression-test/suites/cross_ds/table/drop/idx_inverted/test_cds_tbl_idx_inverted_drop.groovy @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('test_cds_tbl_idx_inverted_drop') { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy')) + + if (!helper.is_version_supported([30003, 20108, 20016])) { + // at least doris 3.0.3, 2.1.8 and doris 2.0.16 + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + + def tableName = 'tbl_' + helper.randomSuffix() + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + def has_count = { count -> { res -> Boolean + return res.size() == count + } } + + logger.info('=== Create a fake table ===') + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_fake + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300"), + PARTITION `p5` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.enableDbBinlog() + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_fake", 60)) + + logger.info(' ==== create table and drop ==== ') + + def first_job_progress = helper.get_job_progress() + + helper.ccrJobPause() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT, + `value` STRING NULL + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `p1` VALUES LESS THAN ("0"), + PARTITION `p2` VALUES LESS THAN ("100"), + PARTITION `p3` VALUES LESS THAN ("200"), + PARTITION `p4` VALUES LESS THAN ("300"), + PARTITION `p5` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + sql "INSERT INTO ${tableName} VALUES (1, 100, ''), (100, 1, ''), (2, 200, ''), (200, 2, '')" + + logger.info('=== add inverted index ===') + sql """ + CREATE INDEX idx_inverted ON ${tableName} (value) USING INVERTED + """ + sql 'sync' + + assertTrue(helper.checkShowTimesOf(""" + SHOW ALTER TABLE COLUMN + FROM ${context.dbName} + WHERE TableName = "${tableName}" AND State = "FINISHED" + """, + has_count(1), 30)) + + sql """ INSERT INTO ${tableName} VALUES (1, 1, "1") """ + + def show_indexes_result = sql "show indexes from ${tableName}" + logger.info("show indexes: ${show_indexes_result}") + + // The drop index will be ignored since table is dropped + sql """ + DROP INDEX idx_inverted ON ${tableName} + """ + sql 'sync' + + assertTrue(helper.checkShowTimesOf(""" + SHOW ALTER TABLE COLUMN + FROM ${context.dbName} + WHERE TableName = "${tableName}" AND State = "FINISHED" + """, + has_count(2), 30)) + + show_indexes_result = sql "show indexes from ${tableName}" + logger.info("show indexes: ${show_indexes_result}") + + sql "INSERT INTO ${tableName} VALUES (5, 500, 'test')" + sql "DROP TABLE ${tableName} FORCE" + sql "INSERT INTO ${tableName}_fake VALUES (5, 500)" + + helper.ccrJobResume() + + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_fake", 1, 60)) + assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", notExist, 60, 'target')) + + // no fullsync are triggered + def last_job_progress = helper.get_job_progress() + assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at) +}