Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 13, 2024
1 parent 559a2f9 commit e37070c
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 19 deletions.
34 changes: 34 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
Expand Down Expand Up @@ -53,6 +55,7 @@
import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.plsql.Exec;
Expand Down Expand Up @@ -168,6 +171,7 @@ public enum ConnectType {
protected volatile ConnectScheduler connectScheduler;
// Executor
protected volatile StmtExecutor executor;
protected volatile ArrayList<StmtExecutor> returnResultFromRemoteExecutor = new ArrayList<>();
// Command this connection is processing.
protected volatile MysqlCommand command;
// Timestamp in millisecond last command starts at
Expand Down Expand Up @@ -810,6 +814,36 @@ public StmtExecutor getExecutor() {
return executor;
}

public void addReturnResultFromRemoteExecutor(StmtExecutor executor) {
this.returnResultFromRemoteExecutor.add(executor);
}

public void finalizeArrowFlightSqlRequest() {
setThreadLocalInfo();
if (executor != null && executor.getParsedStmt() != null && !executor.getParsedStmt().isExplain()
&& (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile
|| executor.getParsedStmt() instanceof LogicalPlanAdapter
|| executor.getParsedStmt() instanceof InsertStmt)) {
executor.updateProfile(true);
if (statsErrorEstimator != null) {
statsErrorEstimator.updateProfile(ConnectContext.get().queryId());
}
}

for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
asynExecutor.finalizeQuery();
}
returnResultFromRemoteExecutor.clear();
// In most cases, `executor.finalizeQuery` is redundant and will be skipped directly.
// Because the query returning results from BE will execute `returnResultFromRemoteExecutor.finalizeQuery`,
// and the query returning results from FE and `insert into` will call unregisterQuery after execute.
executor.finalizeQuery();

remove();
setCommand(MysqlCommand.COM_SLEEP);
clear();
}

public void clear() {
executor = null;
statementContext = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -106,7 +105,6 @@ public enum ConnectType {
protected final ConnectContext ctx;
protected StmtExecutor executor = null;
protected ConnectType connectType;
protected ArrayList<StmtExecutor> returnResultFromRemoteExecutor = new ArrayList<>();

public ConnectProcessor(ConnectContext context) {
this.ctx = context;
Expand Down Expand Up @@ -360,7 +358,7 @@ public void executeQuery(String originStmt) throws Exception {
}
} else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
if (!ctx.isReturnResultFromLocal()) {
returnResultFromRemoteExecutor.add(executor);
ctx.addReturnResultFromRemoteExecutor(executor);
}
Preconditions.checkState(ctx.getFlightSqlChannel().resultNum() <= 1);
if (ctx.getFlightSqlChannel().resultNum() == 1 && i != stmts.size() - 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.profile.ProfileManager;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryProfile;
Expand Down Expand Up @@ -269,6 +270,9 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id), e);
return result;
}
if (params.isDone() && info.getConnectContext().getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
info.getConnectContext().finalizeArrowFlightSqlRequest();
}
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void getStreamStatementResult(String handle, ServerStreamListener listen
String[] handleParts = handle.split(":");
String executedPeerIdentity = handleParts[0];
String queryId = handleParts[1];
ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity, true);
try {
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different.
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(
Expand Down Expand Up @@ -173,7 +173,12 @@ public void closePreparedStatement(final ActionClosePreparedStatementRequest req
String[] handleParts = request.getPreparedStatementHandle().toStringUtf8().split(":");
String executedPeerIdentity = handleParts[0];
String preparedStatementId = handleParts[1];
flightSessionsManager.getConnectContext(executedPeerIdentity).removePreparedQuery(preparedStatementId);
ConnectContext ctx = flightSessionsManager.getConnectContext(executedPeerIdentity, false);
// If ConnectContext does not exist, it means that the Arrow Flight connection has been closed by FE.
// nothing needs to be done and the ADBC client does not need to know.
if (ctx != null) {
ctx.removePreparedQuery(preparedStatementId);
}
} catch (final Exception e) {
listener.onError(e);
return;
Expand Down Expand Up @@ -290,7 +295,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
@Override
public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context,
final FlightDescriptor descriptor) {
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity(), true);
return executeQueryStatement(context.peerIdentity(), connectContext, request.getQuery(), descriptor);
}

Expand All @@ -300,7 +305,7 @@ public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQ
String[] handleParts = command.getPreparedStatementHandle().toStringUtf8().split(":");
String executedPeerIdentity = handleParts[0];
String preparedStatementId = handleParts[1];
ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity, true);
return executeQueryStatement(executedPeerIdentity, connectContext,
connectContext.getPreparedQuery(preparedStatementId), descriptor);
}
Expand Down Expand Up @@ -339,7 +344,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
// if the server raises any error except for NotImplemented it will fail. (If it gets NotImplemented,
// it will ignore and execute without a prepared statement.) see: https://github.com/apache/arrow/issues/38786
executorService.submit(() -> {
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity(), true);
try {
connectContext.setCommand(MysqlCommand.COM_QUERY);
final String query = request.getQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
Expand Down Expand Up @@ -194,13 +193,6 @@ public void fetchArrowFlightSchema(int timeoutMs) {

@Override
public void close() throws Exception {
ctx.setCommand(MysqlCommand.COM_SLEEP);
ctx.clear();
for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
asynExecutor.finalizeQuery();
}
returnResultFromRemoteExecutor.clear();
executor.finalizeQuery();
ConnectContext.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ public interface FlightSessionsManager {
* <p>
*
* @param peerIdentity identity after authorization
* @param createIfNotExist if true, create new one if not exist
* @return The ConnectContext or null if no sessionId is given.
*/
ConnectContext getConnectContext(String peerIdentity);
ConnectContext getConnectContext(String peerIdentity, boolean createIfNotExist);

/**
* Creates a ConnectContext object and store it in the local cache, assuming that peerIdentity was already
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public FlightSessionsWithTokenManager(FlightTokenManager flightTokenManager) {
}

@Override
public ConnectContext getConnectContext(String peerIdentity) {
public ConnectContext getConnectContext(String peerIdentity, boolean createIfNotExist) {
try {
ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity);
if (null == connectContext) {
if (null == connectContext && createIfNotExist) {
connectContext = createConnectContext(peerIdentity);
return connectContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,19 @@ private static void connectAndExecute(Configuration configuration, LoadArrowBatc
// executeQuery, two steps:
// 1. Execute Query and get returned FlightInfo;
// 2. Create FlightInfoReader to sequentially traverse each Endpoint;
// 3. Data prefetching: Before manually create ArrowReader, will call Endpoint reader next to cache some
// data. After create ArrowReader, call loadNextBatch continues to fetch the remaining data.
QueryResult queryResult = stmt.executeQuery();
ArrowReader reader = queryResult.getReader();
loadArrowReader.load(reader);
// Note: The time cost may be much longer than the actual query execution time.
// You may get `cost: 700ms` here, but the query time in Doris FE's query profile is only `20ms`.
// It may be that the process of Java generating QueryResult is too slow,
// Does Arrow Flight do row-column conversion when generating QueryResult?
// Because the time taken to generate ResultSet is similar to that of Jdbc DriverManager,
// is Arrow cheating us? :) which is worth further research.
// In addition, the speed of AdbcStatement/QueryResult generated by JdbcDriver seems to be
// faster than FlightSqlDriver, this seems like a joke.
System.out.printf("> cost: %d ms.\n\n", (System.currentTimeMillis() - start));

reader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ private static void connectAndExecute(Configuration configuration, LoadArrowBatc
AdbcStatement.QueryResult queryResult = stmt.executeQuery();
ArrowReader reader = queryResult.getReader();
loadArrowReader.load(reader);
// Note: The time cost may be much longer than the actual query execution time.
// You may get `cost: 700ms` here, but the query time in Doris FE's query profile is only `20ms`.
// It may be that the process of Java generating QueryResult is too slow,
// Does Arrow Flight do row-column conversion when generating QueryResult?
// Because the time taken to generate ResultSet is similar to that of Jdbc DriverManager,
// is Arrow cheating us? :) which is worth further research.
// In addition, the speed of AdbcStatement/QueryResult generated by JdbcDriver seems to be
// faster than FlightSqlDriver, this seems like a joke.
System.out.printf("> cost: %d ms.\n\n", (System.currentTimeMillis() - start));

reader.close();
Expand Down

0 comments on commit e37070c

Please sign in to comment.