diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index 345d7d824a23ff..0a6e314fd523bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -18,12 +18,16 @@ package org.apache.doris.service.arrowflight; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InsertStmt; +import org.apache.doris.analysis.QueryStmt; import org.apache.doris.common.ConnectionException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.mysql.MysqlCommand; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; import org.apache.doris.qe.ConnectContext; @@ -45,6 +49,7 @@ import org.apache.logging.log4j.Logger; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -193,14 +198,34 @@ public void fetchArrowFlightSchema(int timeoutMs) { } @Override - public void close() throws Exception { - ctx.setCommand(MysqlCommand.COM_SLEEP); - ctx.clear(); + public void finalizeCommand() throws IOException { for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) { asynExecutor.finalizeQuery(); } returnResultFromRemoteExecutor.clear(); + // In most cases, `executor.finalizeQuery` is redundant and will be skipped directly. + // Because the query returning results from BE will execute `returnResultFromRemoteExecutor.finalizeQuery`, + // and the query returning results from FE and `insert into` will call unregisterQuery after execute. executor.finalizeQuery(); + + if (executor != null && executor.getParsedStmt() != null && !executor.getParsedStmt().isExplain() + && (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile + || executor.getParsedStmt() instanceof LogicalPlanAdapter + || executor.getParsedStmt() instanceof InsertStmt)) { + executor.updateProfile(true); + StatsErrorEstimator statsErrorEstimator = ConnectContext.get().getStatsErrorEstimator(); + if (statsErrorEstimator != null) { + statsErrorEstimator.updateProfile(ConnectContext.get().queryId()); + } + } + + ctx.setCommand(MysqlCommand.COM_SLEEP); + ctx.clear(); + } + + @Override + public void close() throws Exception { + finalizeCommand(); ConnectContext.remove(); } }