Skip to content

Commit

Permalink
Refresh dest table id after partial sync
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Sep 24, 2024
1 parent 5c8bc03 commit c1ed82f
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,8 @@ func (j *Job) partialSync() error {
case PersistRestoreInfo:
// Step 5: Update job progress && dest table id
// update job info, only for dest table id
var targetName = table
if alias, ok := j.progress.TableAliases[table]; ok {
targetName := table
if j.isTableSyncWithAlias() {
targetName = j.Dest.Table
}
Expand All @@ -519,10 +519,21 @@ func (j *Job) partialSync() error {
}

log.Infof("partial sync status: persist restore info")
destTable, err := j.destMeta.UpdateTable(targetName, 0)
if err != nil {
return err
}
switch j.SyncType {
case DBSync:
srcTableId, err := j.srcMeta.GetTableId(table)
if err != nil {
return err
}
j.progress.TableMapping[srcTableId] = destTable.Id
j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "")
case TableSync:
j.Dest.TableId = destTable.Id
j.progress.TableMapping = nil
j.progress.NextWithPersist(j.progress.CommitSeq, TableIncrementalSync, Done, "")
default:
return xerror.Errorf(xerror.Normal, "invalid sync type %d", j.SyncType)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_db_partial_sync_cache") {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

def tableName = "tbl_partial_sync_cache_" + UUID.randomUUID().toString().replace("-", "")
def test_num = 0
def insert_num = 5

def exist = { res -> Boolean
return res.size() != 0
}

def has_count = { count ->
return { res -> Boolean
res.size() == count
}
}

def get_ccr_name = { ccr_body_json ->
def jsonSlurper = new groovy.json.JsonSlurper()
def object = jsonSlurper.parseText "${ccr_body_json}"
return object.name
}

def get_job_progress = { ccr_name ->
def request_body = """ {"name":"${ccr_name}"} """
def get_job_progress_uri = { check_func ->
httpTest {
uri "/job_progress"
endpoint helper.syncerAddress
body request_body
op "post"
check check_func
}
}

def result = null
get_job_progress_uri.call() { code, body ->
if (!"${code}".toString().equals("200")) {
throw "request failed, code: ${code}, body: ${body}"
}
def jsonSlurper = new groovy.json.JsonSlurper()
def object = jsonSlurper.parseText "${body}"
if (!object.success) {
throw "request failed, error msg: ${object.error_msg}"
}
logger.info("job progress: ${object.job_progress}")
result = jsonSlurper.parseText object.job_progress
}
return result
}

helper.enableDbBinlog()
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT,
`value` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"binlog.enable" = "true"
)
"""

def values = [];
for (int index = 0; index < insert_num; index++) {
values.add("(${test_num}, ${index}, ${index})")
}
sql """
INSERT INTO ${tableName} VALUES ${values.join(",")}
"""
sql "sync"

def bodyJson = get_ccr_body ""
ccr_name = get_ccr_name(bodyJson)
helper.ccrJobCreate()
logger.info("ccr job name: ${ccr_name}")

assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 60))

first_job_progress = get_job_progress(ccr_name)

logger.info("=== Test 1: add first column case ===")
// binlog type: ALTER_JOB, binlog data:
// {
// "type":"SCHEMA_CHANGE",
// "dbId":11049,
// "tableId":11058,
// "tableName":"tbl_add_column6ab3b514b63c4368aa0a0149da0acabd",
// "jobId":11076,
// "jobState":"FINISHED",
// "rawSql":"ALTER TABLE `regression_test_schema_change`.`tbl_add_column6ab3b514b63c4368aa0a0149da0acabd` ADD COLUMN `first` int NULL DEFAULT \"0\" COMMENT \"\" FIRST"
// }
sql """
ALTER TABLE ${tableName}
ADD COLUMN `first` INT KEY DEFAULT "0" FIRST
"""
sql "sync"

assertTrue(helper.checkShowTimesOf("""
SHOW ALTER TABLE COLUMN
FROM ${context.dbName}
WHERE TableName = "${tableName}" AND State = "FINISHED"
""",
has_count(1), 30))

def has_column_first = { res -> Boolean
// Field == 'first' && 'Key' == 'YES'
return res[0][0] == 'first' && (res[0][3] == 'YES' || res[0][3] == 'true')
}

assertTrue(helper.checkShowTimesOf("SHOW COLUMNS FROM `${tableName}`", has_column_first, 60, "target_sql"))

sql "INSERT INTO ${tableName} VALUES (123, 123, 123, 123)"

// cache must be clear and reload.
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num + 1, 60))

// no full sync triggered.
last_job_progress = get_job_progress(ccr_name)
assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_table_partial_sync_cache") {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

def tableName = "tbl_partial_sync_cache_" + UUID.randomUUID().toString().replace("-", "")
def test_num = 0
def insert_num = 5

def exist = { res -> Boolean
return res.size() != 0
}

def has_count = { count ->
return { res -> Boolean
res.size() == count
}
}

def get_ccr_name = { ccr_body_json ->
def jsonSlurper = new groovy.json.JsonSlurper()
def object = jsonSlurper.parseText "${ccr_body_json}"
return object.name
}

def get_job_progress = { ccr_name ->
def request_body = """ {"name":"${ccr_name}"} """
def get_job_progress_uri = { check_func ->
httpTest {
uri "/job_progress"
endpoint helper.syncerAddress
body request_body
op "post"
check check_func
}
}

def result = null
get_job_progress_uri.call() { code, body ->
if (!"${code}".toString().equals("200")) {
throw "request failed, code: ${code}, body: ${body}"
}
def jsonSlurper = new groovy.json.JsonSlurper()
def object = jsonSlurper.parseText "${body}"
if (!object.success) {
throw "request failed, error msg: ${object.error_msg}"
}
logger.info("job progress: ${object.job_progress}")
result = jsonSlurper.parseText object.job_progress
}
return result
}

sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT,
`value` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"binlog.enable" = "true"
)
"""

def values = [];
for (int index = 0; index < insert_num; index++) {
values.add("(${test_num}, ${index}, ${index})")
}
sql """
INSERT INTO ${tableName} VALUES ${values.join(",")}
"""
sql "sync"

def bodyJson = get_ccr_body "${tableName}"
ccr_name = get_ccr_name(bodyJson)
helper.ccrJobCreate(tableName)
logger.info("ccr job name: ${ccr_name}")

assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 60))

first_job_progress = get_job_progress(ccr_name)

logger.info("=== Test 1: add first column case ===")
// binlog type: ALTER_JOB, binlog data:
// {
// "type":"SCHEMA_CHANGE",
// "dbId":11049,
// "tableId":11058,
// "tableName":"tbl_add_column6ab3b514b63c4368aa0a0149da0acabd",
// "jobId":11076,
// "jobState":"FINISHED",
// "rawSql":"ALTER TABLE `regression_test_schema_change`.`tbl_add_column6ab3b514b63c4368aa0a0149da0acabd` ADD COLUMN `first` int NULL DEFAULT \"0\" COMMENT \"\" FIRST"
// }
sql """
ALTER TABLE ${tableName}
ADD COLUMN `first` INT KEY DEFAULT "0" FIRST
"""
sql "sync"

assertTrue(helper.checkShowTimesOf("""
SHOW ALTER TABLE COLUMN
FROM ${context.dbName}
WHERE TableName = "${tableName}" AND State = "FINISHED"
""",
has_count(1), 30))

def has_column_first = { res -> Boolean
// Field == 'first' && 'Key' == 'YES'
return res[0][0] == 'first' && (res[0][3] == 'YES' || res[0][3] == 'true')
}

assertTrue(helper.checkShowTimesOf("SHOW COLUMNS FROM `${tableName}`", has_column_first, 60, "target_sql"))

sql "INSERT INTO ${tableName} VALUES (123, 123, 123, 123)"

// cache must be clear and reload.
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num + 1, 60))

// no full sync triggered.
last_job_progress = get_job_progress(ccr_name)
assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at)
}


0 comments on commit c1ed82f

Please sign in to comment.