diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index bfbb15a7a7a7834..5dd3c0d507ba557 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -20,9 +20,11 @@
 import org.apache.doris.analysis.BoolLiteral;
 import org.apache.doris.analysis.DecimalLiteral;
 import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.ResourceTypeEnum;
 import org.apache.doris.analysis.SetVar;
 import org.apache.doris.analysis.StringLiteral;
@@ -53,6 +55,7 @@
 import org.apache.doris.mysql.ProxyMysqlChannel;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.plsql.Exec;
@@ -168,6 +171,7 @@ public enum ConnectType {
     protected volatile ConnectScheduler connectScheduler;
     // Executor
     protected volatile StmtExecutor executor;
+    protected volatile ArrayList<StmtExecutor> returnResultFromRemoteExecutor = new ArrayList<>();
     // Command this connection is processing.
     protected volatile MysqlCommand command;
     // Timestamp in millisecond last command starts at
@@ -810,6 +814,47 @@ public StmtExecutor getExecutor() {
         return executor;
     }
 
+    /**
+     * Registers an executor whose query is finalized later by
+     * {@link #finalizeArrowFlightSqlRequest()}.
+     */
+    public void addReturnResultFromRemoteExecutor(StmtExecutor executor) {
+        this.returnResultFromRemoteExecutor.add(executor);
+    }
+
+    /**
+     * Finishes an Arrow Flight SQL request: updates the profile of a non-explain QueryStmt,
+     * LogicalPlanAdapter or InsertStmt, finalizes every registered executor, then resets this context.
+     */
+    public void finalizeArrowFlightSqlRequest() {
+        setThreadLocalInfo();
+        if (executor != null && executor.getParsedStmt() != null && !executor.getParsedStmt().isExplain()
+                && (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile
+                || executor.getParsedStmt() instanceof LogicalPlanAdapter
+                || executor.getParsedStmt() instanceof InsertStmt)) {
+            executor.updateProfile(true);
+            if (statsErrorEstimator != null) {
+                statsErrorEstimator.updateProfile(ConnectContext.get().queryId());
+            }
+        }
+
+        for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
+            asynExecutor.finalizeQuery();
+        }
+        returnResultFromRemoteExecutor.clear();
+        // In most cases, `executor.finalizeQuery` is redundant and will be skipped directly.
+        // Because the query returning results from BE will execute `returnResultFromRemoteExecutor.finalizeQuery`,
+        // and the query returning results from FE and `insert into` will call unregisterQuery after execute.
+        // `clear()` resets `executor` to null, so guard against a missing executor here.
+        if (executor != null) {
+            executor.finalizeQuery();
+        }
+
+        remove();
+        setCommand(MysqlCommand.COM_SLEEP);
+        clear();
+    }
+
     public void clear() {
         executor = null;
         statementContext = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 0c633186abf8ae2..e8b5c0c5141f9d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -86,7 +86,6 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -106,7 +105,6 @@ public enum ConnectType {
     protected final ConnectContext ctx;
     protected StmtExecutor executor = null;
     protected ConnectType connectType;
-    protected ArrayList<StmtExecutor> returnResultFromRemoteExecutor = new ArrayList<>();
 
     public ConnectProcessor(ConnectContext context) {
         this.ctx = context;
@@ -360,7 +358,7 @@ public void executeQuery(String originStmt) throws Exception {
                     }
                 } else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
                     if (!ctx.isReturnResultFromLocal()) {
-                        returnResultFromRemoteExecutor.add(executor);
+                        ctx.addReturnResultFromRemoteExecutor(executor);
                     }
                     Preconditions.checkState(ctx.getFlightSqlChannel().resultNum() <= 1);
                     if (ctx.getFlightSqlChannel().resultNum() == 1 && i != stmts.size() - 1) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index d9b4583d71f20e6..2956cd94572ee93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -26,6 +26,7 @@
 import org.apache.doris.common.profile.ProfileManager;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryProfile;
@@ -269,6 +270,9 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
                     DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id), e);
             return result;
         }
+        if (params.isDone() && info.getConnectContext().getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
+            info.getConnectContext().finalizeArrowFlightSqlRequest();
+        }
         result.setStatus(new TStatus(TStatusCode.OK));
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index b968ab04c57c83b..4fe2b112eeae048 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -131,7 +131,7 @@ private void getStreamStatementResult(String handle, ServerStreamListener listen
             String[] handleParts = handle.split(":");
             String executedPeerIdentity = handleParts[0];
             String queryId = handleParts[1];
-            ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
+            ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity, true);
             try {
                 // The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different.
                 final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(
@@ -173,7 +173,12 @@ public void closePreparedStatement(final ActionClosePreparedStatementRequest req
                 String[] handleParts = request.getPreparedStatementHandle().toStringUtf8().split(":");
                 String executedPeerIdentity = handleParts[0];
                 String preparedStatementId = handleParts[1];
-                flightSessionsManager.getConnectContext(executedPeerIdentity).removePreparedQuery(preparedStatementId);
+                ConnectContext ctx = flightSessionsManager.getConnectContext(executedPeerIdentity, false);
+                // If ConnectContext does not exist, it means that the Arrow Flight connection has been closed by FE.
+                // nothing needs to be done and the ADBC client does not need to know.
+                if (ctx != null) {
+                    ctx.removePreparedQuery(preparedStatementId);
+                }
             } catch (final Exception e) {
                 listener.onError(e);
                 return;
@@ -290,7 +295,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
     @Override
     public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context,
             final FlightDescriptor descriptor) {
-        ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
+        ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity(), true);
         return executeQueryStatement(context.peerIdentity(), connectContext, request.getQuery(), descriptor);
     }
 
@@ -300,7 +305,7 @@ public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQ
         String[] handleParts = command.getPreparedStatementHandle().toStringUtf8().split(":");
         String executedPeerIdentity = handleParts[0];
         String preparedStatementId = handleParts[1];
-        ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
+        ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity, true);
         return executeQueryStatement(executedPeerIdentity, connectContext,
                 connectContext.getPreparedQuery(preparedStatementId), descriptor);
     }
@@ -339,7 +344,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
         // if the server raises any error except for NotImplemented it will fail. (If it gets NotImplemented,
         // it will ignore and execute without a prepared statement.) see: https://github.com/apache/arrow/issues/38786
         executorService.submit(() -> {
-            ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
+            ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity(), true);
             try {
                 connectContext.setCommand(MysqlCommand.COM_QUERY);
                 final String query = request.getQuery();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 345d7d824a23ffb..8c2ba4a37f719c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -28,7 +28,6 @@
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectProcessor;
-import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
@@ -194,13 +193,6 @@ public void fetchArrowFlightSchema(int timeoutMs) {
 
     @Override
     public void close() throws Exception {
-        ctx.setCommand(MysqlCommand.COM_SLEEP);
-        ctx.clear();
-        for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
-            asynExecutor.finalizeQuery();
-        }
-        returnResultFromRemoteExecutor.clear();
-        executor.finalizeQuery();
         ConnectContext.remove();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
index 2c3a5258cc4612a..bbaac6a6aa745a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java
@@ -33,9 +33,10 @@ public interface FlightSessionsManager {
      * <p>
      *
      * @param peerIdentity identity after authorization
+     * @param createIfNotExist if true, create a new ConnectContext when none exists for peerIdentity
      * @return The ConnectContext or null if no sessionId is given.
      */
-    ConnectContext getConnectContext(String peerIdentity);
+    ConnectContext getConnectContext(String peerIdentity, boolean createIfNotExist);
 
     /**
      * Creates a ConnectContext object and store it in the local cache, assuming that peerIdentity was already
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
index b7e5ffa46466a6b..9ecf93c3765a64f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
@@ -42,10 +42,10 @@ public FlightSessionsWithTokenManager(FlightTokenManager flightTokenManager) {
     }
 
     @Override
-    public ConnectContext getConnectContext(String peerIdentity) {
+    public ConnectContext getConnectContext(String peerIdentity, boolean createIfNotExist) {
         try {
             ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity);
-            if (null == connectContext) {
+            if (null == connectContext && createIfNotExist) {
                 connectContext = createConnectContext(peerIdentity);
                 return connectContext;
             }
diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightAdbcDriver.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightAdbcDriver.java
index 3c2202b2b077970..baf90a2800ed786 100644
--- a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightAdbcDriver.java
+++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightAdbcDriver.java
@@ -56,9 +56,19 @@ private static void connectAndExecute(Configuration configuration, LoadArrowBatc
             // executeQuery, two steps:
             // 1. Execute Query and get returned FlightInfo;
             // 2. Create FlightInfoReader to sequentially traverse each Endpoint;
+            // 3. Data prefetching: Before manually create ArrowReader, will call Endpoint reader next to cache some
+            // data. After create ArrowReader, call loadNextBatch continues to fetch the remaining data.
             QueryResult queryResult = stmt.executeQuery();
             ArrowReader reader = queryResult.getReader();
             loadArrowReader.load(reader);
+            // Note: The time cost may be much longer than the actual query execution time.
+            // You may get `cost: 700ms` here, but the query time in Doris FE's query profile is only `20ms`.
+            // It may be that the process of Java generating QueryResult is too slow.
+            // Does Arrow Flight do row-column conversion when generating QueryResult?
+            // The time taken to generate the ResultSet is similar to that of Jdbc DriverManager,
+            // so the expected Arrow speed-up is not visible here, which is worth further research.
+            // In addition, the AdbcStatement/QueryResult generated by JdbcDriver seems to be
+            // faster than the one generated by FlightSqlDriver, which is unexpected.
             System.out.printf("> cost: %d ms.\n\n", (System.currentTimeMillis() - start));
 
             reader.close();
diff --git a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightJdbcDriver.java b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightJdbcDriver.java
index 1560e5172d7384f..3bc001a8a0e0599 100644
--- a/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightJdbcDriver.java
+++ b/samples/arrow-flight-sql/java/src/main/java/doris/arrowflight/demo/FlightJdbcDriver.java
@@ -55,6 +55,14 @@ private static void connectAndExecute(Configuration configuration, LoadArrowBatc
             AdbcStatement.QueryResult queryResult = stmt.executeQuery();
             ArrowReader reader = queryResult.getReader();
             loadArrowReader.load(reader);
+            // Note: The time cost may be much longer than the actual query execution time.
+            // You may get `cost: 700ms` here, but the query time in Doris FE's query profile is only `20ms`.
+            // It may be that the process of Java generating QueryResult is too slow.
+            // Does Arrow Flight do row-column conversion when generating QueryResult?
+            // The time taken to generate the ResultSet is similar to that of Jdbc DriverManager,
+            // so the expected Arrow speed-up is not visible here, which is worth further research.
+            // In addition, the AdbcStatement/QueryResult generated by JdbcDriver seems to be
+            // faster than the one generated by FlightSqlDriver, which is unexpected.
             System.out.printf("> cost: %d ms.\n\n", (System.currentTimeMillis() - start));
 
             reader.close();