From 234e36ca921c7a9d41040b3f0645150b85d9db04 Mon Sep 17 00:00:00 2001 From: Vallish Date: Sun, 8 Dec 2024 15:46:12 +0000 Subject: [PATCH] [Enhancement] add support for restore to ccr --- .../apache/doris/backup/BackupHandler.java | 2 +- .../doris/backup/RestoreBinLogInfo.java | 68 +++++++++++++++++++ .../org/apache/doris/backup/RestoreJob.java | 10 +-- .../apache/doris/binlog/BinlogManager.java | 12 ++++ .../org/apache/doris/persist/EditLog.java | 9 ++- gensrc/thrift/FrontendService.thrift | 7 +- regression-test/conf/regression-conf.groovy | 17 +++++ .../doris/regression/suite/Syncer.groovy | 33 ++++++++- 8 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinLogInfo.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 6a12eee3a78cb38..dc3f0c252d372e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -570,7 +570,7 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw env, repository.getId()); } - env.getEditLog().logRestoreJob(restoreJob); + env.getEditLog().logRestoreJob(restoreJob, null); // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed. addBackupOrRestoreJob(db.getId(), restoreJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinLogInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinLogInfo.java new file mode 100644 index 000000000000000..cecda1b2446517c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreBinLogInfo.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.backup; + +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RestoreBinLogInfo { + // currently we are sending only DB and table info. + // partitions level restore not possible since, there can be + // race condition when two partition recover and ccr-syncer try to sync it. + @SerializedName(value = "dbId") + private long dbId; + @SerializedName(value = "dbName") + private String dbName; + @SerializedName(value = "tableInfo") + // map of tableId and TableName. + private Map tableInfo = Maps.newHashMap(); + + /* + * constuctor + */ + public RestoreBinLogInfo(long dbId, String dbName) { + this.dbId = dbId; + this.dbName = dbName; + } + + public void addTableInfo(long tableId, String tableName) { + tableInfo.put(tableId, tableName); + } + + public long getDbId() { + return dbId; + } + + public List getTableIdList() { + return tableInfo.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList()); + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + public static RestoreBinLogInfo fromJson(String json) { + return GsonUtils.GSON.fromJson(json, RestoreBinLogInfo.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 6dfd02b3a426481..a5273dce4bccad7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1638,7 +1638,7 @@ private void waitingAllSnapshotsFinished() { snapshotFinishedTime = System.currentTimeMillis(); state = RestoreJobState.DOWNLOAD; - env.getEditLog().logRestoreJob(this); + env.getEditLog().logRestoreJob(this, null); LOG.info("finished making snapshots. {}", this); return; } @@ -1981,7 +1981,7 @@ private void waitingAllDownloadFinished() { // backupMeta is useless now backupMeta = null; - env.getEditLog().logRestoreJob(this); + env.getEditLog().logRestoreJob(this, null); LOG.info("finished to download. {}", this); } @@ -2036,6 +2036,7 @@ private Status allTabletCommitted(boolean isReplay) { return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist"); } + RestoreBinLogInfo restoreBinLogInfo = new RestoreBinLogInfo(dbId, db.getName()); // replace the origin tables in atomic. if (isAtomicRestore) { Status st = atomicReplaceOlapTables(db, isReplay); @@ -2097,6 +2098,7 @@ private Status allTabletCommitted(boolean isReplay) { } if (!isReplay) { + restoredTbls.stream().forEach(tbl -> restoreBinLogInfo.addTableInfo(tbl.getId(), tbl.getName())); restoredPartitions.clear(); restoredTbls.clear(); restoredResources.clear(); @@ -2111,7 +2113,7 @@ private Status allTabletCommitted(boolean isReplay) { finishedTime = System.currentTimeMillis(); state = RestoreJobState.FINISHED; - env.getEditLog().logRestoreJob(this); + env.getEditLog().logRestoreJob(this, restoreBinLogInfo); } LOG.info("job is finished. is replay: {}. {}", isReplay, this); @@ -2383,7 +2385,7 @@ private void cancelInternal(boolean isReplay) { finishedTime = System.currentTimeMillis(); state = RestoreJobState.CANCELLED; // log - env.getEditLog().logRestoreJob(this); + env.getEditLog().logRestoreJob(this, null); LOG.info("finished to cancel restore job. current state: {}. is replay: {}. {}", curState.name(), isReplay, this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 67bb99a8bcdc18f..d8aad070dc621a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -19,6 +19,7 @@ import org.apache.doris.alter.AlterJobV2; import org.apache.doris.alter.IndexChangeJob; +import org.apache.doris.backup.RestoreBinLogInfo; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; @@ -328,6 +329,17 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record); } + public void addRestoreInfo(RestoreBinLogInfo info, long commitSeq) { + long dbId = info.getDbId(); + List tableIds = info.getTableIdList(); + long timestamp = -1; + TBinlogType type = TBinlogType.RESTORE_INFO; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + + public void addTableRename(TableInfo info, long commitSeq) { long dbId = info.getDbId(); List tableIds = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index f1377e9daebfc44..96f8cb36072e128 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.backup.BackupJob; import org.apache.doris.backup.Repository; +import org.apache.doris.backup.RestoreBinLogInfo; import org.apache.doris.backup.RestoreJob; import org.apache.doris.binlog.AddPartitionRecord; import org.apache.doris.binlog.CreateTableRecord; @@ -1689,8 +1690,12 @@ public void logAlterRepository(Repository repo) { logEdit(OperationType.OP_ALTER_REPOSITORY, repo); } - public void logRestoreJob(RestoreJob job) { - logEdit(OperationType.OP_RESTORE_JOB, job); + public void logRestoreJob(RestoreJob job, RestoreBinLogInfo binInfo) { + long logId = logEdit(OperationType.OP_RESTORE_JOB, job); + if (binInfo != null) { + LOG.info("log restore info, logId:{}, infos: {}", logId, binInfo.toJson()); + Env.getCurrentEnv().getBinlogManager().addRestoreInfo(binInfo, logId); + } } public void logUpdateUserProperty(UserPropertyInfo propertyInfo) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 916885028ad4a0d..cc54174168be560 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1199,7 +1199,8 @@ enum TBinlogType { RENAME_ROLLUP = 21, RENAME_PARTITION = 22, DROP_ROLLUP = 23, - + RECOVER_INFO = 24, + RESTORE_INFO = 25, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking // compatibility. @@ -1215,9 +1216,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 24, - UNKNOWN_9 = 25, - UNKNOWN_10 = 26, + MIN_UNKNOWN = 26, UNKNOWN_11 = 27, UNKNOWN_12 = 28, UNKNOWN_13 = 29, diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index ab9bb0beb918697..9f2cd3f8cc95ae5 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -259,3 +259,20 @@ lakesoulMinioEndpoint="*******" metaServiceToken = "greedisgood9999" instanceId = "default_instance_id" multiClusterInstance = "default_instance_id" + +// JDBC configuration +jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?" +targetJdbcUrl = "jdbc:mysql://127.0.0.1:9190/?" +jdbcUser = "root" +jdbcPassword = "" +feSourceThriftAddress = "127.0.0.1:9020" +feTargetThriftAddress = "127.0.0.1:9020" +syncerAddress = "127.0.0.1:9190" +feSyncerUser = "root" +feSyncerPassword = "" +feHttpAddress = "127.0.0.1:8330" +// CCR configuration +ccrDownstreamUrl = "jdbc:mysql://127.0.0.1:9030/?" +ccrDownstreamUser = "root" +ccrDownstreamPassword = "" +ccrDownstreamFeThriftAddress = "127.0.0.1:9020" \ No newline at end of file diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy index 2195e7e745afd7b..7109dc31f4ef0e6 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy @@ -947,4 +947,35 @@ class Syncer { ) """ } -} + + void createMinoIoRepository(String name, boolean readOnly = false) { + String ak = "xuYlrTbIGdz8oA8HeHSa" + String sk = "6n7S3PFkDOBDSJiwQLURNywVwT20ptqOAY3wrOSF" + String endpoint = "http://127.0.0.1:9000" + String region = suite.getS3Region() + String bucket = "test" + + + suite.try_sql "DROP REPOSITORY `${name}`" + suite.sql """ + CREATE ${readOnly ? "READ ONLY" : ""} REPOSITORY `${name}` + WITH S3 + ON LOCATION "s3://${bucket}/${name}" + PROPERTIES + ( + "s3.endpoint" = "http://127.0.0.1:9000", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}", + "s3.root"= "${name}", + "s3.use_path_style" = "true", + "s3.region" = "${region}" + ) + """ + } + + void disableDbBinlog() { + suite.sql """ + ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false") + """ + } +} \ No newline at end of file