From 68d13d2d2c9ec73a176ce79bd90a138339c86128 Mon Sep 17 00:00:00 2001 From: Vallish Date: Wed, 11 Dec 2024 09:34:48 +0000 Subject: [PATCH] [Enhancement] add support for restore to ccr --- .../doris/backup/RestoreBinlogInfo.java | 68 +++++++++++++++++++ .../org/apache/doris/backup/RestoreJob.java | 13 ++++ .../apache/doris/binlog/BinlogManager.java | 12 ++++ .../org/apache/doris/persist/EditLog.java | 9 ++- gensrc/thrift/FrontendService.thrift | 5 +- .../doris/regression/suite/Syncer.groovy | 8 ++- 6 files changed, 110 insertions(+), 5 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinlogInfo.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinlogInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinlogInfo.java new file mode 100644 index 00000000000000..19c6f713c9372b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinlogInfo.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.backup; + +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RestoreBinlogInfo { + // currently we are sending only DB and table info. + // partitions level restore not possible since, there can be + // race condition when two partition recover and ccr-syncer try to sync it. + @SerializedName(value = "dbId") + private long dbId; + @SerializedName(value = "dbName") + private String dbName; + @SerializedName(value = "tableInfo") + // map of tableId and TableName. + private Map tableInfo = Maps.newHashMap(); + + /* + * constuctor + */ + public RestoreBinlogInfo(long dbId, String dbName) { + this.dbId = dbId; + this.dbName = dbName; + } + + public void addTableInfo(long tableId, String tableName) { + tableInfo.put(tableId, tableName); + } + + public long getDbId() { + return dbId; + } + + public List getTableIdList() { + return tableInfo.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList()); + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + public static RestoreBinlogInfo fromJson(String json) { + return GsonUtils.GSON.fromJson(json, RestoreBinlogInfo.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 6dfd02b3a42648..1b6423285ffaa3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -215,6 +215,9 @@ public enum RestoreJobState { @SerializedName("prop") private Map properties = Maps.newHashMap(); + @SerializedName("resbinlog") + private RestoreBinlogInfo restoreBinlogInfo = null; // only set in FinishedCase. + private MarkedCountDownLatch createReplicaTasksLatch = null; public RestoreJob() { @@ -401,6 +404,10 @@ public boolean isFinished() { return state == RestoreJobState.FINISHED; } + public RestoreBinlogInfo getRestoreBinlogInfo() { + return restoreBinlogInfo; + } + @Override public synchronized Status updateRepo(Repository repo) { this.repo = repo; @@ -2036,6 +2043,7 @@ private Status allTabletCommitted(boolean isReplay) { return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist"); } + restoreBinlogInfo = new RestoreBinlogInfo(dbId, db.getName()); // replace the origin tables in atomic. if (isAtomicRestore) { Status st = atomicReplaceOlapTables(db, isReplay); @@ -2052,6 +2060,9 @@ private Status allTabletCommitted(boolean isReplay) { if (tbl == null) { continue; } + //just restore existing table, then version will change. + // so we need to write bin log for those tables also. + restoreBinlogInfo.addTableInfo(tbl.getId(), tbl.getName()); OlapTable olapTbl = (OlapTable) tbl; if (!tbl.writeLockIfExist()) { continue; @@ -2097,6 +2108,7 @@ private Status allTabletCommitted(boolean isReplay) { } if (!isReplay) { + restoredTbls.stream().forEach(tbl -> restoreBinlogInfo.addTableInfo(tbl.getId(), tbl.getName())); restoredPartitions.clear(); restoredTbls.clear(); restoredResources.clear(); @@ -2112,6 +2124,7 @@ private Status allTabletCommitted(boolean isReplay) { state = RestoreJobState.FINISHED; env.getEditLog().logRestoreJob(this); + restoreBinlogInfo = null; } LOG.info("job is finished. is replay: {}. {}", isReplay, this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index b9eb91cc5f7ea6..f2088180bc7fae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -19,6 +19,7 @@ import org.apache.doris.alter.AlterJobV2; import org.apache.doris.alter.IndexChangeJob; +import org.apache.doris.backup.RestoreBinlogInfo; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; @@ -329,6 +330,17 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record); } + public void addRestoreInfo(RestoreBinlogInfo info, long commitSeq) { + long dbId = info.getDbId(); + List tableIds = info.getTableIdList(); + long timestamp = -1; + TBinlogType type = TBinlogType.RESTORE_INFO; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + + public void addTableRename(TableInfo info, long commitSeq) { long dbId = info.getDbId(); List tableIds = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 65a161157c8967..71b8930131e95d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.backup.BackupJob; import org.apache.doris.backup.Repository; +import org.apache.doris.backup.RestoreBinlogInfo; import org.apache.doris.backup.RestoreJob; import org.apache.doris.binlog.AddPartitionRecord; import org.apache.doris.binlog.CreateTableRecord; @@ -1694,7 +1695,13 @@ public void logAlterRepository(Repository repo) { } public void logRestoreJob(RestoreJob job) { - logEdit(OperationType.OP_RESTORE_JOB, job); + long logId = logEdit(OperationType.OP_RESTORE_JOB, job); + // write bin log only if restore job the finished. + RestoreBinlogInfo restoreBinlogInfo = job.getRestoreBinlogInfo(); + if ((job.isFinished()) && (restoreBinlogInfo != null)) { + LOG.info("log restore info, logId:{}, infos: {}", logId, restoreBinlogInfo.toJson()); + Env.getCurrentEnv().getBinlogManager().addRestoreInfo(restoreBinlogInfo, logId); + } } public void logUpdateUserProperty(UserPropertyInfo propertyInfo) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c7d1bf264f2f63..cc54174168be56 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1200,7 +1200,7 @@ enum TBinlogType { RENAME_PARTITION = 22, DROP_ROLLUP = 23, RECOVER_INFO = 24, - + RESTORE_INFO = 25, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking // compatibility. @@ -1216,8 +1216,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 25, - UNKNOWN_10 = 26, + MIN_UNKNOWN = 26, UNKNOWN_11 = 27, UNKNOWN_12 = 28, UNKNOWN_13 = 29, diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy index 2195e7e745afd7..39cc17b3306803 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy @@ -947,4 +947,10 @@ class Syncer { ) """ } -} + + void disableDbBinlog() { + suite.sql """ + ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false") + """ + } +} \ No newline at end of file