From fe946fa1e63874a565c1b9c1ee29e278b04541c3 Mon Sep 17 00:00:00 2001 From: deardeng Date: Mon, 23 Dec 2024 10:26:31 +0800 Subject: [PATCH] [fix](tabletScheduler) Fix addTablet dead lock in tabletScheduler (#45298) The conditions that need to be met to trigger the bug, with the second condition being somewhat difficult to trigger, are as follows: 1. The number of tablets that need to be fixed exceeds 2000 (in the pending queue); 2. The scheduling of the lowest priority in the pending queue has previously experienced a clone failure, with fewer than 3 failures, and has been put back into the pending queue. Additionally, a new scheduling request that happens to belong to the same table as the previous one has a higher priority than the previous scheduling. The fix is to write the lock trylock in finalize TabletCtx. If the lock cannot be obtained, the current scheduling will fail and the next one will be rescheduled Fix ``` "colocate group clone checker" #7557 daemon prio=5 os_prio=0 cpu=686.24ms elapsed=6719.45s tid=0x00007f3e6c039ab0 nid=0x17b08 waiting on condition [0x00007f3ec77fe000] (1 similar threads) java.lang.Thread.State: WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@17.0.2/Native Method) - parking to wait for <0x000010014d223908> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync) at java.util.concurrent.locks.LockSupport.park(java.base@17.0.2/LockSupport.java:211) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.2/AbstractQueuedSynchronizer.java:715) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.2/AbstractQueuedSynchronizer.java:938) at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(java.base@17.0.2/ReentrantReadWriteLock.java:959) at org.apache.doris.common.lock.MonitoredReentrantReadWriteLock$WriteLock.lock(MonitoredReentrantReadWriteLock.java:98) at org.apache.doris.catalog.Table.writeLockIfExist(Table.java:211) at org.apache.doris.clone.TabletSchedCtx.releaseResource(TabletSchedCtx.java:940) at org.apache.doris.clone.TabletSchedCtx.releaseResource(TabletSchedCtx.java:898) at org.apache.doris.clone.TabletScheduler.releaseTabletCtx(TabletScheduler.java:1743) at org.apache.doris.clone.TabletScheduler.finalizeTabletCtx(TabletScheduler.java:1625) at org.apache.doris.clone.TabletScheduler.addTablet(TabletScheduler.java:287) - locked <0x0000100009429110> (a org.apache.doris.clone.TabletScheduler) at org.apache.doris.clone.ColocateTableCheckerAndBalancer.matchGroups(ColocateTableCheckerAndBalancer.java:563) at org.apache.doris.clone.ColocateTableCheckerAndBalancer.runAfterCatalogReady(ColocateTableCheckerAndBalancer.java:340) at org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58) at org.apache.doris.common.util.Daemon.run(Daemon.java:119) ``` --- be/src/olap/task/engine_clone_task.cpp | 10 ++++ .../apache/doris/clone/TabletSchedCtx.java | 31 ++--------- .../apache/doris/clone/TabletScheduler.java | 9 ++-- .../apache/doris/clone/TabletHealthTest.java | 52 +++++++++++++++++++ .../doris/utframe/MockedBackendFactory.java | 5 ++ 5 files changed, 74 insertions(+), 33 deletions(-) diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index a88945ed6755c4..259b1f7108f1d0 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -168,6 +168,16 @@ Status EngineCloneTask::_do_clone() { auto duration = std::chrono::milliseconds(dp->param("duration", 10 * 1000)); std::this_thread::sleep_for(duration); }); + + DBUG_EXECUTE_IF("EngineCloneTask.failed_clone", { + LOG_WARNING("EngineCloneTask.failed_clone") + .tag("tablet_id", _clone_req.tablet_id) + .tag("replica_id", _clone_req.replica_id) + .tag("version", _clone_req.version); + return Status::InternalError( + "in debug point, EngineCloneTask.failed_clone tablet={}, replica={}, version={}", + _clone_req.tablet_id, _clone_req.replica_id, _clone_req.version); + }); Status status = Status::OK(); string src_file_path; TBackend src_host; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 5e29adbb6da817..d8f7bd3fb09e35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -62,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; /* * TabletSchedCtx contains all information which is created during tablet scheduler processing. @@ -69,28 +70,6 @@ public class TabletSchedCtx implements Comparable { private static final Logger LOG = LogManager.getLogger(TabletSchedCtx.class); - /* - * SCHED_FAILED_COUNTER_THRESHOLD: - * threshold of times a tablet failed to be scheduled - * - * MIN_ADJUST_PRIORITY_INTERVAL_MS: - * min interval time of adjusting a tablet's priority - * - * MAX_NOT_BEING_SCHEDULED_INTERVAL_MS: - * max gap time of a tablet NOT being scheduled. - * - * These 3 params is for adjusting priority. - * If a tablet being scheduled failed for more than SCHED_FAILED_COUNTER_THRESHOLD times, its priority - * will be downgraded. And the interval between adjustment is larger than MIN_ADJUST_PRIORITY_INTERVAL_MS, - * to avoid being downgraded too soon. - * And if a tablet is not being scheduled longer than MAX_NOT_BEING_SCHEDULED_INTERVAL_MS, its priority - * will be upgraded, to avoid starvation. - * - */ - private static final int SCHED_FAILED_COUNTER_THRESHOLD = 5; - private static final long MIN_ADJUST_PRIORITY_INTERVAL_MS = 5 * 60 * 1000L; // 5 min - private static final long MAX_NOT_BEING_SCHEDULED_INTERVAL_MS = 30 * 60 * 1000L; // 30 min - /* * A clone task timeout is between Config.min_clone_task_timeout_sec and Config.max_clone_task_timeout_sec, * estimated by tablet size / MIN_CLONE_SPEED_MB_PER_SECOND. @@ -450,10 +429,6 @@ public void setSchedFailedCode(SubCode code) { schedFailedCode = code; } - public CloneTask getCloneTask() { - return cloneTask; - } - public long getCopySize() { return copySize; } @@ -932,12 +907,14 @@ public void releaseResource(TabletScheduler tabletScheduler, boolean reserveTabl } if (cloneTask != null) { AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature()); + cloneTask = null; // clear all CLONE replicas Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db != null) { Table table = db.getTableNullable(tblId); - if (table != null && table.writeLockIfExist()) { + // try get table write lock, if failed TabletScheduler will try next time + if (table != null && table.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { try { List cloneReplicas = Lists.newArrayList(); tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 724d51c1791925..ba10e5edf862fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -104,9 +104,6 @@ public class TabletScheduler extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(TabletScheduler.class); - // handle at most BATCH_NUM tablets in one loop - private static final int MIN_BATCH_NUM = 50; - // the minimum interval of updating cluster statistics and priority of tablet info private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s @@ -150,7 +147,7 @@ public enum AddResult { ADDED, // success to add ALREADY_IN, // already added, skip LIMIT_EXCEED, // number of pending tablets exceed the limit - REPLACE_ADDED, // succ to add, and envit a lowest task + REPLACE_ADDED, // succ to add, and evict a lowest task DISABLED // scheduler has been disabled. } @@ -285,7 +282,7 @@ public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) { addResult = AddResult.REPLACE_ADDED; pendingTablets.pollLast(); finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, - "envit lower priority sched tablet because pending queue is full"); + "evict lower priority sched tablet because pending queue is full"); } if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) { @@ -1845,9 +1842,9 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) tabletCtx.increaseFailedRunningCounter(); if (!tabletCtx.isExceedFailedRunningLimit()) { stat.counterCloneTaskFailed.incrementAndGet(); + tabletCtx.setState(TabletSchedCtx.State.PENDING); tabletCtx.releaseResource(this); tabletCtx.resetFailedSchedCounter(); - tabletCtx.setState(TabletSchedCtx.State.PENDING); addBackToPendingTablets(tabletCtx); return false; } else { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java index b22925e5d89270..320bff45229fba 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java @@ -40,12 +40,14 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class TabletHealthTest extends TestWithFeService { @@ -78,6 +80,8 @@ protected void runBeforeAll() throws Exception { @Override protected void runBeforeEach() throws Exception { + // set back to default value + Config.max_scheduling_tablets = 2000; for (Table table : db.getTables()) { dropTable(table.getName(), true); } @@ -358,4 +362,52 @@ public void testColocateTabletHealth() throws Exception { dropTable(table.getName(), true); } + + @Test + public void testAddTabletNoDeadLock() throws Exception { + Config.max_scheduling_tablets = 1; + createTable("CREATE TABLE tbl3 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 2" + + " PROPERTIES ('replication_num' = '3')"); + DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.failed"); + OlapTable table = (OlapTable) db.getTableOrMetaException("tbl3"); + Partition partition = table.getPartitions().iterator().next(); + List tablets = partition.getMaterializedIndices(IndexExtState.ALL).iterator().next().getTablets(); + Assertions.assertEquals(2, tablets.size()); + + partition.updateVisibleVersion(10L); + tablets.forEach(tablet -> tablet.getReplicas().forEach(replica -> replica.updateVersion(10))); + + Tablet tabletA = tablets.get(0); + Tablet tabletB = tablets.get(1); + TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler(); + tabletA.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + checkTabletStatus(tabletA, TabletStatus.VERSION_INCOMPLETE, table, partition); + Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady(); + Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady(); + Thread.sleep(1000); + MinMaxPriorityQueue queue = scheduler.getPendingTabletQueue(); + TabletSchedCtx tabletACtx = queue.peekFirst(); + Assertions.assertNotNull(tabletACtx); + tabletACtx.setLastVisitedTime(System.currentTimeMillis() + 3600 * 1000L); + tabletB.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + checkTabletStatus(tabletB, TabletStatus.VERSION_INCOMPLETE, table, partition); + Thread thread = new Thread(() -> { + try { + Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady(); + Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + Thread.sleep(1000); + Assertions.assertTrue(table.tryWriteLock(2, TimeUnit.SECONDS)); + table.writeUnlock(); + DebugPointUtil.clearDebugPoints(); + doRepair(); + Thread.sleep(1000); + doRepair(); + checkTabletIsHealth(tabletA, table, partition); + checkTabletIsHealth(tabletB, table, partition); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 93ce8b6766ba3e..4e2a9fa056b940 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -83,6 +83,7 @@ import org.apache.thrift.TException; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.BlockingQueue; @@ -293,6 +294,10 @@ private void handleCloneTablet(TAgentTaskRequest request, TFinishTaskRequest fin tabletInfo.setPathHash(pathHash); tabletInfo.setUsed(true); tabletInfos.add(tabletInfo); + if (DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.failed")) { + finishTaskRequest.setTaskStatus(new TStatus(TStatusCode.CANCELLED)); + finishTaskRequest.getTaskStatus().setErrorMsgs(Collections.singletonList("debug point set")); + } finishTaskRequest.setFinishTabletInfos(tabletInfos); }