Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Mar 8, 2024
1 parent 7e3cd4c commit 77f3bda
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 37 deletions.
6 changes: 0 additions & 6 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,6 @@ void set_last_failure_time(Tablet* tablet, const Compaction& compaction, int64_t

struct WriteCooldownMetaExecutors {
WriteCooldownMetaExecutors(size_t executor_nums = 5);
~WriteCooldownMetaExecutors() {
for (int i = 0; i < _executors.size(); ++i) {
// _executors[i]->shutdown();
_executors[i]->join();
}
}

static WriteCooldownMetaExecutors* get_instance() {
static WriteCooldownMetaExecutors instance;
Expand Down
54 changes: 24 additions & 30 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -976,16 +976,16 @@ private void sendPipelineCtx() throws TException, RpcException, UserException {
futures = Lists.newArrayList();

for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
// if (LOG.isDebugEnabled()) {
String infos = "";
for (PipelineExecContext pec : ctxs.ctxs) {
infos += pec.fragmentId + " ";
if (LOG.isDebugEnabled()) {
String infos = "";
for (PipelineExecContext pec : ctxs.ctxs) {
infos += pec.fragmentId + " ";
}
if (LOG.isDebugEnabled()) {
LOG.debug("query {}, sending pipeline fragments: {} to be {} bprc address {}",
DebugUtil.printId(queryId), infos, ctxs.beId, ctxs.brpcAddr.toString());
}
}
// if (LOG.isDebugEnabled()) {
LOG.info("query {}, sending pipeline fragments: {} to be {} bprc address {}",
DebugUtil.printId(queryId), infos, ctxs.beId, ctxs.brpcAddr.toString());
// }
// }

ctxs.unsetFields();
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
Expand Down Expand Up @@ -1090,22 +1090,22 @@ private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, Future<
private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServiceProxy,
Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
String operation) throws RpcException, UserException {
long currentTimeMillis = System.currentTimeMillis();
long elapsed = (currentTimeMillis - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout();
String msg = String.format(
"timeout before waiting %s rpc, query timeout:%d, already elapsed:%d, left for this:%d",
operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs);
LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
if (!queryOptions.isSetExecutionTimeout() || !queryOptions.isSetQueryTimeout()) {
LOG.warn("Query {} does not set timeout info, execution timeout: is_set:{}, value:{}"
+ ", query timeout: is_set:{}, value: {}, "
+ "coordinator timeout deadline {}, cur time millis: {}",
DebugUtil.printId(queryId),
queryOptions.isSetExecutionTimeout(), queryOptions.getExecutionTimeout(),
queryOptions.isSetQueryTimeout(), queryOptions.getQueryTimeout(),
timeoutDeadline, currentTimeMillis);
}
if (leftTimeMs <= 0) {
long currentTimeMillis = System.currentTimeMillis();
long elapsed = (currentTimeMillis - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout();
String msg = String.format(
"timeout before waiting %s rpc, query timeout:%d, already elapsed:%d, left for this:%d",
operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs);
LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
if (!queryOptions.isSetExecutionTimeout() || !queryOptions.isSetQueryTimeout()) {
LOG.warn("Query {} does not set timeout info, execution timeout: is_set:{}, value:{}"
+ ", query timeout: is_set:{}, value: {}, "
+ "coordinator timeout deadline {}, cur time millis: {}",
DebugUtil.printId(queryId),
queryOptions.isSetExecutionTimeout(), queryOptions.getExecutionTimeout(),
queryOptions.isSetQueryTimeout(), queryOptions.getQueryTimeout(),
timeoutDeadline, currentTimeMillis);
}
throw new UserException(msg);
}

Expand Down Expand Up @@ -1160,12 +1160,6 @@ private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServicePro
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
.increase(1L);
SimpleScheduler.addToBlacklist(triple.getLeft().beId, errMsg);
boolean alive1 = Env.getCurrentSystemInfo().checkBackendAlive(triple.getLeft().beId);
boolean alive2 = Env.getCurrentSystemInfo().checkBackendLoadAvailable(triple.getLeft().beId);
boolean alive3 = Env.getCurrentSystemInfo().checkBackendQueryAvailable(triple.getLeft().beId);
boolean alive4 = Env.getCurrentSystemInfo()
.checkBackendScheduleAvailable(triple.getLeft().beId);
LOG.info("THRIFT_RPC_ERROR exception: " + alive1 + " " + alive2 + " " + alive3 + " " + alive4);
throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
default:
throw new UserException(errMsg, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ suite("test_memtable_flush_fault", "nonConcurrent") {
sql "sync"
} catch (Exception e){
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("[IO_ERROR]dbug_be_memtable_submit_flush_error"))
assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
} finally {
GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error")
}
Expand Down

0 comments on commit 77f3bda

Please sign in to comment.