diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index bc38cfe09e5606..5ea8135ce5168d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -40,6 +40,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED; public static final String PROP_RESERVE_REPLICA = "reserve_replica"; + public static final String PROP_RESERVE_COLOCATE = "reserve_colocate"; public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable"; public static final String PROP_CLEAN_TABLES = "clean_tables"; public static final String PROP_CLEAN_PARTITIONS = "clean_partitions"; @@ -50,6 +51,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars private String backupTimestamp = null; private int metaVersion = -1; private boolean reserveReplica = false; + private boolean reserveColocate = false; private boolean reserveDynamicPartitionEnable = false; private boolean isLocal = false; private boolean isBeingSynced = false; @@ -91,6 +93,10 @@ public boolean reserveReplica() { return reserveReplica; } + public boolean reserveColocate() { + return reserveColocate; + } + public boolean reserveDynamicPartitionEnable() { return reserveDynamicPartitionEnable; } @@ -173,7 +179,8 @@ public void analyzeProperties() throws AnalysisException { if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) { reserveReplica = false; } - + // reserve colocate + reserveColocate = eatBooleanProperty(copiedProperties, PROP_RESERVE_COLOCATE, reserveColocate); // reserve dynamic partition enable reserveDynamicPartitionEnable = eatBooleanProperty( copiedProperties, PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, reserveDynamicPartitionEnable); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 6f88881e3cb2a3..040ab729a5fd61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -558,15 +558,16 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw jobInfo.getBackupTime(), TimeUtils.getDatetimeFormatWithHyphenWithTimeZone()); restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp, db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), - stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), + stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveColocate(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); } else { restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), - stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), - stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), + stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveColocate(), + stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(), + stmt.isCleanPartitions(), stmt.isAtomicRestore(), env, repository.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index de12670807f20e..b8c735842291cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -675,8 +675,6 @@ private void prepareBackupMetaForOlapTableWithoutLock(TableRef tableRef, OlapTab status = new Status(ErrCode.COMMON_ERROR, "failed to copy table: " + olapTable.getName()); return; } - - removeUnsupportProperties(copiedTbl); copiedTables.add(copiedTbl); } @@ -710,12 +708,6 @@ private void prepareBackupMetaForOdbcTableWithoutLock(OdbcTable odbcTable, List< } } - private void removeUnsupportProperties(OlapTable tbl) { - // We cannot support the colocate attribute because the colocate information is not backed up - // synchronously when backing up. - tbl.setColocateGroup(null); - } - private void waitingAllSnapshotsFinished() { if (unfinishedTaskIds.isEmpty()) { if (env.getEditLog().exceedMaxJournalSize(this)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 6dfd02b3a42648..ff68011075129a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -68,6 +68,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.property.S3ClientBEProperties; +import org.apache.doris.persist.ColocatePersistInfo; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; @@ -117,6 +118,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA; + private static final String PROP_RESERVE_COLOCATE = RestoreStmt.PROP_RESERVE_COLOCATE; private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = RestoreStmt.PROP_RESERVE_DYNAMIC_PARTITION_ENABLE; private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED; @@ -172,6 +174,7 @@ public enum RestoreJobState { private ReplicaAllocation replicaAlloc; private boolean reserveReplica = false; + private boolean reserveColocate = false; private boolean reserveDynamicPartitionEnable = false; // this 2 members is to save all newly restored objs @@ -193,6 +196,8 @@ public enum RestoreJobState { private Map unfinishedSignatureToId = Maps.newConcurrentMap(); + private List colocatePersistInfos = Lists.newArrayList(); + // the meta version is used when reading backup meta from file. // we do not persist this field, because this is just a temporary solution. // the true meta version should be get from backup job info, which is saved when doing backup job. @@ -227,8 +232,8 @@ public RestoreJob(JobType jobType) { public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, - boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) { + boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, + boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) { super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId); this.backupTimestamp = backupTs; this.jobInfo = jobInfo; @@ -237,6 +242,7 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu this.state = RestoreJobState.PENDING; this.metaVersion = metaVersion; this.reserveReplica = reserveReplica; + this.reserveColocate = reserveColocate; // if backup snapshot is come from a cluster with force replication allocation, ignore the origin allocation if (jobInfo.isForceReplicationAllocation) { this.reserveReplica = false; @@ -247,6 +253,7 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu this.isCleanPartitions = isCleanPartitions; this.isAtomicRestore = isAtomicRestore; properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica)); + properties.put(PROP_RESERVE_COLOCATE, String.valueOf(reserveColocate)); properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced)); properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables)); @@ -256,11 +263,13 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, - boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) { + boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, + boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, + BackupMeta backupMeta) { this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, - reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env, - repoId); + reserveColocate, reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, + isAtomicRestore, env, repoId); + this.backupMeta = backupMeta; } @@ -280,6 +289,10 @@ public boolean isBeingSynced() { return isBeingSynced; } + public List getColocatePersistInfos() { + return colocatePersistInfos; + } + public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) { if (checkTaskStatus(task, task.getJobId(), request)) { return false; @@ -690,6 +703,12 @@ private void checkAndPrepareMeta() { OlapTable localOlapTbl = (OlapTable) localTbl; OlapTable remoteOlapTbl = (OlapTable) remoteTbl; + if (localOlapTbl.isColocateTable() || (reserveColocate && remoteOlapTbl.isColocateTable())) { + status = new Status(ErrCode.COMMON_ERROR, "Not support to restore to local table " + + tableName + " with colocate group."); + return; + } + localOlapTbl.readLock(); try { List intersectPartNames = Lists.newArrayList(); @@ -806,7 +825,8 @@ private void checkAndPrepareMeta() { // reset all ids in this table String srcDbName = jobInfo.dbName; - Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, reserveReplica, srcDbName); + Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, reserveReplica, + reserveColocate, colocatePersistInfos, srcDbName); if (!st.ok()) { status = st; return; @@ -2112,6 +2132,7 @@ private Status allTabletCommitted(boolean isReplay) { state = RestoreJobState.FINISHED; env.getEditLog().logRestoreJob(this); + colocatePersistInfos.clear(); } LOG.info("job is finished. is replay: {}. {}", isReplay, this); @@ -2384,6 +2405,7 @@ private void cancelInternal(boolean isReplay) { state = RestoreJobState.CANCELLED; // log env.getEditLog().logRestoreJob(this); + colocatePersistInfos.clear(); LOG.info("finished to cancel restore job. current state: {}. is replay: {}. {}", curState.name(), isReplay, this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 32a4fd1517baa3..c811ee82251576 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -41,6 +41,8 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; @@ -57,6 +59,7 @@ import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.nereids.hint.Hint; import org.apache.doris.nereids.hint.UseMvHint; +import org.apache.doris.persist.ColocatePersistInfo; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; @@ -739,8 +742,6 @@ public void resetPropertiesForRestore(boolean reserveDynamicPartitionEnable, boo if (isBeingSynced) { setBeingSyncedProperties(); } - // remove colocate property. - setColocateGroup(null); } /** @@ -764,7 +765,8 @@ public void resetVersionForRestore() { } public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restoreReplicaAlloc, - boolean reserveReplica, String srcDbName) { + boolean reserveReplica, boolean reserveColocate, List colocatePersistInfos, + String srcDbName) { // ATTN: The meta of the restore may come from different clusters, so the // original ID in the meta may conflict with the ID of the new cluster. For // example, if a newly allocated ID happens to be the same as an original ID, @@ -815,6 +817,47 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore partitionInfo.resetPartitionIdForRestore(partitionMap, reserveReplica ? null : restoreReplicaAlloc, isSinglePartition); + boolean createNewColocateGroup = false; + Map>> backendsPerBucketSeq = null; + ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); + ColocateTableIndex.GroupId groupId = null; + if (reserveColocate && isColocateTable()) { + String fullGroupName = ColocateTableIndex.GroupId.getFullGroupName(db.getId(), getColocateGroup()); + ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName); + + if (groupSchema != null) { + try { + // group already exist, check if this table can be added to this group + groupSchema.checkColocateSchema(this); + //groupSchema.checkDynamicPartition(properties, getDefaultDistributionInfo()); + if (dynamicPartitionExists() + && getTableProperty().getDynamicPartitionProperty().getBuckets() + != groupSchema.getBucketsNum()) { + ErrorReport.reportDdlException( + ErrorCode.ERR_DYNAMIC_PARTITION_MUST_HAS_SAME_BUCKET_NUM_WITH_COLOCATE_TABLE, + getDefaultDistributionInfo().getBucketNum()); + } + } catch (Exception e) { + return new Status(ErrCode.COMMON_ERROR, "Restore table " + getName() + + " with colocate group " + getColocateGroup() + " failed: " + e.getMessage()); + } + + // if this is a colocate table, try to get backend seqs from colocation index. + backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupSchema.getGroupId()); + createNewColocateGroup = false; + } else { + backendsPerBucketSeq = Maps.newHashMap(); + createNewColocateGroup = true; + } + + // add table to this group, if group does not exist, create a new one + groupId = Env.getCurrentColocateIndex() + .addTableToGroup(db.getId(), this, fullGroupName, null /* generate group id inside */); + } else { + // remove colocate property. + setColocateGroup(null); + } + // for each partition, reset rollup index map Map nextIndexes = Maps.newHashMap(); for (Map.Entry entry : idToPartition.entrySet()) { @@ -854,10 +897,20 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore // replicas try { - Pair>, TStorageMedium> tag2beIdsAndMedium = - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation( - replicaAlloc, nextIndexes, null, false, false); - Map> tag2beIds = tag2beIdsAndMedium.first; + Map> tag2beIds = null; + if (isColocateTable() && !createNewColocateGroup) { + // get backends from existing backend sequence + tag2beIds = Maps.newHashMap(); + for (Map.Entry>> entry3 : backendsPerBucketSeq.entrySet()) { + tag2beIds.put(entry3.getKey(), entry3.getValue().get(i)); + } + } else { + Pair>, TStorageMedium> tag2beIdsAndMedium = + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation( + replicaAlloc, nextIndexes, null, + false, false); + tag2beIds = tag2beIdsAndMedium.first; + } for (Map.Entry> entry3 : tag2beIds.entrySet()) { for (Long beId : entry3.getValue()) { long newReplicaId = env.getNextId(); @@ -865,6 +918,10 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore visibleVersion, schemaHash); newTablet.addReplica(replica, true /* is restore */); } + if (createNewColocateGroup) { + backendsPerBucketSeq.putIfAbsent(entry3.getKey(), Lists.newArrayList()); + backendsPerBucketSeq.get(entry3.getKey()).add(entry3.getValue()); + } } } catch (DdlException e) { return new Status(ErrCode.COMMON_ERROR, e.getMessage()); @@ -872,6 +929,18 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore } } + if (createNewColocateGroup) { + colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq); + } + + // we have added these index to memory, only need to persist here + if (groupId != null) { + backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId); + ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, getId(), + backendsPerBucketSeq); + colocatePersistInfos.add(info); + } + // reset partition id partition.setIdForRestore(entry.getKey()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 65a161157c8967..5f3b58ef005f52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1695,6 +1695,9 @@ public void logAlterRepository(Repository repo) { public void logRestoreJob(RestoreJob job) { logEdit(OperationType.OP_RESTORE_JOB, job); + for (ColocatePersistInfo info : job.getColocatePersistInfos()) { + logColocateAddTable(info); + } } public void logUpdateUserProperty(UserPropertyInfo propertyInfo) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index dadfdb632e394d..568a168bafae86 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) { db.unregisterTable(expectedRestoreTbl.getName()); job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false, - new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, + new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false, env, repo.getId()); List tbls = Lists.newArrayList(); diff --git a/regression-test/suites/backup_restore/test_backup_restore_colocate.groovy b/regression-test/suites/backup_restore/test_backup_restore_colocate.groovy new file mode 100644 index 00000000000000..dbe0cbacc65cd4 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_colocate.groovy @@ -0,0 +1,331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_restore_colocate", "backup_restore") { + String suiteName = "test_backup_restore_colocate" + String repoName = "${suiteName}_repo" + String dbName = "${suiteName}_db" + String newDbName = "${suiteName}_db_new" + String tableName1 = "${suiteName}_table1" + String tableName2 = "${suiteName}_table2" + String tableName3 = "${suiteName}_table3" + String snapshotName = "${suiteName}_snapshot" + String groupName = "${suiteName}_group" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + + sql "DROP DATABASE IF EXISTS ${dbName}" + sql "DROP DATABASE IF EXISTS ${newDbName}" + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "CREATE DATABASE IF NOT EXISTS ${newDbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}" + sql """ + CREATE TABLE if NOT EXISTS ${dbName}.${tableName1} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "colocate_with" = "${groupName}" + ) + """ + sql """ + CREATE TABLE if NOT EXISTS ${dbName}.${tableName2} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "colocate_with" = "${groupName}" + ) + """ + def insert_num = 5 + for (int i = 0; i < insert_num; ++i) { + sql """ + INSERT INTO ${dbName}.${tableName1} VALUES (${i}, ${i}) + """ + sql """ + INSERT INTO ${dbName}.${tableName2} VALUES (${i}, ${i}) + """ + } + + def query = "select * from ${dbName}.${tableName1} as t1, ${dbName}.${tableName2} as t2 where t1.id=t2.id;" + + res = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(res.size(), insert_num) + res = sql "SELECT * FROM ${dbName}.${tableName2}" + assertEquals(res.size(), insert_num) + + explain { + sql("${query}") + contains("COLOCATE") + } + + res = sql "${query}" + assertEquals(res.size(), insert_num) + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName1}, ${tableName2}) + PROPERTIES ("type" = "full") + """ + + syncer.waitSnapshotFinish(dbName) + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + + logger.info("============== test 1: without reserve_colocate =============") + + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}" + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + syncer.waitAllRestoreFinish(dbName) + + res = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(res.size(), insert_num) + res = sql "SELECT * FROM ${dbName}.${tableName2}" + assertEquals(res.size(), insert_num) + + + explain { + sql("${query}") + notContains("COLOCATE") + } + res = sql "${query}" + assertEquals(res.size(), insert_num) + + logger.info("============== test 2: reserve_colocate = false =============") + + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}" + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_colocate" = "false" + ) + """ + syncer.waitAllRestoreFinish(dbName) + + res = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(res.size(), insert_num) + res = sql "SELECT * FROM ${dbName}.${tableName2}" + assertEquals(res.size(), insert_num) + + + explain { + sql("${query}") + notContains("COLOCATE") + } + res = sql "${query}" + assertEquals(res.size(), insert_num) + + + logger.info("============== test 3: reserve_colocate = true =============") + + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}" + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_colocate" = "true" + ) + """ + syncer.waitAllRestoreFinish(dbName) + + res = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(res.size(), insert_num) + res = sql "SELECT * FROM ${dbName}.${tableName2}" + assertEquals(res.size(), insert_num) + + + explain { + sql("${query}") + contains("COLOCATE") + } + res = sql "${query}" + assertEquals(res.size(), insert_num) + + logger.info("============== test 4: Not support to restore to local table with colocate group =============") + + res = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(res.size(), insert_num) + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_colocate" = "true" + ) + """ + syncer.waitAllRestoreFinish(dbName) + // Not support to restore to local table with colocate group + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("with colocate group")) + + + logger.info("============== test 5: local table with colocate group =============") + + res = sql "SELECT * FROM ${dbName}.${tableName1}" + assertEquals(res.size(), insert_num) + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_colocate" = "false" + ) + """ + syncer.waitAllRestoreFinish(dbName) + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("with colocate group")) + + + logger.info("============== test 6: local table without colocate group =============") + + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}" + // without colocate group + sql """ + CREATE TABLE if NOT EXISTS ${dbName}.${tableName1} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + sql """ + CREATE TABLE if NOT EXISTS ${dbName}.${tableName2} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + assertEquals(res.size(), insert_num) + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_colocate" = "false" + ) + """ + syncer.waitAllRestoreFinish(dbName) + records = sql_return_maparray "SHOW restore FROM ${dbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("OK")) + + + logger.info("============== test 7: local colocate mismatch error =============") + + sql "DROP TABLE IF EXISTS ${newDbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${newDbName}.${tableName2}" + sql "DROP TABLE IF EXISTS ${newDbName}.${tableName3}" + // create with different colocat + sql """ + CREATE TABLE if NOT EXISTS ${newDbName}.${tableName3} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "colocate_with" = "${groupName}" + ) + """ + + sql """ + RESTORE SNAPSHOT ${newDbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "reserve_colocate" = "true" + ) + """ + syncer.waitAllRestoreFinish(newDbName) + + records = sql_return_maparray "SHOW restore FROM ${newDbName}" + row = records[records.size() - 1] + assertTrue(row.Status.contains("Colocate tables must have same bucket num")) + + //cleanup + sql "DROP TABLE IF EXISTS ${newDbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${newDbName}.${tableName2}" + sql "DROP TABLE IF EXISTS ${newDbName}.${tableName3}" + sql "DROP DATABASE ${newDbName} FORCE" + + sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName3}" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} \ No newline at end of file