diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index a88945ed6755c4..259b1f7108f1d0 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -168,6 +168,16 @@ Status EngineCloneTask::_do_clone() { auto duration = std::chrono::milliseconds(dp->param("duration", 10 * 1000)); std::this_thread::sleep_for(duration); }); + + DBUG_EXECUTE_IF("EngineCloneTask.failed_clone", { + LOG_WARNING("EngineCloneTask.failed_clone") + .tag("tablet_id", _clone_req.tablet_id) + .tag("replica_id", _clone_req.replica_id) + .tag("version", _clone_req.version); + return Status::InternalError( + "in debug point, EngineCloneTask.failed_clone tablet={}, replica={}, version={}", + _clone_req.tablet_id, _clone_req.replica_id, _clone_req.version); + }); Status status = Status::OK(); string src_file_path; TBackend src_host; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 5e29adbb6da817..d8f7bd3fb09e35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -62,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; /* * TabletSchedCtx contains all information which is created during tablet scheduler processing. @@ -69,28 +70,6 @@ public class TabletSchedCtx implements Comparable { private static final Logger LOG = LogManager.getLogger(TabletSchedCtx.class); - /* - * SCHED_FAILED_COUNTER_THRESHOLD: - * threshold of times a tablet failed to be scheduled - * - * MIN_ADJUST_PRIORITY_INTERVAL_MS: - * min interval time of adjusting a tablet's priority - * - * MAX_NOT_BEING_SCHEDULED_INTERVAL_MS: - * max gap time of a tablet NOT being scheduled. - * - * These 3 params is for adjusting priority. - * If a tablet being scheduled failed for more than SCHED_FAILED_COUNTER_THRESHOLD times, its priority - * will be downgraded. And the interval between adjustment is larger than MIN_ADJUST_PRIORITY_INTERVAL_MS, - * to avoid being downgraded too soon. - * And if a tablet is not being scheduled longer than MAX_NOT_BEING_SCHEDULED_INTERVAL_MS, its priority - * will be upgraded, to avoid starvation. - * - */ - private static final int SCHED_FAILED_COUNTER_THRESHOLD = 5; - private static final long MIN_ADJUST_PRIORITY_INTERVAL_MS = 5 * 60 * 1000L; // 5 min - private static final long MAX_NOT_BEING_SCHEDULED_INTERVAL_MS = 30 * 60 * 1000L; // 30 min - /* * A clone task timeout is between Config.min_clone_task_timeout_sec and Config.max_clone_task_timeout_sec, * estimated by tablet size / MIN_CLONE_SPEED_MB_PER_SECOND. @@ -450,10 +429,6 @@ public void setSchedFailedCode(SubCode code) { schedFailedCode = code; } - public CloneTask getCloneTask() { - return cloneTask; - } - public long getCopySize() { return copySize; } @@ -932,12 +907,14 @@ public void releaseResource(TabletScheduler tabletScheduler, boolean reserveTabl } if (cloneTask != null) { AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature()); + cloneTask = null; // clear all CLONE replicas Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db != null) { Table table = db.getTableNullable(tblId); - if (table != null && table.writeLockIfExist()) { + // try get table write lock, if failed TabletScheduler will try next time + if (table != null && table.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { try { List cloneReplicas = Lists.newArrayList(); tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 724d51c1791925..ba10e5edf862fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -104,9 +104,6 @@ public class TabletScheduler extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(TabletScheduler.class); - // handle at most BATCH_NUM tablets in one loop - private static final int MIN_BATCH_NUM = 50; - // the minimum interval of updating cluster statistics and priority of tablet info private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s @@ -150,7 +147,7 @@ public enum AddResult { ADDED, // success to add ALREADY_IN, // already added, skip LIMIT_EXCEED, // number of pending tablets exceed the limit - REPLACE_ADDED, // succ to add, and envit a lowest task + REPLACE_ADDED, // succ to add, and evict a lowest task DISABLED // scheduler has been disabled. } @@ -285,7 +282,7 @@ public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) { addResult = AddResult.REPLACE_ADDED; pendingTablets.pollLast(); finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, - "envit lower priority sched tablet because pending queue is full"); + "evict lower priority sched tablet because pending queue is full"); } if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) { @@ -1845,9 +1842,9 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) tabletCtx.increaseFailedRunningCounter(); if (!tabletCtx.isExceedFailedRunningLimit()) { stat.counterCloneTaskFailed.incrementAndGet(); + tabletCtx.setState(TabletSchedCtx.State.PENDING); tabletCtx.releaseResource(this); tabletCtx.resetFailedSchedCounter(); - tabletCtx.setState(TabletSchedCtx.State.PENDING); addBackToPendingTablets(tabletCtx); return false; } else { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java index b22925e5d89270..320bff45229fba 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java @@ -40,12 +40,14 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class TabletHealthTest extends TestWithFeService { @@ -78,6 +80,8 @@ protected void runBeforeAll() throws Exception { @Override protected void runBeforeEach() throws Exception { + // set back to default value + Config.max_scheduling_tablets = 2000; for (Table table : db.getTables()) { dropTable(table.getName(), true); } @@ -358,4 +362,52 @@ public void testColocateTabletHealth() throws Exception { dropTable(table.getName(), true); } + + @Test + public void testAddTabletNoDeadLock() throws Exception { + Config.max_scheduling_tablets = 1; + createTable("CREATE TABLE tbl3 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 2" + + " PROPERTIES ('replication_num' = '3')"); + DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.failed"); + OlapTable table = (OlapTable) db.getTableOrMetaException("tbl3"); + Partition partition = table.getPartitions().iterator().next(); + List tablets = partition.getMaterializedIndices(IndexExtState.ALL).iterator().next().getTablets(); + Assertions.assertEquals(2, tablets.size()); + + partition.updateVisibleVersion(10L); + tablets.forEach(tablet -> tablet.getReplicas().forEach(replica -> replica.updateVersion(10))); + + Tablet tabletA = tablets.get(0); + Tablet tabletB = tablets.get(1); + TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler(); + tabletA.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + checkTabletStatus(tabletA, TabletStatus.VERSION_INCOMPLETE, table, partition); + Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady(); + Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady(); + Thread.sleep(1000); + MinMaxPriorityQueue queue = scheduler.getPendingTabletQueue(); + TabletSchedCtx tabletACtx = queue.peekFirst(); + Assertions.assertNotNull(tabletACtx); + tabletACtx.setLastVisitedTime(System.currentTimeMillis() + 3600 * 1000L); + tabletB.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + checkTabletStatus(tabletB, TabletStatus.VERSION_INCOMPLETE, table, partition); + Thread thread = new Thread(() -> { + try { + Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady(); + Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + Thread.sleep(1000); + Assertions.assertTrue(table.tryWriteLock(2, TimeUnit.SECONDS)); + table.writeUnlock(); + DebugPointUtil.clearDebugPoints(); + doRepair(); + Thread.sleep(1000); + doRepair(); + checkTabletIsHealth(tabletA, table, partition); + checkTabletIsHealth(tabletB, table, partition); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 93ce8b6766ba3e..4e2a9fa056b940 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -83,6 +83,7 @@ import org.apache.thrift.TException; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.BlockingQueue; @@ -293,6 +294,10 @@ private void handleCloneTablet(TAgentTaskRequest request, TFinishTaskRequest fin tabletInfo.setPathHash(pathHash); tabletInfo.setUsed(true); tabletInfos.add(tabletInfo); + if (DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.failed")) { + finishTaskRequest.setTaskStatus(new TStatus(TStatusCode.CANCELLED)); + finishTaskRequest.getTaskStatus().setErrorMsgs(Collections.singletonList("debug point set")); + } finishTaskRequest.setFinishTabletInfos(tabletInfos); }