From 499484193918c714c820a5ba471f59372c65186b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 27 Nov 2024 10:21:58 +0800 Subject: [PATCH] branch-3.0: [fix](backup) Automatic adapt upload/download snapshot batch size #44560 (#44639) Cherry-picked from #44560 Co-authored-by: walter --- .../java/org/apache/doris/common/Config.java | 13 ++++--- .../org/apache/doris/backup/BackupJob.java | 17 +++------ .../org/apache/doris/backup/RestoreJob.java | 37 ++++++------------- 3 files changed, 25 insertions(+), 42 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 7b23272bbb55bb..4b3e5bc0a3e591 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2724,16 +2724,17 @@ public class Config extends ConfigBase { public static String nereids_trace_log_dir = System.getenv("LOG_DIR") + "/nereids_trace"; @ConfField(mutable = true, masterOnly = true, description = { - "备份过程中,分配给每个be的upload任务最大个数,默认值为3个。", - "The max number of upload tasks assigned to each be during the backup process, the default value is 3." + "备份过程中,一个 upload 任务上传的快照数量上限,默认值为10个", + "The max number of snapshots assigned to a upload task during the backup process, the default value is 10." }) - public static int backup_upload_task_num_per_be = 3; + public static int backup_upload_snapshot_batch_size = 10; @ConfField(mutable = true, masterOnly = true, description = { - "恢复过程中,分配给每个be的download任务最大个数,默认值为3个。", - "The max number of download tasks assigned to each be during the restore process, the default value is 3." + "恢复过程中,一个 download 任务下载的快照数量上限,默认值为10个", + "The max number of snapshots assigned to a download task during the restore process, " + + "the default value is 10." }) - public static int restore_download_task_num_per_be = 3; + public static int restore_download_snapshot_batch_size = 10; @ConfField(mutable = true, masterOnly = true, description = { "备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。", diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 478e8902d7d8c4..621a2b1d9f7d29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -751,13 +751,10 @@ private void uploadSnapshot() { for (Long beId : beToSnapshots.keySet()) { List infos = beToSnapshots.get(beId); int totalNum = infos.size(); - int batchNum = totalNum; - if (Config.backup_upload_task_num_per_be > 0) { - batchNum = Math.min(totalNum, Config.backup_upload_task_num_per_be); - } // each task contains several upload sub tasks - int taskNumPerBatch = Math.max(totalNum / batchNum, 1); - LOG.info("backend {} has {} batch, total {} tasks, {}", beId, batchNum, totalNum, this); + int taskNumPerBatch = Config.backup_upload_snapshot_batch_size; + LOG.info("backend {} has total {} snapshots, per task batch size {}, {}", + beId, totalNum, taskNumPerBatch, this); List brokers = Lists.newArrayList(); Status st = repo.getBrokerAddress(beId, env, brokers); @@ -768,12 +765,10 @@ private void uploadSnapshot() { Preconditions.checkState(brokers.size() == 1); // allot tasks - int index = 0; - for (int batch = 0; batch < batchNum; batch++) { + for (int index = 0; index < totalNum; index += taskNumPerBatch) { Map srcToDest = Maps.newHashMap(); - int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch; - for (int j = 0; j < currentBatchTaskNum; j++) { - SnapshotInfo info = infos.get(index++); + for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) { + SnapshotInfo info = infos.get(index + j); String src = info.getTabletPath(); String dest = repo.getRepoTabletPathBySnapshotInfo(label, info); if (dest == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index e922955c5aaa40..a984eed595073d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1686,16 +1686,10 @@ private void downloadRemoteSnapshots() { for (Long beId : beToSnapshots.keySet()) { List beSnapshotInfos = beToSnapshots.get(beId); int totalNum = beSnapshotInfos.size(); - int batchNum = totalNum; - if (Config.restore_download_task_num_per_be > 0) { - batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be); - } // each task contains several upload sub tasks - int taskNumPerBatch = Math.max(totalNum / batchNum, 1); - if (LOG.isDebugEnabled()) { - LOG.debug("backend {} has {} batch, total {} tasks, {}", - beId, batchNum, totalNum, this); - } + int taskNumPerBatch = Config.restore_download_snapshot_batch_size; + LOG.info("backend {} has total {} snapshots, per task batch size {}, {}", + beId, totalNum, taskNumPerBatch, this); List brokerAddrs = null; brokerAddrs = Lists.newArrayList(); @@ -1707,12 +1701,10 @@ private void downloadRemoteSnapshots() { Preconditions.checkState(brokerAddrs.size() == 1); // allot tasks - int index = 0; - for (int batch = 0; batch < batchNum; batch++) { + for (int index = 0; index < totalNum; index += taskNumPerBatch) { Map srcToDest = Maps.newHashMap(); - int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch; - for (int j = 0; j < currentBatchTaskNum; j++) { - SnapshotInfo info = beSnapshotInfos.get(index++); + for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) { + SnapshotInfo info = beSnapshotInfos.get(index + j); Table tbl = db.getTableNullable(info.getTblId()); if (tbl == null) { status = new Status(ErrCode.NOT_FOUND, "restored table " @@ -1846,22 +1838,17 @@ private void downloadLocalSnapshots() { for (Long beId : beToSnapshots.keySet()) { List beSnapshotInfos = beToSnapshots.get(beId); int totalNum = beSnapshotInfos.size(); - int batchNum = totalNum; - if (Config.restore_download_task_num_per_be > 0) { - batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be); - } // each task contains several upload sub tasks - int taskNumPerBatch = Math.max(totalNum / batchNum, 1); + int taskNumPerBatch = Config.restore_download_snapshot_batch_size; + LOG.info("backend {} has total {} snapshots, per task batch size {}, {}", + beId, totalNum, taskNumPerBatch, this); // allot tasks - int index = 0; - for (int batch = 0; batch < batchNum; batch++) { + for (int index = 0; index < totalNum; index += taskNumPerBatch) { List remoteTabletSnapshots = Lists.newArrayList(); - int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch; - for (int j = 0; j < currentBatchTaskNum; j++) { + for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) { TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot(); - - SnapshotInfo info = beSnapshotInfos.get(index++); + SnapshotInfo info = beSnapshotInfos.get(index + j); Table tbl = db.getTableNullable(info.getTblId()); if (tbl == null) { status = new Status(ErrCode.NOT_FOUND, "restored table "