Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 13, 2024
1 parent ffa02e3 commit acaad91
Showing 1 changed file with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.doris.service.arrowflight;

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -45,6 +49,7 @@
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -193,14 +198,34 @@ public void fetchArrowFlightSchema(int timeoutMs) {
}

@Override
public void close() throws Exception {
ctx.setCommand(MysqlCommand.COM_SLEEP);
ctx.clear();
public void finalizeCommand() throws IOException {
for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
asynExecutor.finalizeQuery();
}
returnResultFromRemoteExecutor.clear();
// In most cases, `executor.finalizeQuery` is redundant and will be skipped directly.
// Because the query returning results from BE will execute `returnResultFromRemoteExecutor.finalizeQuery`,
// and the query returning results from FE and `insert into` will call unregisterQuery after execute.
executor.finalizeQuery();

if (executor != null && executor.getParsedStmt() != null && !executor.getParsedStmt().isExplain()
&& (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile
|| executor.getParsedStmt() instanceof LogicalPlanAdapter
|| executor.getParsedStmt() instanceof InsertStmt)) {
executor.updateProfile(true);
StatsErrorEstimator statsErrorEstimator = ConnectContext.get().getStatsErrorEstimator();
if (statsErrorEstimator != null) {
statsErrorEstimator.updateProfile(ConnectContext.get().queryId());
}
}

ctx.setCommand(MysqlCommand.COM_SLEEP);
ctx.clear();
}

@Override
public void close() throws Exception {
finalizeCommand();
ConnectContext.remove();
}
}

0 comments on commit acaad91

Please sign in to comment.