Skip to content

Commit

Permalink
opt
Browse files Browse the repository at this point in the history
  • Loading branch information
qidaye committed Nov 5, 2024
1 parent fca28fe commit de4bdee
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 32 deletions.
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ namespace ErrorCode {
TStatusError(HTTP_ERROR, true); \
TStatusError(TABLET_MISSING, true); \
TStatusError(NOT_MASTER, true); \
TStatusError(TRY_LOCK_FAILED, false); \
TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
Expand Down Expand Up @@ -487,6 +488,7 @@ class [[nodiscard]] Status {
ERROR_CTOR(HttpError, HTTP_ERROR)
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(TryLockFailed, TRY_LOCK_FAILED)
#undef ERROR_CTOR

template <int code>
Expand Down
19 changes: 11 additions & 8 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,37 +633,40 @@ Status IndexBuilder::do_build_inverted_index() {
std::unique_lock<std::mutex> schema_change_lock(_tablet->get_schema_change_lock(),
std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try schema_change_lock failed");
return Status::TryLockFailed("try schema_change_lock failed. tablet={} ",
_tablet->tablet_id());
}
// Check executing serially with compaction task.
std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(),
std::try_to_lock);
if (!base_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try base_compaction_lock failed");
return Status::TryLockFailed("try base_compaction_lock failed. tablet={} ",
_tablet->tablet_id());
}
std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
if (!cumu_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cumu_compaction_lock failed");
return Status::TryLockFailed("try cumu_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}

std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(),
std::try_to_lock);
if (!cold_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cold_compaction_lock failed");
return Status::TryLockFailed("try cold_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}

std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(),
std::try_to_lock);
if (!build_inverted_index_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>(
"failed to obtain build inverted index lock. tablet={}", _tablet->tablet_id());
return Status::TryLockFailed("failed to obtain build inverted index lock. tablet={}",
_tablet->tablet_id());
}

std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
if (!migration_rlock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("got migration_rlock failed. tablet={}",
_tablet->tablet_id());
return Status::TryLockFailed("got migration_rlock failed. tablet={}", _tablet->tablet_id());
}

_input_rowsets =
Expand Down
38 changes: 14 additions & 24 deletions fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,26 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;


public class IndexChangeJob implements Writable {
private static final Logger LOG = LogManager.getLogger(IndexChangeJob.class);
private static final int MAX_FAILED_NUM = 10;
private static final int MIN_FAILED_NUM = 3;

public enum JobState {
// CHECKSTYLE OFF
Expand Down Expand Up @@ -111,9 +109,6 @@ public boolean isFinalState() {
private long originIndexId;
@SerializedName(value = "invertedIndexBatchTask")
AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
// save failed task after retry three times, tablet -> backends
@SerializedName(value = "failedTabletBackends")
protected Map<Long, Set<Long>> failedTabletBackends = Maps.newHashMap();
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;

Expand Down Expand Up @@ -361,9 +356,8 @@ protected void runRunningJob() throws AlterCancelException {
// and the job will be in RUNNING state forever.
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
Expand All @@ -372,23 +366,19 @@ protected void runRunningJob() throws AlterCancelException {
LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() >= MAX_FAILED_NUM) {
if (task.getFailedTimes() >= MIN_FAILED_NUM) {
LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
// If error is E-216, it indicates obtaining lock failed.
// we should retry this task.
if (task.getErrorMsg().contains("E-216")) {
continue;
}
Set<Long> failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(),
k -> new HashSet<>());
failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
int failedTaskCount = failedBackends.size();
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold "
+ failedTaskCount + ", error: " + task.getErrorMsg());
// If the error is a failure to obtain a lock,
// we should retry more times.
if (task.getErrorCode().equals(TStatusCode.TRY_LOCK_FAILED)) {
if (task.getFailedTimes() < MAX_FAILED_NUM) {
continue;
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MAX_FAILED_NUM + ", error: " + task.getErrorMsg());
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MIN_FAILED_NUM + ", error: " + task.getErrorMsg());
}
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
+ ", backendId: " + backend + ", signature: " + signature;
task.setErrorMsg(errMsg);
task.setErrorCode(taskStatus.getStatusCode());
// We start to let FE perceive the task's error msg
if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD
&& taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.Config;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

public abstract class AgentTask {
Expand All @@ -36,6 +37,7 @@ public abstract class AgentTask {

protected int failedTimes;
protected String errorMsg;
protected TStatusCode errorCode;
// Some processes use this member to check whether the task is finished;
// others do not.
// So whether the task is finished depends on the caller's logic, not on the value of this member.
Expand Down Expand Up @@ -122,6 +124,14 @@ public String getErrorMsg() {
return errorMsg;
}

/**
 * Returns the status code currently stored on this task.
 *
 * NOTE(review): the errorCode field has no initializer in the visible
 * declaration, so this presumably returns null until setErrorCode has been
 * called -- confirm that callers handle a null result.
 *
 * @return the stored TStatusCode
 */
public TStatusCode getErrorCode() {
return errorCode;
}

/**
 * Stores the given status code on this task, replacing any previous value.
 *
 * @param errorCode the status code to record for this task
 */
public void setErrorCode(TStatusCode errorCode) {
this.errorCode = errorCode;
}

/**
 * Sets this task's isFinished flag to the given value.
 *
 * @param isFinished the new value of the flag
 */
public void setFinished(boolean isFinished) {
this.isFinished = isFinished;
}
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/Status.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ enum TStatusCode {

NOT_MASTER = 73,

TRY_LOCK_FAILED = 74,

// used for cloud
DELETE_BITMAP_LOCK_ERROR = 100,
// Must not be larger than 200, see status.h
Expand Down

0 comments on commit de4bdee

Please sign in to comment.