diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index a960b20f..99efdc5e 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -757,24 +757,27 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { return false, nil } -func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) { +func (s *Spec) GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error) { log.Debugf("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName) for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil { - return "", err + return "", false, err } else if restoreState == RestoreStateFinished { - return "", nil + return "", false, nil } else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) { - pattern := regexp.MustCompile("Table (?P.*) already exist but with different schema") + pattern := regexp.MustCompile("(?PTable|View) (?P.*) already exist but with different schema") matches := pattern.FindStringSubmatch(status) index := pattern.SubexpIndex("tableName") - if len(matches) < index && len(matches[index]) == 0 { - return "", xerror.Errorf(xerror.Normal, "match table name from restore status failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) + if len(matches) == 0 || index == -1 || len(matches[index]) == 0 { + return "", false, xerror.Errorf(xerror.Normal, "match table name from restore status failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) } - return matches[index], nil + + resource := matches[pattern.SubexpIndex("tableOrView")] + tableOrView := resource == "Table" + return matches[index], tableOrView, nil } else if restoreState == RestoreStateCancelled { - return "", xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) + return "", false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) } else { // RestoreStatePending, RestoreStateUnknown time.Sleep(RESTORE_CHECK_DURATION) @@ -782,7 +785,7 @@ func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, } log.Warnf("get restore signature not matched timeout, max try times: %d, spec: %s, snapshot: %s", MAX_CHECK_RETRY_TIMES, s, snapshotName) - return "", nil + return "", false, nil } func (s *Spec) waitTransactionDone(txnId int64) error { diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index 288b4a4d..5ac2b5f4 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -28,7 +28,7 @@ type Specer interface { CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) CreateSnapshotAndWaitForDone(tables []string) (string, error) CheckRestoreFinished(snapshotName string) (bool, error) - GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) + GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error) WaitTransactionDone(txnId int64) // busy wait LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index fe35615e..e78a2f09 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -613,7 +613,7 @@ func (j *Job) fullSync() error { if err != nil { return err } - log.Debugf("job info size: %d, bytes: %s", len(jobInfoBytes), string(jobInfoBytes)) + log.Debugf("job info size: %d, bytes: %.128s", len(jobInfoBytes), string(jobInfoBytes)) snapshotResp.SetJobInfo(jobInfoBytes) var commitSeq int64 = math.MaxInt64 @@ -685,21 +685,33 @@ func (j *Job) fullSync() error { if err != nil && errors.Is(err, base.ErrRestoreSignatureNotMatched) { // We need rebuild the exists table. var tableName string + var tableOrView bool = true if j.SyncType == TableSync { tableName = j.Dest.Table } else { - tableName, err = j.IDest.GetRestoreSignatureNotMatchedTable(restoreSnapshotName) + tableName, tableOrView, err = j.IDest.GetRestoreSignatureNotMatchedTableOrView(restoreSnapshotName) if err != nil || len(tableName) == 0 { continue } } - log.Infof("the signature of table %s is not matched with the target table in snapshot", tableName) + + resource := "table" + if !tableOrView { + resource = "view" + } + log.Infof("the signature of %s %s is not matched with the target table in snapshot", resource, tableName) for { - if err := j.IDest.DropTable(tableName, false); err == nil { - break + if tableOrView { + if err := j.IDest.DropTable(tableName, false); err == nil { + break + } + } else { + if err := j.IDest.DropView(tableName); err == nil { + break + } } } - log.Infof("the restore is cancelled, the unmatched table %s is dropped, restore snapshot again", tableName) + log.Infof("the restore is cancelled, the unmatched %s %s is dropped, restore snapshot again", resource, tableName) break } else if err != nil { return err diff --git a/regression-test/suites/db-sync-view/test_sync_view_twice.groovy b/regression-test/suites/db-sync-view/test_sync_view_twice.groovy new file mode 100644 index 00000000..bb616c5d --- /dev/null +++ b/regression-test/suites/db-sync-view/test_sync_view_twice.groovy @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_sync_view_twice") { + def syncerAddress = "127.0.0.1:9190" + def sync_gap_time = 5000 + def createDuplicateTable = { tableName -> + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + user_id BIGINT NOT NULL COMMENT "用户 ID", + name VARCHAR(20) COMMENT "用户姓名", + age INT COMMENT "用户年龄" + ) + ENGINE=OLAP + DUPLICATE KEY(user_id) + DISTRIBUTED BY HASH(user_id) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + } + + def checkShowTimesOf = { sqlString, checkFunc, times, func = "sql" -> Boolean + List> res + while (times > 0) { + try { + if (func == "sql") { + res = sql "${sqlString}" + } else { + res = target_sql "${sqlString}" + } + + if (checkFunc.call(res)) { + return true + } + } catch (Exception e) { + logger.warn("Exception: ${e}") + } + + if (--times > 0) { + sleep(sync_gap_time) + } + } + + return false + } + + def checkSelectTimesOf = { sqlString, rowSize, times -> Boolean + def tmpRes = target_sql "${sqlString}" + while (tmpRes.size() != rowSize) { + sleep(sync_gap_time) + if (--times > 0) { + tmpRes = target_sql "${sqlString}" + } else { + break + } + } + return tmpRes.size() == rowSize + } + + def checkRestoreFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + for (List row : sqlInfo) { + if ((row[10] as String).contains(checkTable)) { + ret = (row[4] as String) == "FINISHED" + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkBackupFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = sql "SHOW BACKUP FROM ${context.dbName}" + for (List row : sqlInfo) { + if ((row[4] as String).contains(checkTable)) { + ret = row[3] == "FINISHED" + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkRestoreAllFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = true + while (times > 0) { + def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + for (List row : sqlInfo) { + if ((row[10] as String).contains(checkTable)) { + if ((row[4] as String) != "FINISHED") { + ret = false + } + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + + } + + return ret + } + + def checkRestoreRowsTimesOf = {rowSize, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + if (sqlInfo.size() >= rowSize) { + ret = true + break + } else if (--times > 0 && sqlInfo.size() < rowSize) { + sleep(sync_gap_time) + } + } + + return ret + } + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + def suffix = UUID.randomUUID().toString().replace("-", "") + def tableDuplicate0 = "tbl_duplicate_0_${suffix}" + createDuplicateTable(tableDuplicate0) + sql """ + INSERT INTO ${tableDuplicate0} VALUES + (1, "Emily", 25), + (2, "Benjamin", 35), + (3, "Olivia", 28), + (4, "Alexander", 60), + (5, "Ava", 17); + """ + + sql "ALTER DATABASE ${context.dbName} SET properties (\"binlog.enable\" = \"true\")" + + logger.info("=== Test1: create view ===") + sql """ + CREATE VIEW view_test_${suffix} (k1, name, v1) + AS + SELECT user_id as k1, name, SUM(age) FROM ${tableDuplicate0} + GROUP BY k1,name; + """ + + String response + httpTest { + uri "/create_ccr" + endpoint syncerAddress + def bodyJson = get_ccr_body "" + body "${bodyJson}" + op "post" + result response + } + + assertTrue(checkRestoreFinishTimesOf("${tableDuplicate0}", 30)) + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableDuplicate0}", 5, 30)) + + // the view will be restored again. + logger.info("=== Test 2: delete job and create it again ===") + test_num = 5 + httpTest { + uri "/delete" + endpoint syncerAddress + def bodyJson = get_ccr_body "" + body "${bodyJson}" + op "post" + result response + } + + sql """ + INSERT INTO ${tableDuplicate0} VALUES (6, "Zhangsan", 31) + """ + sql "sync" + + httpTest { + uri "/create_ccr" + endpoint syncerAddress + def bodyJson = get_ccr_body "" + body "${bodyJson}" + op "post" + result response + } + + // first, check backup + sleep(15000) + assertTrue(checkBackupFinishTimesOf("${tableDuplicate0}", 60)) + + // then, check retore + sleep(15000) + assertTrue(checkRestoreRowsTimesOf(2, 30)) + assertTrue(checkRestoreFinishTimesOf("${tableDuplicate0}", 30)) + + assertTrue(checkSelectTimesOf("SELECT * FROM ${tableDuplicate0}", 6, 50)) + def view_size = target_sql "SHOW VIEW FROM ${tableDuplicate0}" + assertTrue(view_size.size() == 1); +}