Skip to content

Commit

Permalink
branch-3.0: [fix](backup) Automatic adapt upload/download snapshot batch size apache#44560 (apache#44639)
Browse files Browse the repository at this point in the history

Cherry-picked from apache#44560

Co-authored-by: walter <[email protected]>
  • Loading branch information
github-actions[bot] and w41ter authored Nov 27, 2024
1 parent 47c3f05 commit 4994841
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 42 deletions.
13 changes: 7 additions & 6 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2724,16 +2724,17 @@ public class Config extends ConfigBase {
public static String nereids_trace_log_dir = System.getenv("LOG_DIR") + "/nereids_trace";

@ConfField(mutable = true, masterOnly = true, description = {
"备份过程中,分配给每个be的upload任务最大个数,默认值为3个。",
"The max number of upload tasks assigned to each be during the backup process, the default value is 3."
"备份过程中,一个 upload 任务上传的快照数量上限,默认值为10个",
"The max number of snapshots assigned to a upload task during the backup process, the default value is 10."
})
public static int backup_upload_task_num_per_be = 3;
public static int backup_upload_snapshot_batch_size = 10;

@ConfField(mutable = true, masterOnly = true, description = {
"恢复过程中,分配给每个be的download任务最大个数,默认值为3个。",
"The max number of download tasks assigned to each be during the restore process, the default value is 3."
"恢复过程中,一个 download 任务下载的快照数量上限,默认值为10个",
"The max number of snapshots assigned to a download task during the restore process, "
+ "the default value is 10."
})
public static int restore_download_task_num_per_be = 3;
public static int restore_download_snapshot_batch_size = 10;

@ConfField(mutable = true, masterOnly = true, description = {
"备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
Expand Down
17 changes: 6 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -751,13 +751,10 @@ private void uploadSnapshot() {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> infos = beToSnapshots.get(beId);
int totalNum = infos.size();
int batchNum = totalNum;
if (Config.backup_upload_task_num_per_be > 0) {
batchNum = Math.min(totalNum, Config.backup_upload_task_num_per_be);
}
// each task contains several upload sub tasks
int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
LOG.info("backend {} has {} batch, total {} tasks, {}", beId, batchNum, totalNum, this);
int taskNumPerBatch = Config.backup_upload_snapshot_batch_size;
LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
beId, totalNum, taskNumPerBatch, this);

List<FsBroker> brokers = Lists.newArrayList();
Status st = repo.getBrokerAddress(beId, env, brokers);
Expand All @@ -768,12 +765,10 @@ private void uploadSnapshot() {
Preconditions.checkState(brokers.size() == 1);

// allot tasks
int index = 0;
for (int batch = 0; batch < batchNum; batch++) {
for (int index = 0; index < totalNum; index += taskNumPerBatch) {
Map<String, String> srcToDest = Maps.newHashMap();
int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
for (int j = 0; j < currentBatchTaskNum; j++) {
SnapshotInfo info = infos.get(index++);
for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
SnapshotInfo info = infos.get(index + j);
String src = info.getTabletPath();
String dest = repo.getRepoTabletPathBySnapshotInfo(label, info);
if (dest == null) {
Expand Down
37 changes: 12 additions & 25 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -1686,16 +1686,10 @@ private void downloadRemoteSnapshots() {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
int totalNum = beSnapshotInfos.size();
int batchNum = totalNum;
if (Config.restore_download_task_num_per_be > 0) {
batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be);
}
// each task contains several upload sub tasks
int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
if (LOG.isDebugEnabled()) {
LOG.debug("backend {} has {} batch, total {} tasks, {}",
beId, batchNum, totalNum, this);
}
int taskNumPerBatch = Config.restore_download_snapshot_batch_size;
LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
beId, totalNum, taskNumPerBatch, this);

List<FsBroker> brokerAddrs = null;
brokerAddrs = Lists.newArrayList();
Expand All @@ -1707,12 +1701,10 @@ private void downloadRemoteSnapshots() {
Preconditions.checkState(brokerAddrs.size() == 1);

// allot tasks
int index = 0;
for (int batch = 0; batch < batchNum; batch++) {
for (int index = 0; index < totalNum; index += taskNumPerBatch) {
Map<String, String> srcToDest = Maps.newHashMap();
int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
for (int j = 0; j < currentBatchTaskNum; j++) {
SnapshotInfo info = beSnapshotInfos.get(index++);
for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
SnapshotInfo info = beSnapshotInfos.get(index + j);
Table tbl = db.getTableNullable(info.getTblId());
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "restored table "
Expand Down Expand Up @@ -1846,22 +1838,17 @@ private void downloadLocalSnapshots() {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
int totalNum = beSnapshotInfos.size();
int batchNum = totalNum;
if (Config.restore_download_task_num_per_be > 0) {
batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be);
}
// each task contains several upload sub tasks
int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
int taskNumPerBatch = Config.restore_download_snapshot_batch_size;
LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
beId, totalNum, taskNumPerBatch, this);

// allot tasks
int index = 0;
for (int batch = 0; batch < batchNum; batch++) {
for (int index = 0; index < totalNum; index += taskNumPerBatch) {
List<TRemoteTabletSnapshot> remoteTabletSnapshots = Lists.newArrayList();
int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
for (int j = 0; j < currentBatchTaskNum; j++) {
for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot();

SnapshotInfo info = beSnapshotInfos.get(index++);
SnapshotInfo info = beSnapshotInfos.get(index + j);
Table tbl = db.getTableNullable(info.getTblId());
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "restored table "
Expand Down

0 comments on commit 4994841

Please sign in to comment.