Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](build index)Optimize failed task check on same tablet (#42295) #43581

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ namespace ErrorCode {
TStatusError(HTTP_ERROR, true); \
TStatusError(TABLET_MISSING, true); \
TStatusError(NOT_MASTER, true); \
TStatusError(OBTAIN_LOCK_FAILED, false); \
TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
Expand Down Expand Up @@ -487,6 +488,7 @@ class [[nodiscard]] Status {
ERROR_CTOR(HttpError, HTTP_ERROR)
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
#undef ERROR_CTOR

template <int code>
Expand Down
20 changes: 12 additions & 8 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,37 +659,41 @@ Status IndexBuilder::do_build_inverted_index() {
std::unique_lock<std::mutex> schema_change_lock(_tablet->get_schema_change_lock(),
std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try schema_change_lock failed");
return Status::ObtainLockFailed("try schema_change_lock failed. tablet={} ",
_tablet->tablet_id());
}
// Check executing serially with compaction task.
std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(),
std::try_to_lock);
if (!base_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try base_compaction_lock failed");
return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ",
_tablet->tablet_id());
}
std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
if (!cumu_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cumu_compaction_lock failed");
return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}

std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(),
std::try_to_lock);
if (!cold_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cold_compaction_lock failed");
return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}

std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(),
std::try_to_lock);
if (!build_inverted_index_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>(
"failed to obtain build inverted index lock. tablet={}", _tablet->tablet_id());
return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}",
_tablet->tablet_id());
}

std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
if (!migration_rlock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("got migration_rlock failed. tablet={}",
_tablet->tablet_id());
return Status::ObtainLockFailed("got migration_rlock failed. tablet={}",
_tablet->tablet_id());
}

_input_rowsets =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -56,12 +56,12 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;


public class IndexChangeJob implements Writable {
private static final Logger LOG = LogManager.getLogger(IndexChangeJob.class);

static final int MAX_FAILED_NUM = 10;
static final int MIN_FAILED_NUM = 3;

public enum JobState {
// CHECKSTYLE OFF
Expand Down Expand Up @@ -109,9 +109,6 @@ public boolean isFinalState() {
private long originIndexId;
@SerializedName(value = "invertedIndexBatchTask")
AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
// save failed task after retry three times, tablet -> backends
@SerializedName(value = "failedTabletBackends")
protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;

Expand Down Expand Up @@ -344,7 +341,9 @@ protected void runWaitingTxnJob() throws AlterCancelException {

LOG.info("invertedIndexBatchTask:{}", invertedIndexBatchTask);
AgentTaskQueue.addBatchTask(invertedIndexBatchTask);
AgentTaskExecutor.submit(invertedIndexBatchTask);
if (!FeConstants.runningUnitTest) {
AgentTaskExecutor.submit(invertedIndexBatchTask);
}
} finally {
olapTable.readUnlock();
}
Expand All @@ -359,9 +358,8 @@ protected void runRunningJob() throws AlterCancelException {
// and the job will be in RUNNING state forever.
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
Expand All @@ -370,18 +368,19 @@ protected void runRunningJob() throws AlterCancelException {
LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() > 3) {
if (task.getFailedTimes() >= MIN_FAILED_NUM) {
LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
List<Long> failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(),
k -> Lists.newArrayList());
failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
int failedTaskCount = failedBackends.size();
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold "
+ failedTaskCount);
// If error is obtaining lock failed.
// we should do more tries.
if (task.getErrorCode().equals(TStatusCode.OBTAIN_LOCK_FAILED)) {
if (task.getFailedTimes() < MAX_FAILED_NUM) {
continue;
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MAX_FAILED_NUM + ", error: " + task.getErrorMsg());
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MIN_FAILED_NUM + ", error: " + task.getErrorMsg());
}
}
return;
Expand All @@ -390,7 +389,9 @@ protected void runRunningJob() throws AlterCancelException {
this.jobState = JobState.FINISHED;
this.finishedTimeMs = System.currentTimeMillis();

Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("inverted index job finished: {}", jobId);
}

Expand All @@ -408,7 +409,9 @@ protected boolean cancelImpl(String errMsg) {
jobState = JobState.CANCELLED;
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2871,7 +2871,9 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
if (LOG.isDebugEnabled()) {
LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
}
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
}
// Drop table column stats after light schema change finished.
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, null);

Expand Down Expand Up @@ -3130,7 +3132,9 @@ public void buildOrDeleteTableInvertedIndices(Database db, OlapTable olapTable,
addIndexChangeJob(indexChangeJob);

// write edit log
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
}
LOG.info("finish create table's inverted index job. table: {}, partition: {}, job: {}",
olapTable.getName(), partitionName, jobId);
} // end for partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -74,6 +75,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
// disallow external catalog
Util.prohibitExternalCatalog(dbTableName.getCtl(), this.getClass().getSimpleName());

if (FeConstants.runningUnitTest) {
return;
}
// check access
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), dbTableName.getCtl(), dbTableName.getDb(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
+ ", backendId: " + backend + ", signature: " + signature;
task.setErrorMsg(errMsg);
task.setErrorCode(taskStatus.getStatusCode());
// We start to let FE perceive the task's error msg
if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD
&& taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.Config;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

public abstract class AgentTask {
Expand All @@ -36,6 +37,7 @@ public abstract class AgentTask {

protected int failedTimes;
protected String errorMsg;
protected TStatusCode errorCode;
// some of process may use this member to check if the task is finished.
// some of are not.
// so whether the task is finished depends on caller's logic, not the value of this member.
Expand Down Expand Up @@ -126,6 +128,14 @@ public String getErrorMsg() {
return errorMsg;
}

public TStatusCode getErrorCode() {
return errorCode;
}

public void setErrorCode(TStatusCode errorCode) {
this.errorCode = errorCode;
}

public void setFinished(boolean isFinished) {
this.isFinished = isFinished;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public static synchronized void updateTask(long backendId, TTaskType type, long
// this is just for unit test
public static synchronized List<AgentTask> getTask(TTaskType type) {
List<AgentTask> res = Lists.newArrayList();
for (Map<Long, AgentTask> agentTasks : tasks.column(TTaskType.ALTER).values()) {
for (Map<Long, AgentTask> agentTasks : tasks.column(type).values()) {
res.addAll(agentTasks.values());
}
return res;
Expand Down
Loading
Loading