diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 6b03ea4a..9faf13d9 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -492,8 +492,8 @@ func (j *Job) partialSync() error { case PersistRestoreInfo: // Step 5: Update job progress && dest table id // update job info, only for dest table id + var targetName = table if alias, ok := j.progress.TableAliases[table]; ok { - targetName := table if j.isTableSyncWithAlias() { targetName = j.Dest.Table } @@ -519,10 +519,21 @@ func (j *Job) partialSync() error { } log.Infof("partial sync status: persist restore info") + destTable, err := j.destMeta.UpdateTable(targetName, 0) + if err != nil { + return err + } switch j.SyncType { case DBSync: + srcTableId, err := j.srcMeta.GetTableId(table) + if err != nil { + return err + } + j.progress.TableMapping[srcTableId] = destTable.Id j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "") case TableSync: + j.Dest.TableId = destTable.Id + j.progress.TableMapping = nil j.progress.NextWithPersist(j.progress.CommitSeq, TableIncrementalSync, Done, "") default: return xerror.Errorf(xerror.Normal, "invalid sync type %d", j.SyncType) diff --git a/regression-test/suites/db-partial-sync-cache/test_db_partial_sync_cache.groovy b/regression-test/suites/db-partial-sync-cache/test_db_partial_sync_cache.groovy new file mode 100644 index 00000000..54d45f20 --- /dev/null +++ b/regression-test/suites/db-partial-sync-cache/test_db_partial_sync_cache.groovy @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_db_partial_sync_cache") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_partial_sync_cache_" + UUID.randomUUID().toString().replace("-", "") + def test_num = 0 + def insert_num = 5 + + def exist = { res -> Boolean + return res.size() != 0 + } + + def has_count = { count -> + return { res -> Boolean + res.size() == count + } + } + + def get_ccr_name = { ccr_body_json -> + def jsonSlurper = new groovy.json.JsonSlurper() + def object = jsonSlurper.parseText "${ccr_body_json}" + return object.name + } + + def get_job_progress = { ccr_name -> + def request_body = """ {"name":"${ccr_name}"} """ + def get_job_progress_uri = { check_func -> + httpTest { + uri "/job_progress" + endpoint helper.syncerAddress + body request_body + op "post" + check check_func + } + } + + def result = null + get_job_progress_uri.call() { code, body -> + if (!"${code}".toString().equals("200")) { + throw "request failed, code: ${code}, body: ${body}" + } + def jsonSlurper = new groovy.json.JsonSlurper() + def object = jsonSlurper.parseText "${body}" + if (!object.success) { + throw "request failed, error msg: ${object.error_msg}" + } + logger.info("job progress: ${object.job_progress}") + result = jsonSlurper.parseText object.job_progress + } + return result + } + + helper.enableDbBinlog() + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT, + `value` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + def values = []; + for (int index = 0; index < insert_num; index++) { + values.add("(${test_num}, ${index}, ${index})") + } + sql """ + INSERT INTO ${tableName} VALUES ${values.join(",")} + """ + sql "sync" + + def bodyJson = get_ccr_body "" + ccr_name = get_ccr_name(bodyJson) + helper.ccrJobCreate() + logger.info("ccr job name: ${ccr_name}") + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 60)) + + first_job_progress = get_job_progress(ccr_name) + + logger.info("=== Test 1: add first column case ===") + // binlog type: ALTER_JOB, binlog data: + // { + // "type":"SCHEMA_CHANGE", + // "dbId":11049, + // "tableId":11058, + // "tableName":"tbl_add_column6ab3b514b63c4368aa0a0149da0acabd", + // "jobId":11076, + // "jobState":"FINISHED", + // "rawSql":"ALTER TABLE `regression_test_schema_change`.`tbl_add_column6ab3b514b63c4368aa0a0149da0acabd` ADD COLUMN `first` int NULL DEFAULT \"0\" COMMENT \"\" FIRST" + // } + sql """ + ALTER TABLE ${tableName} + ADD COLUMN `first` INT KEY DEFAULT "0" FIRST + """ + sql "sync" + + assertTrue(helper.checkShowTimesOf(""" + SHOW ALTER TABLE COLUMN + FROM ${context.dbName} + WHERE TableName = "${tableName}" AND State = "FINISHED" + """, + has_count(1), 30)) + + def has_column_first = { res -> Boolean + // Field == 'first' && 'Key' == 'YES' + return res[0][0] == 'first' && (res[0][3] == 'YES' || res[0][3] == 'true') + } + + assertTrue(helper.checkShowTimesOf("SHOW COLUMNS FROM `${tableName}`", has_column_first, 60, "target_sql")) + + sql "INSERT INTO ${tableName} VALUES (123, 123, 123, 123)" + + // cache must be clear and reload. + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num + 1, 60)) + + // no full sync triggered. + last_job_progress = get_job_progress(ccr_name) + assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at) +} + diff --git a/regression-test/suites/table-partial-sync-cache/test_table_partial_sync_cache.groovy b/regression-test/suites/table-partial-sync-cache/test_table_partial_sync_cache.groovy new file mode 100644 index 00000000..a08d65e7 --- /dev/null +++ b/regression-test/suites/table-partial-sync-cache/test_table_partial_sync_cache.groovy @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_table_partial_sync_cache") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_partial_sync_cache_" + UUID.randomUUID().toString().replace("-", "") + def test_num = 0 + def insert_num = 5 + + def exist = { res -> Boolean + return res.size() != 0 + } + + def has_count = { count -> + return { res -> Boolean + res.size() == count + } + } + + def get_ccr_name = { ccr_body_json -> + def jsonSlurper = new groovy.json.JsonSlurper() + def object = jsonSlurper.parseText "${ccr_body_json}" + return object.name + } + + def get_job_progress = { ccr_name -> + def request_body = """ {"name":"${ccr_name}"} """ + def get_job_progress_uri = { check_func -> + httpTest { + uri "/job_progress" + endpoint helper.syncerAddress + body request_body + op "post" + check check_func + } + } + + def result = null + get_job_progress_uri.call() { code, body -> + if (!"${code}".toString().equals("200")) { + throw "request failed, code: ${code}, body: ${body}" + } + def jsonSlurper = new groovy.json.JsonSlurper() + def object = jsonSlurper.parseText "${body}" + if (!object.success) { + throw "request failed, error msg: ${object.error_msg}" + } + logger.info("job progress: ${object.job_progress}") + result = jsonSlurper.parseText object.job_progress + } + return result + } + + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT, + `value` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + def values = []; + for (int index = 0; index < insert_num; index++) { + values.add("(${test_num}, ${index}, ${index})") + } + sql """ + INSERT INTO ${tableName} VALUES ${values.join(",")} + """ + sql "sync" + + def bodyJson = get_ccr_body "${tableName}" + ccr_name = get_ccr_name(bodyJson) + helper.ccrJobCreate(tableName) + logger.info("ccr job name: ${ccr_name}") + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num, 60)) + + first_job_progress = get_job_progress(ccr_name) + + logger.info("=== Test 1: add first column case ===") + // binlog type: ALTER_JOB, binlog data: + // { + // "type":"SCHEMA_CHANGE", + // "dbId":11049, + // "tableId":11058, + // "tableName":"tbl_add_column6ab3b514b63c4368aa0a0149da0acabd", + // "jobId":11076, + // "jobState":"FINISHED", + // "rawSql":"ALTER TABLE `regression_test_schema_change`.`tbl_add_column6ab3b514b63c4368aa0a0149da0acabd` ADD COLUMN `first` int NULL DEFAULT \"0\" COMMENT \"\" FIRST" + // } + sql """ + ALTER TABLE ${tableName} + ADD COLUMN `first` INT KEY DEFAULT "0" FIRST + """ + sql "sync" + + assertTrue(helper.checkShowTimesOf(""" + SHOW ALTER TABLE COLUMN + FROM ${context.dbName} + WHERE TableName = "${tableName}" AND State = "FINISHED" + """, + has_count(1), 30)) + + def has_column_first = { res -> Boolean + // Field == 'first' && 'Key' == 'YES' + return res[0][0] == 'first' && (res[0][3] == 'YES' || res[0][3] == 'true') + } + + assertTrue(helper.checkShowTimesOf("SHOW COLUMNS FROM `${tableName}`", has_column_first, 60, "target_sql")) + + sql "INSERT INTO ${tableName} VALUES (123, 123, 123, 123)" + + // cache must be clear and reload. + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}", insert_num + 1, 60)) + + // no full sync triggered. + last_job_progress = get_job_progress(ccr_name) + assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at) +} + +