Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter all binlogs of the dropped table during partial snapshot #333

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,22 @@ func (j *Job) handlePartialSyncTableNotFound() error {

if dropped, err := j.isTableDropped(tableId); err != nil {
return err
} else if dropped && j.SyncType == TableSync {
return xerror.Errorf(xerror.Normal, "table sync but table %s has been dropped, table id %d",
table, tableId)
} else if dropped {
// skip this partial sync because table has been dropped
log.Warnf("skip this partial sync because table %s has been dropped, table id: %d", table, tableId)
nextCommitSeq := j.progress.CommitSeq
if j.SyncType == DBSync {
j.progress.NextWithPersist(nextCommitSeq, DBTablesIncrementalSync, Done, "")
} else {
j.progress.NextWithPersist(nextCommitSeq, TableIncrementalSync, Done, "")
// Since we don't know the commit seq of the drop table binlog, we set it to the max value to
// skip all binlogs.
//
// FIXME: it will skip drop table binlog too.
if len(j.progress.TableCommitSeqMap) == 0 {
j.progress.TableCommitSeqMap = make(map[int64]int64)
}
j.progress.TableCommitSeqMap[tableId] = math.MaxInt64
j.progress.NextWithPersist(nextCommitSeq, DBTablesIncrementalSync, Done, "")
return nil
} else if newTableName, err := j.srcMeta.GetTableNameById(tableId); err != nil {
return err
Expand Down Expand Up @@ -1931,11 +1938,17 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error {
if _, ok := j.progress.TableMapping[dropTable.TableId]; !ok {
log.Warnf("the dest table is not found, skip drop table binlog, src table id: %d, commit seq: %d",
dropTable.TableId, binlog.GetCommitSeq())
// So that the sync state would convert to DBIncrementalSync,
// see handlePartialSyncTableNotFound for details.
delete(j.progress.TableCommitSeqMap, dropTable.TableId)
return nil
}
}

if j.isBinlogCommitted(dropTable.TableId, binlog.GetCommitSeq()) {
// So that the sync state would convert to DBIncrementalSync,
// see handlePartialSyncTableNotFound for details.
delete(j.progress.TableCommitSeqMap, dropTable.TableId)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

suite("test_cds_tbl_alter_drop") {
suite('test_cds_tbl_alter_drop') {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))
.evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy'))

if (!helper.is_version_supported([30003, 20108, 20016])) {
// at least doris 3.0.3, 2.1.8 and doris 2.0.16
Expand All @@ -26,17 +26,20 @@ suite("test_cds_tbl_alter_drop") {
return
}

def oldTableName = "tbl_old_" + helper.randomSuffix()
def newTableName = "tbl_new_" + helper.randomSuffix()
def oldTableName = 'tbl_old_' + helper.randomSuffix()
def newTableName = 'tbl_new_' + helper.randomSuffix()

def exist = { res -> Boolean
return res.size() != 0
}
def notExist = { res -> Boolean
return res.size() == 0
}
def has_count = { count -> { res -> Boolean
return res.size() == count
} }

logger.info("=== Create a fake table ===")
logger.info('=== Create a fake table ===')
sql """
CREATE TABLE if NOT EXISTS ${oldTableName}_fake
(
Expand Down Expand Up @@ -66,7 +69,7 @@ suite("test_cds_tbl_alter_drop") {

assertTrue(helper.checkRestoreFinishTimesOf("${oldTableName}_fake", 60))

logger.info(" ==== create table and drop ==== ")
logger.info(' ==== create table and drop ==== ')

def first_job_progress = helper.get_job_progress()

Expand Down Expand Up @@ -105,14 +108,24 @@ suite("test_cds_tbl_alter_drop") {
""",
exist, 30))

sql "INSERT INTO ${oldTableName} VALUES (5, 500, 1)"
// All binlogs of the dropped table should be ignored.
sql "ALTER TABLE ${oldTableName} ADD COLUMN `value_col` INT DEFAULT \"0\""

assertTrue(helper.checkShowTimesOf("""
SHOW ALTER TABLE COLUMN
FROM ${context.dbName}
WHERE TableName = "${oldTableName}" AND State = "FINISHED"
""",
has_count(2), 30))

sql "INSERT INTO ${oldTableName} VALUES (5, 500, 1, 2)"
sql "DROP TABLE ${oldTableName} FORCE"
sql "INSERT INTO ${oldTableName}_fake VALUES (5, 500)"

helper.ccrJobResume()

assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}_fake", 1, 60))
assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${oldTableName}\"", notExist, 60, "target"))
assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${oldTableName}\"", notExist, 60, 'target'))

// no fullsync are triggered
def last_job_progress = helper.get_job_progress()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: binlogs of a table that is force-dropped upstream — including
// its inverted-index ADD/DROP schema changes — must be skipped during partial
// snapshot without triggering a full sync (see PR "Filter all binlogs of the
// dropped table during partial snapshot").
suite('test_cds_tbl_idx_inverted_drop') {
    // Load the shared helper script; the 'suite' binding hands it this suite's delegate.
    def helper = new GroovyShell(new Binding(['suite': delegate]))
        .evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy'))

    if (!helper.is_version_supported([30003, 20108, 20016])) {
        // at least doris 3.0.3, 2.1.8 and doris 2.0.16
        def version = helper.upstream_version()
        logger.info("skip this suite because version is not supported, upstream version ${version}")
        return
    }

    // Random suffix avoids collisions with tables left over from earlier runs.
    def tableName = 'tbl_' + helper.randomSuffix()

    // Result-set predicates passed to helper.checkShowTimesOf / checkSelectTimesOf.
    def exist = { res -> Boolean
        return res.size() != 0
    }
    def notExist = { res -> Boolean
        return res.size() == 0
    }
    // Curried predicate: result set has exactly `count` rows.
    def has_count = { count -> { res -> Boolean
        return res.size() == count
    } }

    logger.info('=== Create a fake table ===')
    // The "_fake" table stays alive for the whole test; it gives the CCR job a
    // surviving table to replicate so progress can be asserted after the real
    // table is dropped.
    sql """
        CREATE TABLE if NOT EXISTS ${tableName}_fake
        (
            `test` INT,
            `id` INT
        )
        ENGINE=OLAP
        UNIQUE KEY(`test`, `id`)
        PARTITION BY RANGE(`id`)
        (
            PARTITION `p1` VALUES LESS THAN ("0"),
            PARTITION `p2` VALUES LESS THAN ("100"),
            PARTITION `p3` VALUES LESS THAN ("200"),
            PARTITION `p4` VALUES LESS THAN ("300"),
            PARTITION `p5` VALUES LESS THAN ("1000")
        )
        DISTRIBUTED BY HASH(id) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "binlog.enable" = "true"
        )
    """

    helper.enableDbBinlog()
    helper.ccrJobDelete()
    helper.ccrJobCreate()

    assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_fake", 60))

    logger.info(' ==== create table and drop ==== ')

    // Snapshot the job progress now so the final assertion can prove that no
    // full sync was started in between.
    def first_job_progress = helper.get_job_progress()

    // Pause the CCR job: everything below accumulates as binlogs that are only
    // replayed after the table has already been dropped upstream.
    helper.ccrJobPause()

    sql """
        CREATE TABLE if NOT EXISTS ${tableName}
        (
            `test` INT,
            `id` INT,
            `value` STRING NULL
        )
        ENGINE=OLAP
        UNIQUE KEY(`test`, `id`)
        PARTITION BY RANGE(`id`)
        (
            PARTITION `p1` VALUES LESS THAN ("0"),
            PARTITION `p2` VALUES LESS THAN ("100"),
            PARTITION `p3` VALUES LESS THAN ("200"),
            PARTITION `p4` VALUES LESS THAN ("300"),
            PARTITION `p5` VALUES LESS THAN ("1000")
        )
        DISTRIBUTED BY HASH(id) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "binlog.enable" = "true"
        )
    """

    sql "INSERT INTO ${tableName} VALUES (1, 100, ''), (100, 1, ''), (2, 200, ''), (200, 2, '')"

    logger.info('=== add inverted index ===')
    sql """
        CREATE INDEX idx_inverted ON ${tableName} (value) USING INVERTED
    """
    sql 'sync'

    // Wait for the first schema change (index build) to finish upstream.
    assertTrue(helper.checkShowTimesOf("""
        SHOW ALTER TABLE COLUMN
        FROM ${context.dbName}
        WHERE TableName = "${tableName}" AND State = "FINISHED"
        """,
        has_count(1), 30))

    sql """ INSERT INTO ${tableName} VALUES (1, 1, "1") """

    def show_indexes_result = sql "show indexes from ${tableName}"
    logger.info("show indexes: ${show_indexes_result}")

    // The drop index will be ignored since table is dropped
    sql """
        DROP INDEX idx_inverted ON ${tableName}
    """
    sql 'sync'

    // Wait for the second schema change (index drop) to finish upstream.
    assertTrue(helper.checkShowTimesOf("""
        SHOW ALTER TABLE COLUMN
        FROM ${context.dbName}
        WHERE TableName = "${tableName}" AND State = "FINISHED"
        """,
        has_count(2), 30))

    show_indexes_result = sql "show indexes from ${tableName}"
    logger.info("show indexes: ${show_indexes_result}")

    sql "INSERT INTO ${tableName} VALUES (5, 500, 'test')"
    // FORCE drop, so the table is gone upstream while its binlogs are still pending.
    sql "DROP TABLE ${tableName} FORCE"
    sql "INSERT INTO ${tableName}_fake VALUES (5, 500)"

    // Resume the job; it must now replay the queued binlogs of the dropped table.
    helper.ccrJobResume()

    // The surviving fake table's insert must replicate, and the dropped table
    // must never materialize on the target cluster.
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_fake", 1, 60))
    assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", notExist, 60, 'target'))

    // no fullsync are triggered
    def last_job_progress = helper.get_job_progress()
    assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at)
}
Loading