Skip to content

Commit

Permalink
Fix partial sync with incremental data (#186)
Browse files Browse the repository at this point in the history
A table restored via partial sync should reset its commit seq to the snapshot seq
  • Loading branch information
w41ter authored Sep 25, 2024
1 parent 7dce61d commit 719d117
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 10 deletions.
37 changes: 31 additions & 6 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ func (j *Job) addExtraInfo(jobInfo []byte) ([]byte, error) {
// Like fullSync, but only backs up and restores a subset of the partitions of a table.
func (j *Job) partialSync() error {
type inMemoryData struct {
SnapshotName string `json:"snapshot_name"`
SnapshotResp *festruct.TGetSnapshotResult_ `json:"snapshot_resp"`
SnapshotName string `json:"snapshot_name"`
SnapshotResp *festruct.TGetSnapshotResult_ `json:"snapshot_resp"`
TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"`
}

if j.progress.PartialSyncData == nil {
Expand Down Expand Up @@ -385,9 +386,25 @@ func (j *Job) partialSync() error {
}

log.Tracef("job: %.128s", snapshotResp.GetJobInfo())
if !snapshotResp.IsSetJobInfo() {
return xerror.New(xerror.Normal, "jobInfo is not set")
}

tableCommitSeqMap, err := ExtractTableCommitSeqMap(snapshotResp.GetJobInfo())
if err != nil {
return err
}

if j.SyncType == TableSync {
if _, ok := tableCommitSeqMap[j.Src.TableId]; !ok {
return xerror.Errorf(xerror.Normal, "table id %d, commit seq not found", j.Src.TableId)
}
}

inMemoryData := &inMemoryData{
SnapshotName: snapshotName,
SnapshotResp: snapshotResp,
SnapshotName: snapshotName,
SnapshotResp: snapshotResp,
TableCommitSeqMap: tableCommitSeqMap,
}
j.progress.NextSubVolatile(AddExtraInfo, inMemoryData)

Expand All @@ -410,6 +427,8 @@ func (j *Job) partialSync() error {
log.Debugf("partial sync job info size: %d, bytes: %.128s", len(jobInfoBytes), string(jobInfoBytes))
snapshotResp.SetJobInfo(jobInfoBytes)

// save the entire commit seq map, this value will be used in PersistRestoreInfo.
j.progress.TableCommitSeqMap = inMemoryData.TableCommitSeqMap
j.progress.NextSubCheckpoint(RestoreSnapshot, inMemoryData)

case RestoreSnapshot:
Expand Down Expand Up @@ -535,9 +554,14 @@ func (j *Job) partialSync() error {
j.progress.TableMapping[srcTableId] = destTable.Id
j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "")
case TableSync:
commitSeq, ok := j.progress.TableCommitSeqMap[j.Src.TableId]
if !ok {
return xerror.Errorf(xerror.Normal, "table id %d, commit seq not found", j.Src.TableId)
}
j.Dest.TableId = destTable.Id
j.progress.TableMapping = nil
j.progress.NextWithPersist(j.progress.CommitSeq, TableIncrementalSync, Done, "")
j.progress.TableCommitSeqMap = nil
j.progress.NextWithPersist(commitSeq, TableIncrementalSync, Done, "")
default:
return xerror.Errorf(xerror.Normal, "invalid sync type %d", j.SyncType)
}
Expand Down Expand Up @@ -959,7 +983,8 @@ func (j *Job) getDbSyncTableRecords(upsert *record.Upsert) ([]*record.TableRecor
tableRecords = append(tableRecords, tableRecord)
}
} else {
// TODO: check
// for db partial sync
tableRecords = append(tableRecords, tableRecord)
}
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/service/http_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,15 +655,14 @@ func (s *HttpService) featuresHandler(w http.ResponseWriter, r *http.Request) {
}
type flagListResult struct {
*defaultResult
Flags []flagValue
Flags []flagValue `json:"flags"`
}

var result flagListResult
result.defaultResult = newSuccessResult()
defer func() { writeJson(w, &result) }()

flag.VisitAll(func(flag *flag.Flag) {
fmt.Printf("Flag %s\n", flag.Name)
if !strings.HasPrefix(flag.Name, "feature") {
return
}
Expand Down
34 changes: 34 additions & 0 deletions regression-test/common/helper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,40 @@ class Helper {
}
return result
}

// Report whether the ccr syncer exposes the feature flag `name` with a truthy value.
//
// Issues a GET request to the syncer's "/features" endpoint, parses the JSON
// response body, and scans its `flags` list for an entry whose `feature` field
// equals `name`.
//
// Returns true only if such an entry exists and its `value` is truthy; false otherwise.
// Throws an Exception if the HTTP status code is not 200 or if the response's
// `success` field is falsy.
Boolean has_feature(name) {
    def features_uri = { check_func ->
        suite.httpTest {
            uri "/features"
            endpoint syncerAddress
            body ""
            op "get"
            check check_func
        }
    }

    def result = null
    features_uri.call() { code, body ->
        if (!"${code}".toString().equals("200")) {
            // A String is not a Throwable, so the message must be wrapped in an
            // Exception for `throw` to report it.
            throw new Exception("request failed, code: ${code}, body: ${body}")
        }
        def jsonSlurper = new groovy.json.JsonSlurper()
        def object = jsonSlurper.parseText "${body}"
        if (!object.success) {
            throw new Exception("request failed, error msg: ${object.error_msg}")
        }
        suite.logger.info("features: ${object.flags}")
        result = object.flags
    }

    // Iterating a null `result` (check closure never ran) simply yields no matches.
    // NOTE(review): `flag.value` is tested with Groovy truthiness; this assumes the
    // server sends a JSON boolean. A non-empty string such as "false" would count
    // as true -- confirm against the /features handler.
    for (def flag in result) {
        if (flag.feature == name && flag.value) {
            return true
        }
    }
    return false
}
}

new Helper(suite)
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression suite: incremental data written around a schema change must be
// synced without a new full sync.
//
// Flow: create two binlog-enabled aggregate tables and a ccr job, wait for the
// initial data to arrive, pause the job, add a key column to the first table,
// insert three rows with identical keys into both tables, resume the job, then
// verify the new column and the summed values and that `full_sync_start_at`
// of the job progress did not change.
suite("test_db_partial_sync_incremental") {
// Load the shared helper script, binding this suite as `suite`.
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

// Skip the whole suite unless the syncer reports the required feature flag.
if (!helper.has_feature("feature_schema_change_partial_sync")) {
logger.info("this suite require feature_schema_change_partial_sync set to true")
return
}

// Randomized table names so repeated runs do not collide.
def tableName = "tbl_sync_incremental_" + UUID.randomUUID().toString().replace("-", "")
def tableName1 = "tbl_sync_incremental_1_" + UUID.randomUUID().toString().replace("-", "")
def test_num = 0
def insert_num = 5

// Predicate: the result set is non-empty. Not referenced elsewhere in this suite.
// NOTE(review): the `Boolean` after `->` looks like a bare expression rather than
// a declared return type -- confirm it is intentional.
def exist = { res -> Boolean
return res.size() != 0
}

// Builds a predicate that is true when the result set has exactly `count` rows.
def has_count = { count ->
return { res -> Boolean
res.size() == count
}
}

helper.enableDbBinlog()
// Both tables share the same schema: aggregate keys (test, id) with `value` summed.
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT,
`value` INT SUM
)
ENGINE=OLAP
AGGREGATE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"binlog.enable" = "true"
)
"""
sql "DROP TABLE IF EXISTS ${tableName1}"
sql """
CREATE TABLE if NOT EXISTS ${tableName1}
(
`test` INT,
`id` INT,
`value` INT SUM
)
ENGINE=OLAP
AGGREGATE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"binlog.enable" = "true"
)
"""

// Seed each table with `insert_num` rows: (test_num, index, index).
def values = [];
for (int index = 0; index < insert_num; index++) {
values.add("(${test_num}, ${index}, ${index})")
}
sql """
INSERT INTO ${tableName} VALUES ${values.join(",")}
"""
sql """
INSERT INTO ${tableName1} VALUES ${values.join(",")}
"""
sql "sync"

helper.ccrJobCreate()

// Wait for the initial restore and for the seeded rows to be visible in both tables.
// NOTE(review): the trailing numeric arguments are presumably retry counts -- verify in helper.groovy.
assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 60))
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1}", insert_num, 60))

// Snapshot of the job progress, compared at the end to detect an unwanted full sync.
def first_job_progress = helper.get_job_progress()

logger.info("=== pause job, add column and insert data")
helper.ccrJobPause()

// binlog type: ALTER_JOB, binlog data:
// {
// "type":"SCHEMA_CHANGE",
// "dbId":11049,
// "tableId":11058,
// "tableName":"tbl_add_column6ab3b514b63c4368aa0a0149da0acabd",
// "jobId":11076,
// "jobState":"FINISHED",
// "rawSql":"ALTER TABLE `regression_test_schema_change`.`tbl_add_column6ab3b514b63c4368aa0a0149da0acabd` ADD COLUMN `first` int NULL DEFAULT \"0\" COMMENT \"\" FIRST"
// }
// Only the first table gets the schema change; the second keeps its original schema.
sql """
ALTER TABLE ${tableName}
ADD COLUMN `first` INT KEY DEFAULT "0" FIRST
"""
sql "sync"

// Wait until exactly one FINISHED column-alter job is listed for the table.
assertTrue(helper.checkShowTimesOf("""
SHOW ALTER TABLE COLUMN
FROM ${context.dbName}
WHERE TableName = "${tableName}" AND State = "FINISHED"
""",
has_count(1), 30))

// Three rows per table with identical keys and values 1, 2, 3; the SUM
// aggregation is expected to collapse them into one row with value 6.
// The first table now has four columns (first, test, id, value).
sql "INSERT INTO ${tableName} VALUES (123, 123, 123, 1)"
sql "INSERT INTO ${tableName} VALUES (123, 123, 123, 2)"
sql "INSERT INTO ${tableName} VALUES (123, 123, 123, 3)"
sql "INSERT INTO ${tableName1} VALUES (123, 123, 1)"
sql "INSERT INTO ${tableName1} VALUES (123, 123, 2)"
sql "INSERT INTO ${tableName1} VALUES (123, 123, 3)"

helper.ccrJobResume()

// Predicate over SHOW COLUMNS output: the first row is column `first` and is a key column.
def has_column_first = { res -> Boolean
// Field == 'first' && 'Key' == 'YES'
return res[0][0] == 'first' && (res[0][3] == 'YES' || res[0][3] == 'true')
}

// NOTE(review): the "target_sql" argument presumably makes the helper run the
// statement on the target cluster -- verify in helper.groovy.
assertTrue(helper.checkShowTimesOf("SHOW COLUMNS FROM `${tableName}`", has_column_first, 60, "target_sql"))

logger.info("the aggregate keys inserted should be synced accurately")
// One extra aggregated row on top of the seeded rows, with value 1 + 2 + 3 = 6.
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num + 1, 60))
def last_record = target_sql "SELECT value FROM ${tableName} WHERE id = 123 AND test = 123"
logger.info("last record is ${last_record}")
assertTrue(last_record.size() == 1 && last_record[0][0] == 6)

// Same check for the table that did not go through the schema change.
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1}", insert_num + 1, 60))
last_record = target_sql "SELECT value FROM ${tableName1} WHERE id = 123 AND test = 123"
logger.info("last record of table ${tableName1} is ${last_record}")
assertTrue(last_record.size() == 1 && last_record[0][0] == 6)

// no full sync triggered.
// An unchanged `full_sync_start_at` is taken as evidence that no full sync ran in between.
def last_job_progress = helper.get_job_progress()
assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at)
}




Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ suite("test_table_partial_sync_cache") {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

def tableName = "tbl_partial_sync_cache_" + UUID.randomUUID().toString().replace("-", "")
def tableName = "tbl_sync_cache_" + UUID.randomUUID().toString().replace("-", "")
def test_num = 0
def insert_num = 5

Expand Down
Loading

0 comments on commit 719d117

Please sign in to comment.