diff --git a/be/src/common/status.h b/be/src/common/status.h index e95b93431679a2..234566c2b01cf8 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -76,6 +76,7 @@ namespace ErrorCode { TStatusError(HTTP_ERROR, true); \ TStatusError(TABLET_MISSING, true); \ TStatusError(NOT_MASTER, true); \ + TStatusError(OBTAIN_LOCK_FAILED, false); \ TStatusError(DELETE_BITMAP_LOCK_ERROR, false); // E error_name, error_code, print_stacktrace #define APPLY_FOR_OLAP_ERROR_CODES(E) \ @@ -487,6 +488,7 @@ class [[nodiscard]] Status { ERROR_CTOR(HttpError, HTTP_ERROR) ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN) ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR) + ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED) #undef ERROR_CTOR template diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 3af35ee78a84d9..c843b123030e71 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -659,37 +659,41 @@ Status IndexBuilder::do_build_inverted_index() { std::unique_lock schema_change_lock(_tablet->get_schema_change_lock(), std::try_to_lock); if (!schema_change_lock.owns_lock()) { - return Status::Error("try schema_change_lock failed"); + return Status::ObtainLockFailed("try schema_change_lock failed. tablet={} ", + _tablet->tablet_id()); } // Check executing serially with compaction task. std::unique_lock base_compaction_lock(_tablet->get_base_compaction_lock(), std::try_to_lock); if (!base_compaction_lock.owns_lock()) { - return Status::Error("try base_compaction_lock failed"); + return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ", + _tablet->tablet_id()); } std::unique_lock cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); if (!cumu_compaction_lock.owns_lock()) { - return Status::Error("try cumu_compaction_lock failed"); + return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}", + _tablet->tablet_id()); } std::unique_lock cold_compaction_lock(_tablet->get_cold_compaction_lock(), std::try_to_lock); if (!cold_compaction_lock.owns_lock()) { - return Status::Error("try cold_compaction_lock failed"); + return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}", + _tablet->tablet_id()); } std::unique_lock build_inverted_index_lock(_tablet->get_build_inverted_index_lock(), std::try_to_lock); if (!build_inverted_index_lock.owns_lock()) { - return Status::Error( - "failed to obtain build inverted index lock. tablet={}", _tablet->tablet_id()); + return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}", + _tablet->tablet_id()); } std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock); if (!migration_rlock.owns_lock()) { - return Status::Error("got migration_rlock failed. tablet={}", - _tablet->tablet_id()); + return Status::ObtainLockFailed("got migration_rlock failed. tablet={}", + _tablet->tablet_id()); } _input_rowsets = diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java index 7fa592356cc8b6..bb0c018dc36f36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java @@ -42,12 +42,12 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterInvertedIndexTask; import org.apache.doris.thrift.TColumn; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTaskType; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -56,12 +56,12 @@ import java.io.DataOutput; import java.io.IOException; import java.util.List; -import java.util.Map; public class IndexChangeJob implements Writable { private static final Logger LOG = LogManager.getLogger(IndexChangeJob.class); - + static final int MAX_FAILED_NUM = 10; + static final int MIN_FAILED_NUM = 3; public enum JobState { // CHECKSTYLE OFF @@ -109,9 +109,6 @@ public boolean isFinalState() { private long originIndexId; @SerializedName(value = "invertedIndexBatchTask") AgentBatchTask invertedIndexBatchTask = new AgentBatchTask(); - // save failed task after retry three times, tablet -> backends - @SerializedName(value = "failedTabletBackends") - protected Map> failedTabletBackends = Maps.newHashMap(); @SerializedName(value = "timeoutMs") protected long timeoutMs = -1; @@ -344,7 +341,9 @@ protected void runWaitingTxnJob() throws AlterCancelException { LOG.info("invertedIndexBatchTask:{}", invertedIndexBatchTask); AgentTaskQueue.addBatchTask(invertedIndexBatchTask); - AgentTaskExecutor.submit(invertedIndexBatchTask); + if (!FeConstants.runningUnitTest) { + AgentTaskExecutor.submit(invertedIndexBatchTask); + } } finally { olapTable.readUnlock(); } @@ -359,9 +358,8 @@ protected void runRunningJob() throws AlterCancelException { // and the job will be in RUNNING state forever. Database db = Env.getCurrentInternalCatalog() .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); - OlapTable tbl; try { - tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); + db.getTableOrMetaException(tableId, TableType.OLAP); } catch (MetaNotFoundException e) { throw new AlterCancelException(e.getMessage()); } @@ -370,18 +368,19 @@ protected void runRunningJob() throws AlterCancelException { LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId); List tasks = invertedIndexBatchTask.getUnfinishedTasks(2000); for (AgentTask task : tasks) { - if (task.getFailedTimes() > 3) { + if (task.getFailedTimes() >= MIN_FAILED_NUM) { LOG.warn("alter inverted index task failed: " + task.getErrorMsg()); - List failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(), - k -> Lists.newArrayList()); - failedBackends.add(task.getBackendId()); - int expectSucceedTaskNum = tbl.getPartitionInfo() - .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum(); - int failedTaskCount = failedBackends.size(); - if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) { - throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold " - + failedTaskCount); + // If error is obtaining lock failed. + // we should do more tries. + if (task.getErrorCode().equals(TStatusCode.OBTAIN_LOCK_FAILED)) { + if (task.getFailedTimes() < MAX_FAILED_NUM) { + continue; + } + throw new AlterCancelException("inverted index tasks failed times reach threshold " + + MAX_FAILED_NUM + ", error: " + task.getErrorMsg()); } + throw new AlterCancelException("inverted index tasks failed times reach threshold " + + MIN_FAILED_NUM + ", error: " + task.getErrorMsg()); } } return; @@ -390,7 +389,9 @@ protected void runRunningJob() throws AlterCancelException { this.jobState = JobState.FINISHED; this.finishedTimeMs = System.currentTimeMillis(); - Env.getCurrentEnv().getEditLog().logIndexChangeJob(this); + if (!FeConstants.runningUnitTest) { + Env.getCurrentEnv().getEditLog().logIndexChangeJob(this); + } LOG.info("inverted index job finished: {}", jobId); } @@ -408,7 +409,9 @@ protected boolean cancelImpl(String errMsg) { jobState = JobState.CANCELLED; this.errMsg = errMsg; this.finishedTimeMs = System.currentTimeMillis(); - Env.getCurrentEnv().getEditLog().logIndexChangeJob(this); + if (!FeConstants.runningUnitTest) { + Env.getCurrentEnv().getEditLog().logIndexChangeJob(this); + } LOG.info("cancel index job {}, err: {}", jobId, errMsg); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 9475b993718e77..f371398f78f016 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2871,7 +2871,9 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o if (LOG.isDebugEnabled()) { LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info); } - Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info); + if (!FeConstants.runningUnitTest) { + Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info); + } // Drop table column stats after light schema change finished. Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, null); @@ -3130,7 +3132,9 @@ public void buildOrDeleteTableInvertedIndices(Database db, OlapTable olapTable, addIndexChangeJob(indexChangeJob); // write edit log - Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob); + if (!FeConstants.runningUnitTest) { + Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob); + } LOG.info("finish create table's inverted index job. table: {}, partition: {}, job: {}", olapTable.getName(), partitionName, jobId); } // end for partition diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterTableStmt.java index 10f1d121d79ea2..9ef73f792cf36c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterTableStmt.java @@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.Util; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -74,6 +75,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException { // disallow external catalog Util.prohibitExternalCatalog(dbTableName.getCtl(), this.getClass().getSimpleName()); + if (FeConstants.runningUnitTest) { + return; + } // check access if (!Env.getCurrentEnv().getAccessManager() .checkTblPriv(ConnectContext.get(), dbTableName.getCtl(), dbTableName.getDb(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index a4bbe763f60946..4010a9b564d0a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -143,6 +143,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) { + (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "") + ", backendId: " + backend + ", signature: " + signature; task.setErrorMsg(errMsg); + task.setErrorCode(taskStatus.getStatusCode()); // We start to let FE perceive the task's error msg if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD && taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java index 0ba998b3808f0f..1294b408754e73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java @@ -19,6 +19,7 @@ import org.apache.doris.common.Config; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTaskType; public abstract class AgentTask { @@ -36,6 +37,7 @@ public abstract class AgentTask { protected int failedTimes; protected String errorMsg; + protected TStatusCode errorCode; // some of process may use this member to check if the task is finished. // some of are not. // so whether the task is finished depends on caller's logic, not the value of this member. @@ -126,6 +128,14 @@ public String getErrorMsg() { return errorMsg; } + public TStatusCode getErrorCode() { + return errorCode; + } + + public void setErrorCode(TStatusCode errorCode) { + this.errorCode = errorCode; + } + public void setFinished(boolean isFinished) { this.isFinished = isFinished; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java index 6ea0934854c2b9..bd68d87f1919c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java @@ -156,7 +156,7 @@ public static synchronized void updateTask(long backendId, TTaskType type, long // this is just for unit test public static synchronized List getTask(TTaskType type) { List res = Lists.newArrayList(); - for (Map agentTasks : tasks.column(TTaskType.ALTER).values()) { + for (Map agentTasks : tasks.column(type).values()) { res.addAll(agentTasks.values()); } return res; diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/IndexChangeJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/IndexChangeJobTest.java new file mode 100644 index 00000000000000..5a4ce9347d4e31 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/IndexChangeJobTest.java @@ -0,0 +1,555 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.alter; + +import org.apache.doris.analysis.AccessTestUtil; +import org.apache.doris.analysis.AlterClause; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BuildIndexClause; +import org.apache.doris.analysis.CancelAlterTableStmt; +import org.apache.doris.analysis.CreateIndexClause; +import org.apache.doris.analysis.DropIndexClause; +import org.apache.doris.analysis.IndexDef; +import org.apache.doris.analysis.ShowAlterStmt; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.CatalogTestUtil; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FakeEditLog; +import org.apache.doris.catalog.FakeEnv; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MaterializedIndex.IndexState; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.OlapTable.OlapTableState; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Partition.PartitionState; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TableProperty; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.task.AgentTask; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TTaskType; +import org.apache.doris.transaction.FakeTransactionIDGenerator; +import org.apache.doris.transaction.GlobalTransactionMgr; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class IndexChangeJobTest { + + private static String fileName = "./IndexChangeJobTest"; + + private static FakeEditLog fakeEditLog; + private static FakeEnv fakeEnv; + private static FakeTransactionIDGenerator fakeTransactionIDGenerator; + private static GlobalTransactionMgr masterTransMgr; + private static GlobalTransactionMgr slaveTransMgr; + private static Env masterEnv; + private static Env slaveEnv; + + private static Analyzer analyzer; + private static Database db; + private static OlapTable olapTable; + private static CreateIndexClause createIndexClause; + private static BuildIndexClause buildIndexClause; + private static DropIndexClause dropIndexClause; + private static CancelAlterTableStmt cancelAlterTableStmt; + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Before + public void setUp() + throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, + NoSuchMethodException, SecurityException, UserException { + FeConstants.runningUnitTest = true; + FakeEnv.setMetaVersion(FeMetaVersion.VERSION_CURRENT); + fakeEditLog = new FakeEditLog(); + fakeEnv = new FakeEnv(); + fakeTransactionIDGenerator = new FakeTransactionIDGenerator(); + masterEnv = CatalogTestUtil.createTestCatalog(); + slaveEnv = CatalogTestUtil.createTestCatalog(); + masterTransMgr = (GlobalTransactionMgr) masterEnv.getGlobalTransactionMgr(); + masterTransMgr.setEditLog(masterEnv.getEditLog()); + slaveTransMgr = (GlobalTransactionMgr) slaveEnv.getGlobalTransactionMgr(); + slaveTransMgr.setEditLog(slaveEnv.getEditLog()); + analyzer = AccessTestUtil.fetchAdminAnalyzer(false); + db = masterEnv.getInternalCatalog().getDbOrDdlException(CatalogTestUtil.testDbId1); + olapTable = (OlapTable) db.getTableOrDdlException(CatalogTestUtil.testTableId1); + + // set mow table property + Map properties = Maps.newHashMap(); + properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "false"); + TableProperty tableProperty = new TableProperty(properties); + olapTable.setTableProperty(tableProperty); + + TableName tableName = new TableName(masterEnv.getInternalCatalog().getName(), db.getName(), olapTable.getName()); + IndexDef indexDef = new IndexDef("index1", false, Lists.newArrayList(olapTable.getBaseSchema().get(1).getName()), IndexDef.IndexType.INVERTED, Maps.newHashMap(), "balabala"); + createIndexClause = new CreateIndexClause(tableName, indexDef, false); + createIndexClause.analyze(analyzer); + + buildIndexClause = new BuildIndexClause(tableName, indexDef, false); + buildIndexClause.analyze(analyzer); + + dropIndexClause = new DropIndexClause("index1", false, tableName, false); + dropIndexClause.analyze(analyzer); + + cancelAlterTableStmt = new CancelAlterTableStmt(ShowAlterStmt.AlterType.INDEX, tableName); + cancelAlterTableStmt.analyze(analyzer); + + AgentTaskQueue.clearAllTasks(); + } + + @Test + public void testCreateIndexIndexChange() throws UserException { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(0, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + } + + @Test + public void testBuildIndexIndexChange() throws UserException { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(buildIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + } + + @Test + public void testDropIndexIndexChange() throws UserException { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(dropIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + Assert.assertEquals(olapTable.getIndexes().size(), 0); + } + + @Test + // start a build index job, then normally finish it + public void testBuildIndexIndexChangeNormal() throws UserException { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(buildIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get(); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0); + + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + // run waiting txn job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3); + // run running job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + // finish alter tasks + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState()); + } + + @Test + // start a drop index job, then normally finish it + public void testDropIndexIndexChangeNormal() throws UserException { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(dropIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get(); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0); + + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + // run waiting txn job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3); + // run running job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + // finish alter tasks + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState()); + } + + @Test + public void testCancelBuildIndexIndexChangeNormal() throws UserException { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(buildIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get(); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0); + + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + // run waiting txn job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3); + // run running job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + // cancel build index job + schemaChangeHandler.cancel(cancelAlterTableStmt); + + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX); + Assert.assertEquals(0, tasks.size()); + Assert.assertEquals(IndexChangeJob.JobState.CANCELLED, indexChangejob.getJobState()); + } + + @Test + public void testBuildIndexIndexChangeWhileTableNotStable() throws Exception { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + olapTable.setState(OlapTableState.SCHEMA_CHANGE); + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("errCode = 2, detailMessage = Table[testTable1]'s state(SCHEMA_CHANGE) is not NORMAL. Do not allow doing ALTER ops"); + schemaChangeHandler.process(alterClauses, db, olapTable); + + olapTable.setState(OlapTableState.NORMAL); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(buildIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get(); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0); + + Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1); + MaterializedIndex baseIndex = testPartition.getBaseIndex(); + Assert.assertEquals(IndexState.NORMAL, baseIndex.getState()); + Assert.assertEquals(PartitionState.NORMAL, testPartition.getState()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + Tablet baseTablet = baseIndex.getTablets().get(0); + List replicas = baseTablet.getReplicas(); + Replica replica2 = replicas.get(1); + + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + // run waiting txn job, set replica2 to clone + replica2.setState(Replica.ReplicaState.CLONE); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + + // rerun waiting txn job, set replica2 to normal + replica2.setState(Replica.ReplicaState.NORMAL); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3); + + // run running job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + // finish alter tasks + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState()); + } + + @Test + public void testDropIndexIndexChangeWhileTableNotStable() throws Exception { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + olapTable.setState(OlapTableState.SCHEMA_CHANGE); + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("errCode = 2, detailMessage = Table[testTable1]'s state(SCHEMA_CHANGE) is not NORMAL. Do not allow doing ALTER ops"); + schemaChangeHandler.process(alterClauses, db, olapTable); + + olapTable.setState(OlapTableState.NORMAL); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(dropIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get(); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0); + + Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1); + MaterializedIndex baseIndex = testPartition.getBaseIndex(); + Assert.assertEquals(IndexState.NORMAL, baseIndex.getState()); + Assert.assertEquals(PartitionState.NORMAL, testPartition.getState()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + Tablet baseTablet = baseIndex.getTablets().get(0); + List replicas = baseTablet.getReplicas(); + Replica replica2 = replicas.get(1); + + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + // run waiting txn job, set replica2 to clone + replica2.setState(Replica.ReplicaState.CLONE); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + + // rerun waiting txn job, set replica2 to normal + replica2.setState(Replica.ReplicaState.NORMAL); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3); + + // run running job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + // finish alter tasks + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState()); + } + + @Test + public void testBuildIndexFailedWithMinFailedNum() throws Exception { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(buildIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get(); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0); + + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + // run waiting txn job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3); + // run running job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX); + Assert.assertEquals(3, tasks.size()); + + // if one task failed, the job should be failed + // if task error is not OBTAIN_LOCK_FAILED, the job should be failed after MIN_FAILED_NUM = 3 times + AgentTask agentTask = tasks.get(0); + agentTask.setErrorCode(TStatusCode.IO_ERROR); + Assert.assertEquals(agentTask.getFailedTimes(), 0); + for (int i = 0; i < IndexChangeJob.MIN_FAILED_NUM; i++) { + agentTask.failed(); + schemaChangeHandler.runAfterCatalogReady(); + if (i < IndexChangeJob.MIN_FAILED_NUM - 1) { + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + } + } + Assert.assertEquals(IndexChangeJob.JobState.CANCELLED, indexChangejob.getJobState()); + } + + @Test + public void testBuildIndexFailedWithMaxFailedNum() throws Exception { + fakeEnv = new FakeEnv(); + fakeEditLog = new FakeEditLog(); + FakeEnv.setEnv(masterEnv); + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(createIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Assert.assertEquals(olapTable.getIndexes().size(), 1); + Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1"); + alterClauses.clear(); + alterClauses.add(buildIndexClause); + schemaChangeHandler.process(alterClauses, db, olapTable); + Map indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs(); + Assert.assertEquals(1, indexChangeJobMap.size()); + Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState()); + + IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get(); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0); + + Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState()); + // run waiting txn job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3); + // run running job + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX); + Assert.assertEquals(3, tasks.size()); + + // if one task failed, the job should be failed + // if task error is OBTAIN_LOCK_FAILED, the job should be failed after MAX_FAILED_NUM = 10 times + AgentTask agentTask = tasks.get(0); + agentTask.setErrorCode(TStatusCode.OBTAIN_LOCK_FAILED); + Assert.assertEquals(agentTask.getFailedTimes(), 0); + for (int i = 0; i < IndexChangeJob.MAX_FAILED_NUM; i++) { + agentTask.failed(); + schemaChangeHandler.runAfterCatalogReady(); + if (i < IndexChangeJob.MAX_FAILED_NUM - 1) { + Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState()); + } + } + Assert.assertEquals(IndexChangeJob.JobState.CANCELLED, indexChangejob.getJobState()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java index fe96259bdb30fc..838424dd901477 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java @@ -89,5 +89,12 @@ public void testNormal() throws UserException, AnalysisException { Assert.assertEquals("CANCEL ALTER ROLLUP FROM `testDb`.`testTbl`", stmt.toString()); Assert.assertEquals("testDb", stmt.getDbName()); Assert.assertEquals(AlterType.ROLLUP, stmt.getAlterType()); + + stmt = new CancelAlterTableStmt(AlterType.INDEX, + new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, null, "testTbl")); + stmt.analyze(analyzer); + Assert.assertEquals("CANCEL ALTER INDEX FROM `testDb`.`testTbl`", stmt.toString()); + Assert.assertEquals("testDb", stmt.getDbName()); + Assert.assertEquals(AlterType.INDEX, stmt.getAlterType()); } } diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift index 0b40545ead0823..b311b94d48c638 100644 --- a/gensrc/thrift/Status.thrift +++ b/gensrc/thrift/Status.thrift @@ -104,6 +104,8 @@ enum TStatusCode { NOT_MASTER = 73, + OBTAIN_LOCK_FAILED = 74, + // used for cloud DELETE_BITMAP_LOCK_ERROR = 100, // Not be larger than 200, see status.h