Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

branch-2.1: [fix](tabletScheduler) Fix addTablet dead lock in tabletScheduler #45298 #45769

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,16 @@ Status EngineCloneTask::_do_clone() {
auto duration = std::chrono::milliseconds(dp->param("duration", 10 * 1000));
std::this_thread::sleep_for(duration);
});

DBUG_EXECUTE_IF("EngineCloneTask.failed_clone", {
LOG_WARNING("EngineCloneTask.failed_clone")
.tag("tablet_id", _clone_req.tablet_id)
.tag("replica_id", _clone_req.replica_id)
.tag("version", _clone_req.version);
return Status::InternalError(
"in debug point, EngineCloneTask.failed_clone tablet={}, replica={}, version={}",
_clone_req.tablet_id, _clone_req.replica_id, _clone_req.version);
});
Status status = Status::OK();
string src_file_path;
TBackend src_host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/*
* TabletSchedCtx contains all information which is created during tablet scheduler processing.
*/
public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
private static final Logger LOG = LogManager.getLogger(TabletSchedCtx.class);

/*
* SCHED_FAILED_COUNTER_THRESHOLD:
* threshold of times a tablet failed to be scheduled
*
* MIN_ADJUST_PRIORITY_INTERVAL_MS:
* min interval time of adjusting a tablet's priority
*
* MAX_NOT_BEING_SCHEDULED_INTERVAL_MS:
* max gap time of a tablet NOT being scheduled.
*
* These 3 params are for adjusting priority.
* If scheduling a tablet fails more than SCHED_FAILED_COUNTER_THRESHOLD times, its priority
* will be downgraded. The interval between adjustments is larger than MIN_ADJUST_PRIORITY_INTERVAL_MS,
* to avoid downgrading it too soon.
* If a tablet has not been scheduled for longer than MAX_NOT_BEING_SCHEDULED_INTERVAL_MS, its priority
* will be upgraded, to avoid starvation.
*
*/
private static final int SCHED_FAILED_COUNTER_THRESHOLD = 5;
private static final long MIN_ADJUST_PRIORITY_INTERVAL_MS = 5 * 60 * 1000L; // 5 min
private static final long MAX_NOT_BEING_SCHEDULED_INTERVAL_MS = 30 * 60 * 1000L; // 30 min

/*
* A clone task timeout is between Config.min_clone_task_timeout_sec and Config.max_clone_task_timeout_sec,
* estimated by tablet size / MIN_CLONE_SPEED_MB_PER_SECOND.
Expand Down Expand Up @@ -450,10 +429,6 @@ public void setSchedFailedCode(SubCode code) {
schedFailedCode = code;
}

/**
 * Returns the value of this context's {@code cloneTask} field.
 *
 * @return the stored clone task; may be {@code null}, since the field is reset to
 *         {@code null} when the context's resources are released
 */
public CloneTask getCloneTask() {
    return this.cloneTask;
}

/**
 * Returns the value of this context's {@code copySize} field.
 *
 * @return the stored copy size
 */
public long getCopySize() {
    return this.copySize;
}
Expand Down Expand Up @@ -932,12 +907,14 @@ public void releaseResource(TabletScheduler tabletScheduler, boolean reserveTabl
}
if (cloneTask != null) {
AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature());
cloneTask = null;

// clear all CLONE replicas
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
Table table = db.getTableNullable(tblId);
if (table != null && table.writeLockIfExist()) {
// try get table write lock, if failed TabletScheduler will try next time
if (table != null && table.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
try {
List<Replica> cloneReplicas = Lists.newArrayList();
tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@
public class TabletScheduler extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(TabletScheduler.class);

// handle at most BATCH_NUM tablets in one loop
private static final int MIN_BATCH_NUM = 50;

// the minimum interval of updating cluster statistics and priority of tablet info
private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s

Expand Down Expand Up @@ -150,7 +147,7 @@ public enum AddResult {
ADDED, // success to add
ALREADY_IN, // already added, skip
LIMIT_EXCEED, // number of pending tablets exceed the limit
REPLACE_ADDED, // succ to add, and envit a lowest task
REPLACE_ADDED, // succ to add, and evict a lowest task
DISABLED // scheduler has been disabled.
}

Expand Down Expand Up @@ -285,7 +282,7 @@ public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) {
addResult = AddResult.REPLACE_ADDED;
pendingTablets.pollLast();
finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
"envit lower priority sched tablet because pending queue is full");
"evict lower priority sched tablet because pending queue is full");
}

if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
Expand Down Expand Up @@ -1845,9 +1842,9 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
tabletCtx.increaseFailedRunningCounter();
if (!tabletCtx.isExceedFailedRunningLimit()) {
stat.counterCloneTaskFailed.incrementAndGet();
tabletCtx.setState(TabletSchedCtx.State.PENDING);
tabletCtx.releaseResource(this);
tabletCtx.resetFailedSchedCounter();
tabletCtx.setState(TabletSchedCtx.State.PENDING);
addBackToPendingTablets(tabletCtx);
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class TabletHealthTest extends TestWithFeService {
Expand Down Expand Up @@ -78,6 +80,8 @@ protected void runBeforeAll() throws Exception {

@Override
protected void runBeforeEach() throws Exception {
// set back to default value
Config.max_scheduling_tablets = 2000;
for (Table table : db.getTables()) {
dropTable(table.getName(), true);
}
Expand Down Expand Up @@ -358,4 +362,52 @@ public void testColocateTabletHealth() throws Exception {

dropTable(table.getName(), true);
}

/*
 * Regression test for the addTablet deadlock.
 *
 * With Config.max_scheduling_tablets set to 1 and the mocked backend forced to fail clone tasks,
 * a second checker/scheduler round is run on a background thread. The test then verifies that
 * the table write lock can still be acquired from the test thread, and that both tablets become
 * healthy once the debug point is cleared and repair is run.
 */
@Test
public void testAddTabletNoDeadLock() throws Exception {
// NOTE(review): presumably a limit of 1 makes the pending queue "full" after the first tablet,
// so adding the second tablet goes through the eviction path in addTablet -- confirm.
Config.max_scheduling_tablets = 1;
createTable("CREATE TABLE tbl3 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 2"
+ " PROPERTIES ('replication_num' = '3')");
// While this debug point is enabled the mocked backend reports clone tasks as CANCELLED.
DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.failed");
OlapTable table = (OlapTable) db.getTableOrMetaException("tbl3");
Partition partition = table.getPartitions().iterator().next();
List<Tablet> tablets = partition.getMaterializedIndices(IndexExtState.ALL).iterator().next().getTablets();
// The table was created with BUCKETS 2, so two tablets are expected.
Assertions.assertEquals(2, tablets.size());

// Bring the partition's visible version and every replica to version 10.
partition.updateVisibleVersion(10L);
tablets.forEach(tablet -> tablet.getReplicas().forEach(replica -> replica.updateVersion(10)));

Tablet tabletA = tablets.get(0);
Tablet tabletB = tablets.get(1);
TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler();
// Put the first replica of tablet A behind (8 vs. 10) and confirm it is seen as VERSION_INCOMPLETE.
tabletA.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
checkTabletStatus(tabletA, TabletStatus.VERSION_INCOMPLETE, table, partition);
// First round: run the checker and the scheduler once, then wait a second for the result.
Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady();
Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady();
Thread.sleep(1000);
// A scheduling context must be sitting in the pending queue at this point.
MinMaxPriorityQueue<TabletSchedCtx> queue = scheduler.getPendingTabletQueue();
TabletSchedCtx tabletACtx = queue.peekFirst();
Assertions.assertNotNull(tabletACtx);
// NOTE(review): the last-visited time is pushed one hour into the future; presumably this affects
// how the context is ordered/selected in the next round -- confirm against TabletSchedCtx.
tabletACtx.setLastVisitedTime(System.currentTimeMillis() + 3600 * 1000L);
// Make tablet B unhealthy in the same way as tablet A.
tabletB.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
checkTabletStatus(tabletB, TabletStatus.VERSION_INCOMPLETE, table, partition);
// Second round runs on its own thread so that a stuck scheduler cannot block the assertions below.
Thread thread = new Thread(() -> {
try {
Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady();
Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady();
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
Thread.sleep(1000);
// Core check: the table write lock must be obtainable within 2 seconds, i.e. the background
// round is not holding it indefinitely.
Assertions.assertTrue(table.tryWriteLock(2, TimeUnit.SECONDS));
table.writeUnlock();
// Remove the injected clone failure, then run repair twice (with a pause) and expect both tablets healthy.
DebugPointUtil.clearDebugPoints();
doRepair();
Thread.sleep(1000);
doRepair();
checkTabletIsHealth(tabletA, table, partition);
checkTabletIsHealth(tabletB, table, partition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.thrift.TException;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -293,6 +294,10 @@ private void handleCloneTablet(TAgentTaskRequest request, TFinishTaskRequest fin
tabletInfo.setPathHash(pathHash);
tabletInfo.setUsed(true);
tabletInfos.add(tabletInfo);
if (DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.failed")) {
finishTaskRequest.setTaskStatus(new TStatus(TStatusCode.CANCELLED));
finishTaskRequest.getTaskStatus().setErrorMsgs(Collections.singletonList("debug point set"));
}
finishTaskRequest.setFinishTabletInfos(tabletInfos);
}

Expand Down
Loading