[Enhancement] add support for restore to ccr
Vallishp committed Dec 16, 2024
1 parent 86c3c76 commit 68d13d2
Showing 6 changed files with 110 additions and 5 deletions.
fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinlogInfo.java (new file)
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.backup;

import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RestoreBinlogInfo {
// Currently we send only the database and table info.
// Partition-level restore is not possible because there can be a
// race condition when two partitions are recovered and the ccr-syncer tries to sync them.
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "dbName")
private String dbName;
@SerializedName(value = "tableInfo")
// map of table id to table name.
private Map<Long, String> tableInfo = Maps.newHashMap();

/*
 * Constructor.
 */
public RestoreBinlogInfo(long dbId, String dbName) {
this.dbId = dbId;
this.dbName = dbName;
}

public void addTableInfo(long tableId, String tableName) {
tableInfo.put(tableId, tableName);
}

public long getDbId() {
return dbId;
}

public List<Long> getTableIdList() {
return tableInfo.keySet().stream().collect(Collectors.toList());
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static RestoreBinlogInfo fromJson(String json) {
return GsonUtils.GSON.fromJson(json, RestoreBinlogInfo.class);
}
}
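
As a quick orientation, a minimal usage sketch of the new class (the ids and names below are invented; the JSON shape shown follows from the @SerializedName annotations above and assumes GsonUtils.GSON keeps Gson's default map handling):

import org.apache.doris.backup.RestoreBinlogInfo;

public class RestoreBinlogInfoExample {
    public static void main(String[] args) {
        // Invented ids/names, for illustration only.
        RestoreBinlogInfo info = new RestoreBinlogInfo(10001L, "demo_db");
        info.addTableInfo(20001L, "orders");
        info.addTableInfo(20002L, "customers");

        // With the @SerializedName annotations, the payload looks roughly like:
        // {"dbId":10001,"dbName":"demo_db","tableInfo":{"20001":"orders","20002":"customers"}}
        String json = info.toJson();

        // The payload round-trips back into the same db id and table ids.
        RestoreBinlogInfo decoded = RestoreBinlogInfo.fromJson(json);
        System.out.println(decoded.getDbId() + " -> " + decoded.getTableIdList());
    }
}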
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -215,6 +215,9 @@ public enum RestoreJobState {
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();

@SerializedName("resbinlog")
private RestoreBinlogInfo restoreBinlogInfo = null; // only set while the job is being finished.

private MarkedCountDownLatch<Long, Long> createReplicaTasksLatch = null;

public RestoreJob() {
@@ -401,6 +404,10 @@ public boolean isFinished() {
return state == RestoreJobState.FINISHED;
}

public RestoreBinlogInfo getRestoreBinlogInfo() {
return restoreBinlogInfo;
}

@Override
public synchronized Status updateRepo(Repository repo) {
this.repo = repo;
@@ -2036,6 +2043,7 @@ private Status allTabletCommitted(boolean isReplay) {
return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
}

restoreBinlogInfo = new RestoreBinlogInfo(dbId, db.getName());
// replace the origin tables in atomic.
if (isAtomicRestore) {
Status st = atomicReplaceOlapTables(db, isReplay);
@@ -2052,6 +2060,9 @@ private Status allTabletCommitted(boolean isReplay) {
if (tbl == null) {
continue;
}
// Restoring into an existing table changes its version,
// so we also need to write binlogs for those tables.
restoreBinlogInfo.addTableInfo(tbl.getId(), tbl.getName());
OlapTable olapTbl = (OlapTable) tbl;
if (!tbl.writeLockIfExist()) {
continue;
@@ -2097,6 +2108,7 @@ private Status allTabletCommitted(boolean isReplay) {
}

if (!isReplay) {
restoredTbls.forEach(tbl -> restoreBinlogInfo.addTableInfo(tbl.getId(), tbl.getName()));
restoredPartitions.clear();
restoredTbls.clear();
restoredResources.clear();
@@ -2112,6 +2124,7 @@ private Status allTabletCommitted(boolean isReplay) {
state = RestoreJobState.FINISHED;

env.getEditLog().logRestoreJob(this);
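// Clear the reference after logging; it is only needed while the RESTORE_INFO binlog is being emitted.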
restoreBinlogInfo = null;
}

LOG.info("job is finished. is replay: {}. {}", isReplay, this);
fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -19,6 +19,7 @@

import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.IndexChangeJob;
import org.apache.doris.backup.RestoreBinlogInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
@@ -329,6 +330,17 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
}

public void addRestoreInfo(RestoreBinlogInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = info.getTableIdList();
long timestamp = -1;
TBinlogType type = TBinlogType.RESTORE_INFO;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

public void addTableRename(TableInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
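
The data field of a RESTORE_INFO binlog is the JSON produced by RestoreBinlogInfo.toJson(), so a downstream consumer such as the ccr-syncer can decode it with the helpers added in this commit. A hypothetical handler sketch (the class and method names below are illustrative, not part of this change):

import java.util.List;

import org.apache.doris.backup.RestoreBinlogInfo;

public class RestoreInfoConsumer {
    // Only fromJson/getDbId/getTableIdList come from this commit; the rest is a sketch.
    public static void onRestoreInfo(String binlogData) {
        RestoreBinlogInfo info = RestoreBinlogInfo.fromJson(binlogData);
        long dbId = info.getDbId();
        List<Long> tableIds = info.getTableIdList();
        // A finished restore bumps the versions of every table it touched, so a syncer
        // would typically re-sync these whole tables rather than individual partitions.
        System.out.println("restore touched db " + dbId + ", tables " + tableIds);
    }
}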
fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -23,6 +23,7 @@
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.BackupJob;
import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreBinlogInfo;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.binlog.AddPartitionRecord;
import org.apache.doris.binlog.CreateTableRecord;
@@ -1694,7 +1695,13 @@ public void logAlterRepository(Repository repo) {
}

public void logRestoreJob(RestoreJob job) {
logEdit(OperationType.OP_RESTORE_JOB, job);
long logId = logEdit(OperationType.OP_RESTORE_JOB, job);
// Write the binlog only if the restore job has finished.
RestoreBinlogInfo restoreBinlogInfo = job.getRestoreBinlogInfo();
if (job.isFinished() && restoreBinlogInfo != null) {
LOG.info("log restore info, logId: {}, info: {}", logId, restoreBinlogInfo.toJson());
Env.getCurrentEnv().getBinlogManager().addRestoreInfo(restoreBinlogInfo, logId);
}
}

public void logUpdateUserProperty(UserPropertyInfo propertyInfo) {
5 changes: 2 additions & 3 deletions gensrc/thrift/FrontendService.thrift
@@ -1200,7 +1200,7 @@ enum TBinlogType {
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,
RECOVER_INFO = 24,

RESTORE_INFO = 25,
// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
// compatibility.
@@ -1216,8 +1216,7 @@
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 25,
UNKNOWN_10 = 26,
MIN_UNKNOWN = 26,
UNKNOWN_11 = 27,
UNKNOWN_12 = 28,
UNKNOWN_13 = 29,
Syncer.groovy (regression-test framework)
@@ -947,4 +947,10 @@ class Syncer {
)
"""
}
}

void disableDbBinlog() {
suite.sql """
ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false")
"""
}
}
