Skip to content

Commit

Permalink
[improvement](build index)Optimize failed task check on same tablet (a…
Browse files Browse the repository at this point in the history
…pache#42295)

## Proposed changes

1. Remove logic for replica majority failure checking
2. Introduce the `OBTAIN_LOCK_FAILED` status code in both `status.h` and
`Status.thrift`
3. Add `MIN_FAILED_NUM = 3` for tasks that fail with any error other than
`OBTAIN_LOCK_FAILED`. Such a task is tried 3 times before the job is canceled.
4. Add `MAX_FAILED_NUM = 10` for tasks that fail with `OBTAIN_LOCK_FAILED`.
Such a task is tried 10 times before the job is canceled.

---------

Co-authored-by: qidaye <[email protected]>
  • Loading branch information
qidaye and qidaye committed Nov 11, 2024
1 parent 91d95f6 commit c48f0d7
Show file tree
Hide file tree
Showing 11 changed files with 624 additions and 32 deletions.
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ namespace ErrorCode {
TStatusError(HTTP_ERROR, true); \
TStatusError(TABLET_MISSING, true); \
TStatusError(NOT_MASTER, true); \
TStatusError(OBTAIN_LOCK_FAILED, false); \
TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
Expand Down Expand Up @@ -487,6 +488,7 @@ class [[nodiscard]] Status {
ERROR_CTOR(HttpError, HTTP_ERROR)
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
#undef ERROR_CTOR

template <int code>
Expand Down
20 changes: 12 additions & 8 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,37 +659,41 @@ Status IndexBuilder::do_build_inverted_index() {
std::unique_lock<std::mutex> schema_change_lock(_tablet->get_schema_change_lock(),
std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try schema_change_lock failed");
return Status::ObtainLockFailed("try schema_change_lock failed. tablet={} ",
_tablet->tablet_id());
}
// Check executing serially with compaction task.
std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(),
std::try_to_lock);
if (!base_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try base_compaction_lock failed");
return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ",
_tablet->tablet_id());
}
std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
if (!cumu_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cumu_compaction_lock failed");
return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}

std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(),
std::try_to_lock);
if (!cold_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cold_compaction_lock failed");
return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}

std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(),
std::try_to_lock);
if (!build_inverted_index_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>(
"failed to obtain build inverted index lock. tablet={}", _tablet->tablet_id());
return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}",
_tablet->tablet_id());
}

std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
if (!migration_rlock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("got migration_rlock failed. tablet={}",
_tablet->tablet_id());
return Status::ObtainLockFailed("got migration_rlock failed. tablet={}",
_tablet->tablet_id());
}

_input_rowsets =
Expand Down
45 changes: 24 additions & 21 deletions fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -56,12 +56,12 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;


public class IndexChangeJob implements Writable {
private static final Logger LOG = LogManager.getLogger(IndexChangeJob.class);

static final int MAX_FAILED_NUM = 10;
static final int MIN_FAILED_NUM = 3;

public enum JobState {
// CHECKSTYLE OFF
Expand Down Expand Up @@ -109,9 +109,6 @@ public boolean isFinalState() {
private long originIndexId;
@SerializedName(value = "invertedIndexBatchTask")
AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
// save failed task after retry three times, tablet -> backends
@SerializedName(value = "failedTabletBackends")
protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;

Expand Down Expand Up @@ -344,7 +341,9 @@ protected void runWaitingTxnJob() throws AlterCancelException {

LOG.info("invertedIndexBatchTask:{}", invertedIndexBatchTask);
AgentTaskQueue.addBatchTask(invertedIndexBatchTask);
AgentTaskExecutor.submit(invertedIndexBatchTask);
if (!FeConstants.runningUnitTest) {
AgentTaskExecutor.submit(invertedIndexBatchTask);
}
} finally {
olapTable.readUnlock();
}
Expand All @@ -359,9 +358,8 @@ protected void runRunningJob() throws AlterCancelException {
// and the job will be in RUNNING state forever.
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
Expand All @@ -370,18 +368,19 @@ protected void runRunningJob() throws AlterCancelException {
LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() > 3) {
if (task.getFailedTimes() >= MIN_FAILED_NUM) {
LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
List<Long> failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(),
k -> Lists.newArrayList());
failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
int failedTaskCount = failedBackends.size();
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold "
+ failedTaskCount);
// If the task failed because it could not obtain a lock,
// allow more retries (up to MAX_FAILED_NUM) before canceling the job.
if (task.getErrorCode().equals(TStatusCode.OBTAIN_LOCK_FAILED)) {
if (task.getFailedTimes() < MAX_FAILED_NUM) {
continue;
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MAX_FAILED_NUM + ", error: " + task.getErrorMsg());
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MIN_FAILED_NUM + ", error: " + task.getErrorMsg());
}
}
return;
Expand All @@ -390,7 +389,9 @@ protected void runRunningJob() throws AlterCancelException {
this.jobState = JobState.FINISHED;
this.finishedTimeMs = System.currentTimeMillis();

Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("inverted index job finished: {}", jobId);
}

Expand All @@ -408,7 +409,9 @@ protected boolean cancelImpl(String errMsg) {
jobState = JobState.CANCELLED;
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2871,7 +2871,9 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
if (LOG.isDebugEnabled()) {
LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
}
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
}
// Drop table column stats after light schema change finished.
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, null);

Expand Down Expand Up @@ -3130,7 +3132,9 @@ public void buildOrDeleteTableInvertedIndices(Database db, OlapTable olapTable,
addIndexChangeJob(indexChangeJob);

// write edit log
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
}
LOG.info("finish create table's inverted index job. table: {}, partition: {}, job: {}",
olapTable.getName(), partitionName, jobId);
} // end for partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -74,6 +75,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
// disallow external catalog
Util.prohibitExternalCatalog(dbTableName.getCtl(), this.getClass().getSimpleName());

if (FeConstants.runningUnitTest) {
return;
}
// check access
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), dbTableName.getCtl(), dbTableName.getDb(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
+ ", backendId: " + backend + ", signature: " + signature;
task.setErrorMsg(errMsg);
task.setErrorCode(taskStatus.getStatusCode());
// We start to let FE perceive the task's error msg
if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD
&& taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.Config;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

public abstract class AgentTask {
Expand All @@ -36,6 +37,7 @@ public abstract class AgentTask {

protected int failedTimes;
protected String errorMsg;
protected TStatusCode errorCode;
// some of process may use this member to check if the task is finished.
// some of are not.
// so whether the task is finished depends on caller's logic, not the value of this member.
Expand Down Expand Up @@ -126,6 +128,14 @@ public String getErrorMsg() {
return errorMsg;
}

/**
 * Returns the value currently stored in this task's {@code errorCode} field.
 *
 * @return the stored status code (presumably null until
 *         {@link #setErrorCode} has been called; confirm against the constructors)
 */
public TStatusCode getErrorCode() {
return errorCode;
}

/**
 * Stores the given status code in this task's {@code errorCode} field,
 * replacing any previously stored value.
 *
 * @param errorCode the status code to record for this task
 */
public void setErrorCode(TStatusCode errorCode) {
this.errorCode = errorCode;
}

public void setFinished(boolean isFinished) {
this.isFinished = isFinished;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public static synchronized void updateTask(long backendId, TTaskType type, long
// this is just for unit test
public static synchronized List<AgentTask> getTask(TTaskType type) {
List<AgentTask> res = Lists.newArrayList();
for (Map<Long, AgentTask> agentTasks : tasks.column(TTaskType.ALTER).values()) {
for (Map<Long, AgentTask> agentTasks : tasks.column(type).values()) {
res.addAll(agentTasks.values());
}
return res;
Expand Down
Loading

0 comments on commit c48f0d7

Please sign in to comment.