
Commit

[feature](backup) backup restore colocate table
justfortaste committed Dec 26, 2024
1 parent 30ebe42 commit 55529c4
Showing 8 changed files with 452 additions and 27 deletions.
@@ -40,6 +40,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;

public static final String PROP_RESERVE_REPLICA = "reserve_replica";
public static final String PROP_RESERVE_COLOCATE = "reserve_colocate";
public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
@@ -50,6 +51,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
private String backupTimestamp = null;
private int metaVersion = -1;
private boolean reserveReplica = false;
private boolean reserveColocate = false;
private boolean reserveDynamicPartitionEnable = false;
private boolean isLocal = false;
private boolean isBeingSynced = false;
@@ -91,6 +93,10 @@ public boolean reserveReplica() {
return reserveReplica;
}

public boolean reserveColocate() {
return reserveColocate;
}

public boolean reserveDynamicPartitionEnable() {
return reserveDynamicPartitionEnable;
}
@@ -173,7 +179,8 @@ public void analyzeProperties() throws AnalysisException {
if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) {
reserveReplica = false;
}

// reserve colocate
reserveColocate = eatBooleanProperty(copiedProperties, PROP_RESERVE_COLOCATE, reserveColocate);
// reserve dynamic partition enable
reserveDynamicPartitionEnable = eatBooleanProperty(
copiedProperties, PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, reserveDynamicPartitionEnable);
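As a usage note for the new option: a RESTORE statement would presumably pass it as PROPERTIES ("reserve_colocate" = "true"), alongside the existing "reserve_replica". The self-contained Java sketch below only illustrates the boolean-property parsing pattern that analyzeProperties() relies on here; parseBooleanProperty is a hypothetical stand-in for RestoreStmt's own eatBooleanProperty helper, not the actual Doris API.

import java.util.HashMap;
import java.util.Map;

final class BooleanPropertySketch {
    // Hypothetical stand-in for eatBooleanProperty: read an optional boolean property
    // such as "reserve_colocate", fall back to the given default, and remove the key
    // so that leftover/unknown properties can be rejected afterwards.
    static boolean parseBooleanProperty(Map<String, String> props, String key, boolean defaultValue) {
        String value = props.remove(key);
        if (value == null) {
            return defaultValue;
        }
        if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) {
            throw new IllegalArgumentException("Invalid boolean property " + key + ": " + value);
        }
        return Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("reserve_colocate", "true");
        System.out.println(parseBooleanProperty(props, "reserve_colocate", false)); // true
        System.out.println(parseBooleanProperty(props, "reserve_colocate", false)); // false: key already consumed
    }
}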
@@ -558,15 +558,16 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
jobInfo.getBackupTime(), TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp,
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(),
env, repository.getId());
}

@@ -675,8 +675,6 @@ private void prepareBackupMetaForOlapTableWithoutLock(TableRef tableRef, OlapTab
status = new Status(ErrCode.COMMON_ERROR, "failed to copy table: " + olapTable.getName());
return;
}

removeUnsupportProperties(copiedTbl);
copiedTables.add(copiedTbl);
}

@@ -710,12 +708,6 @@ private void prepareBackupMetaForOdbcTableWithoutLock(OdbcTable odbcTable, List<
}
}

private void removeUnsupportProperties(OlapTable tbl) {
// We cannot support the colocate attribute because the colocate information is not backed up
// synchronously when backing up.
tbl.setColocateGroup(null);
}

private void waitingAllSnapshotsFinished() {
if (unfinishedTaskIds.isEmpty()) {
if (env.getEditLog().exceedMaxJournalSize(this)) {
36 changes: 29 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -68,6 +68,7 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
@@ -117,6 +118,7 @@

public class RestoreJob extends AbstractJob implements GsonPostProcessable {
private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
private static final String PROP_RESERVE_COLOCATE = RestoreStmt.PROP_RESERVE_COLOCATE;
private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE =
RestoreStmt.PROP_RESERVE_DYNAMIC_PARTITION_ENABLE;
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;
@@ -172,6 +174,7 @@ public enum RestoreJobState {
private ReplicaAllocation replicaAlloc;

private boolean reserveReplica = false;
private boolean reserveColocate = false;
private boolean reserveDynamicPartitionEnable = false;

// this 2 members is to save all newly restored objs
@@ -193,6 +196,8 @@ public enum RestoreJobState {

private Map<Long, Long> unfinishedSignatureToId = Maps.newConcurrentMap();

private List<ColocatePersistInfo> colocatePersistInfos = Lists.newArrayList();

// the meta version is used when reading backup meta from file.
// we do not persist this field, because this is just a temporary solution.
// the true meta version should be get from backup job info, which is saved when doing backup job.
@@ -227,8 +232,8 @@ public RestoreJob(JobType jobType) {

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
@@ -237,6 +242,7 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
this.state = RestoreJobState.PENDING;
this.metaVersion = metaVersion;
this.reserveReplica = reserveReplica;
this.reserveColocate = reserveColocate;
// if backup snapshot is come from a cluster with force replication allocation, ignore the origin allocation
if (jobInfo.isForceReplicationAllocation) {
this.reserveReplica = false;
@@ -247,6 +253,7 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
this.isCleanPartitions = isCleanPartitions;
this.isAtomicRestore = isAtomicRestore;
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_COLOCATE, String.valueOf(reserveColocate));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
@@ -256,11 +263,13 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) {
boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId,
BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env,
repoId);
reserveColocate, reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions,
isAtomicRestore, env, repoId);

this.backupMeta = backupMeta;
}

@@ -280,6 +289,10 @@ public boolean isBeingSynced() {
return isBeingSynced;
}

public List<ColocatePersistInfo> getColocatePersistInfos() {
return colocatePersistInfos;
}

public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
if (checkTaskStatus(task, task.getJobId(), request)) {
return false;
@@ -690,6 +703,12 @@ private void checkAndPrepareMeta() {
OlapTable localOlapTbl = (OlapTable) localTbl;
OlapTable remoteOlapTbl = (OlapTable) remoteTbl;

if (localOlapTbl.isColocateTable() || (reserveColocate && remoteOlapTbl.isColocateTable())) {
status = new Status(ErrCode.COMMON_ERROR, "Not support to restore to local table "
+ tableName + " with colocate group.");
return;
}

localOlapTbl.readLock();
try {
List<String> intersectPartNames = Lists.newArrayList();
@@ -806,7 +825,8 @@ private void checkAndPrepareMeta() {

// reset all ids in this table
String srcDbName = jobInfo.dbName;
Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, reserveReplica, srcDbName);
Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, reserveReplica,
reserveColocate, colocatePersistInfos, srcDbName);
if (!st.ok()) {
status = st;
return;
@@ -2112,6 +2132,7 @@ private Status allTabletCommitted(boolean isReplay) {
state = RestoreJobState.FINISHED;

env.getEditLog().logRestoreJob(this);
colocatePersistInfos.clear();
}

LOG.info("job is finished. is replay: {}. {}", isReplay, this);
@@ -2384,6 +2405,7 @@ private void cancelInternal(boolean isReplay) {
state = RestoreJobState.CANCELLED;
// log
env.getEditLog().logRestoreJob(this);
colocatePersistInfos.clear();

LOG.info("finished to cancel restore job. current state: {}. is replay: {}. {}",
curState.name(), isReplay, this);
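For readability, a minimal sketch of the decision the new guard in checkAndPrepareMeta() encodes (the names below are illustrative, not the RestoreJob API): when the target table already exists locally, the restore is rejected if that local table is in a colocate group, or if reserve_colocate is set and the backed-up table is colocate; colocate groups are only rebuilt for tables the restore creates from scratch, and the resulting ColocatePersistInfo entries are logged with the job and cleared once it finishes or is cancelled.

// Illustrative-only: the colocate guard applied to tables that already exist locally.
final class ColocateRestoreGuardSketch {
    static String checkExistingTable(String tableName, boolean localIsColocate,
            boolean remoteIsColocate, boolean reserveColocate) {
        if (localIsColocate || (reserveColocate && remoteIsColocate)) {
            return "REJECT: cannot restore into existing table " + tableName + " with a colocate group";
        }
        return "OK: restore into existing table " + tableName;
    }

    public static void main(String[] args) {
        System.out.println(checkExistingTable("t1", true, false, false));  // rejected: local table is colocate
        System.out.println(checkExistingTable("t1", false, true, true));   // rejected: reserve_colocate on a colocate backup
        System.out.println(checkExistingTable("t1", false, true, false));  // allowed: colocation is simply dropped
    }
}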
83 changes: 76 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -41,6 +41,8 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
@@ -57,6 +59,7 @@
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.hint.UseMvHint;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -739,8 +742,6 @@ public void resetPropertiesForRestore(boolean reserveDynamicPartitionEnable, boo
if (isBeingSynced) {
setBeingSyncedProperties();
}
// remove colocate property.
setColocateGroup(null);
}

/**
Expand All @@ -764,7 +765,8 @@ public void resetVersionForRestore() {
}

public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restoreReplicaAlloc,
boolean reserveReplica, String srcDbName) {
boolean reserveReplica, boolean reserveColocate, List<ColocatePersistInfo> colocatePersistInfos,
String srcDbName) {
// ATTN: The meta of the restore may come from different clusters, so the
// original ID in the meta may conflict with the ID of the new cluster. For
// example, if a newly allocated ID happens to be the same as an original ID,
@@ -815,6 +817,47 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore
partitionInfo.resetPartitionIdForRestore(partitionMap,
reserveReplica ? null : restoreReplicaAlloc, isSinglePartition);

boolean createNewColocateGroup = false;
Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
ColocateTableIndex.GroupId groupId = null;
if (reserveColocate && isColocateTable()) {
String fullGroupName = ColocateTableIndex.GroupId.getFullGroupName(db.getId(), getColocateGroup());
ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName);

if (groupSchema != null) {
try {
// group already exist, check if this table can be added to this group
groupSchema.checkColocateSchema(this);
//groupSchema.checkDynamicPartition(properties, getDefaultDistributionInfo());
if (dynamicPartitionExists()
&& getTableProperty().getDynamicPartitionProperty().getBuckets()
!= groupSchema.getBucketsNum()) {
ErrorReport.reportDdlException(
ErrorCode.ERR_DYNAMIC_PARTITION_MUST_HAS_SAME_BUCKET_NUM_WITH_COLOCATE_TABLE,
getDefaultDistributionInfo().getBucketNum());
}
} catch (Exception e) {
return new Status(ErrCode.COMMON_ERROR, "Restore table " + getName()
+ " with colocate group " + getColocateGroup() + " failed: " + e.getMessage());
}

// if this is a colocate table, try to get backend seqs from colocation index.
backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupSchema.getGroupId());
createNewColocateGroup = false;
} else {
backendsPerBucketSeq = Maps.newHashMap();
createNewColocateGroup = true;
}

// add table to this group, if group does not exist, create a new one
groupId = Env.getCurrentColocateIndex()
.addTableToGroup(db.getId(), this, fullGroupName, null /* generate group id inside */);
} else {
// remove colocate property.
setColocateGroup(null);
}

// for each partition, reset rollup index map
Map<Tag, Integer> nextIndexes = Maps.newHashMap();
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
@@ -854,24 +897,50 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore

// replicas
try {
Pair<Map<Tag, List<Long>>, TStorageMedium> tag2beIdsAndMedium =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexes, null, false, false);
Map<Tag, List<Long>> tag2beIds = tag2beIdsAndMedium.first;
Map<Tag, List<Long>> tag2beIds = null;
if (isColocateTable() && !createNewColocateGroup) {
// get backends from existing backend sequence
tag2beIds = Maps.newHashMap();
for (Map.Entry<Tag, List<List<Long>>> entry3 : backendsPerBucketSeq.entrySet()) {
tag2beIds.put(entry3.getKey(), entry3.getValue().get(i));
}
} else {
Pair<Map<Tag, List<Long>>, TStorageMedium> tag2beIdsAndMedium =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexes, null,
false, false);
tag2beIds = tag2beIdsAndMedium.first;
}
for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
for (Long beId : entry3.getValue()) {
long newReplicaId = env.getNextId();
Replica replica = new Replica(newReplicaId, beId, ReplicaState.NORMAL,
visibleVersion, schemaHash);
newTablet.addReplica(replica, true /* is restore */);
}
if (createNewColocateGroup) {
backendsPerBucketSeq.putIfAbsent(entry3.getKey(), Lists.newArrayList());
backendsPerBucketSeq.get(entry3.getKey()).add(entry3.getValue());
}
}
} catch (DdlException e) {
return new Status(ErrCode.COMMON_ERROR, e.getMessage());
}
}
}

if (createNewColocateGroup) {
colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
}

// we have added these index to memory, only need to persist here
if (groupId != null) {
backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, getId(),
backendsPerBucketSeq);
colocatePersistInfos.add(info);
}

// reset partition id
partition.setIdForRestore(entry.getKey());
}
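A hedged sketch of the bucket-to-backend reuse that resetIdsForRestore() now performs for colocate tables; Doris types such as Tag and ColocateTableIndex are replaced with plain collections here, so this shows the placement rule rather than the real API. If the colocate group already exists, each bucket of the restored table is placed on the backends already recorded for that bucket; if the group is new, the backends chosen while restoring the first table become the group's bucket sequence and are later persisted via ColocatePersistInfo.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration of the colocate-aware replica placement: each resource tag maps to a
// per-bucket list of backend ids (the group's backendsPerBucketSeq).
final class ColocatePlacementSketch {
    static List<Long> backendsForBucket(Map<String, List<List<Long>>> backendsPerBucketSeq,
            boolean groupAlreadyExists, String tag, int bucketIndex,
            List<Long> freshlyChosenBackends) {
        if (groupAlreadyExists) {
            // Existing group: reuse the backends recorded for this bucket so the restored
            // table stays colocated with the other tables of the group.
            return backendsPerBucketSeq.get(tag).get(bucketIndex);
        }
        // New group: remember the freshly chosen backends as this bucket's sequence so
        // that tables joining the group later land on the same backends.
        backendsPerBucketSeq.computeIfAbsent(tag, t -> new ArrayList<>()).add(freshlyChosenBackends);
        return freshlyChosenBackends;
    }

    public static void main(String[] args) {
        Map<String, List<List<Long>>> seq = new HashMap<>();
        // First restored table defines the sequence for bucket 0 of a brand-new group.
        backendsForBucket(seq, false, "default", 0, List.of(10001L, 10002L, 10003L));
        // A later lookup against the now-existing group reuses that sequence.
        System.out.println(backendsForBucket(seq, true, "default", 0, null)); // [10001, 10002, 10003]
    }
}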
@@ -1695,6 +1695,9 @@ public void logAlterRepository(Repository repo) {

public void logRestoreJob(RestoreJob job) {
logEdit(OperationType.OP_RESTORE_JOB, job);
for (ColocatePersistInfo info : job.getColocatePersistInfos()) {
logColocateAddTable(info);
}
}

public void logUpdateUserProperty(UserPropertyInfo propertyInfo) {
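The reason for the loop above, sketched with placeholder names (the opcode strings and functional interface are illustrative, not the EditLog API): the restore job record is written first, then one colocate-add-table record per group membership the job created, so that edit-log replay on follower FEs rebuilds the colocate index as well.

import java.util.List;
import java.util.function.BiConsumer;

// Illustrative-only sketch of the "log the job, then log its colocate side effects" pattern.
final class RestoreEditLogSketch {
    static void logRestoreJob(BiConsumer<String, Object> editLog, Object job,
            List<Object> colocatePersistInfos) {
        editLog.accept("restore_job", job);
        for (Object info : colocatePersistInfos) {
            editLog.accept("colocate_add_table", info);
        }
    }

    public static void main(String[] args) {
        logRestoreJob((op, payload) -> System.out.println(op + " -> " + payload),
                "restore-job-label", List.of("colocate-info-t1", "colocate-info-t2"));
    }
}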
@@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) {
db.unregisterTable(expectedRestoreTbl.getName());

job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false,
env, repo.getId());

List<Table> tbls = Lists.newArrayList();