Skip to content

Commit

Permalink
[Enhancement] add support for restore to ccr
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp committed Dec 11, 2024
1 parent 279c5b7 commit 5e011f0
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
env, repository.getId());
}

env.getEditLog().logRestoreJob(restoreJob);
env.getEditLog().logRestoreJob(restoreJob, null);

// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
addBackupOrRestoreJob(db.getId(), restoreJob);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.backup;

import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RestoreBinLogInfo {
// currently we are sending only DB and table info.
// partitions level restore not possible since, there can be
// race condition when two partition recover and ccr-syncer try to sync it.
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "dbName")
private String dbName;
@SerializedName(value = "tableInfo")
// map of tableId and TableName.
private Map<Long, String> tableInfo = Maps.newHashMap();

/*
* constuctor
*/
public RestoreBinLogInfo(long dbId, String dbName) {
this.dbId = dbId;
this.dbName = dbName;
}

public void addTableInfo(long tableId, String tableName) {
tableInfo.put(tableId, tableName);
}

public long getDbId() {
return dbId;
}

public List<Long> getTableIdList() {
return tableInfo.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static RestoreBinLogInfo fromJson(String json) {
return GsonUtils.GSON.fromJson(json, RestoreBinLogInfo.class);
}
}
13 changes: 9 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -1638,7 +1638,7 @@ private void waitingAllSnapshotsFinished() {
snapshotFinishedTime = System.currentTimeMillis();
state = RestoreJobState.DOWNLOAD;

env.getEditLog().logRestoreJob(this);
env.getEditLog().logRestoreJob(this, null);
LOG.info("finished making snapshots. {}", this);
return;
}
Expand Down Expand Up @@ -1981,7 +1981,7 @@ private void waitingAllDownloadFinished() {
// backupMeta is useless now
backupMeta = null;

env.getEditLog().logRestoreJob(this);
env.getEditLog().logRestoreJob(this, null);
LOG.info("finished to download. {}", this);
}

Expand Down Expand Up @@ -2036,6 +2036,7 @@ private Status allTabletCommitted(boolean isReplay) {
return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
}

RestoreBinLogInfo restoreBinLogInfo = new RestoreBinLogInfo(dbId, db.getName());
// replace the origin tables in atomic.
if (isAtomicRestore) {
Status st = atomicReplaceOlapTables(db, isReplay);
Expand All @@ -2052,6 +2053,9 @@ private Status allTabletCommitted(boolean isReplay) {
if (tbl == null) {
continue;
}
//just restore existing table, then version will change.
// so we need to write bin log for those tables also.
restoreBinLogInfo.addTableInfo(tbl.getId(), tbl.getName());
OlapTable olapTbl = (OlapTable) tbl;
if (!tbl.writeLockIfExist()) {
continue;
Expand Down Expand Up @@ -2097,6 +2101,7 @@ private Status allTabletCommitted(boolean isReplay) {
}

if (!isReplay) {
restoredTbls.stream().forEach(tbl -> restoreBinLogInfo.addTableInfo(tbl.getId(), tbl.getName()));
restoredPartitions.clear();
restoredTbls.clear();
restoredResources.clear();
Expand All @@ -2111,7 +2116,7 @@ private Status allTabletCommitted(boolean isReplay) {
finishedTime = System.currentTimeMillis();
state = RestoreJobState.FINISHED;

env.getEditLog().logRestoreJob(this);
env.getEditLog().logRestoreJob(this, restoreBinLogInfo);
}

LOG.info("job is finished. is replay: {}. {}", isReplay, this);
Expand Down Expand Up @@ -2383,7 +2388,7 @@ private void cancelInternal(boolean isReplay) {
finishedTime = System.currentTimeMillis();
state = RestoreJobState.CANCELLED;
// log
env.getEditLog().logRestoreJob(this);
env.getEditLog().logRestoreJob(this, null);

LOG.info("finished to cancel restore job. current state: {}. is replay: {}. {}",
curState.name(), isReplay, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.IndexChangeJob;
import org.apache.doris.backup.RestoreBinLogInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -329,6 +330,17 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
}

public void addRestoreInfo(RestoreBinLogInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = info.getTableIdList();
long timestamp = -1;
TBinlogType type = TBinlogType.RESTORE_INFO;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}


public void addTableRename(TableInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.BackupJob;
import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreBinLogInfo;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.binlog.AddPartitionRecord;
import org.apache.doris.binlog.CreateTableRecord;
Expand Down Expand Up @@ -1693,8 +1694,13 @@ public void logAlterRepository(Repository repo) {
logEdit(OperationType.OP_ALTER_REPOSITORY, repo);
}

public void logRestoreJob(RestoreJob job) {
logEdit(OperationType.OP_RESTORE_JOB, job);
public void logRestoreJob(RestoreJob job, RestoreBinLogInfo binInfo) {
long logId = logEdit(OperationType.OP_RESTORE_JOB, job);
// write bin log only if restore job the finished.
if ((job.isFinished()) && (binInfo != null)) {
LOG.info("log restore info, logId:{}, infos: {}", logId, binInfo.toJson());
Env.getCurrentEnv().getBinlogManager().addRestoreInfo(binInfo, logId);
}
}

public void logUpdateUserProperty(UserPropertyInfo propertyInfo) {
Expand Down
5 changes: 2 additions & 3 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ enum TBinlogType {
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,
RECOVER_INFO = 24,

RESTORE_INFO = 25,
// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
// compatibility.
Expand All @@ -1216,8 +1216,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 25,
UNKNOWN_10 = 26,
MIN_UNKNOWN = 26,
UNKNOWN_11 = 27,
UNKNOWN_12 = 28,
UNKNOWN_13 = 29,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,4 +947,10 @@ class Syncer {
)
"""
}
}

void disableDbBinlog() {
suite.sql """
ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false")
"""
}
}

0 comments on commit 5e011f0

Please sign in to comment.