From 68f98fbf64706c3184785854e82eb4de41815032 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Sat, 12 Oct 2024 18:42:24 +0800 Subject: [PATCH] 1 --- .../org/apache/doris/qe/ConnectContext.java | 14 +- .../arrowflight/DorisFlightSqlProducer.java | 120 +++++++++--------- .../FlightSqlConnectProcessor.java | 3 + .../sessions/FlightSqlConnectContext.java | 3 +- 4 files changed, 74 insertions(+), 66 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 579fdeff8e7b18a..68a72656649f67b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -951,7 +951,8 @@ public String getRemoteHostPortString() { // kill operation with no protect. public void kill(boolean killConnection) { - LOG.warn("kill query from {}, kill mysql connection: {}", getRemoteHostPortString(), killConnection); + LOG.warn("kill query from {}, kill {} connection: {}", getRemoteHostPortString(), getConnectType(), + killConnection); if (killConnection) { isKilled = true; @@ -964,8 +965,9 @@ public void kill(boolean killConnection) { // kill operation with no protect by timeout. private void killByTimeout(boolean killConnection) { - LOG.warn("kill query from {}, kill mysql connection: {} reason time out", getRemoteHostPortString(), - killConnection); + LOG.warn("kill query from {}, kill connection is {}, connection type: {}, connectionId: {} reason time out", + getRemoteHostPortString(), getConnectType(), + killConnection, connectionId); if (killConnection) { isKilled = true; @@ -999,8 +1001,10 @@ public void checkTimeout(long now) { if (command == MysqlCommand.COM_SLEEP) { if (delta > sessionVariable.getWaitTimeoutS() * 1000L) { // Need kill this connection. - LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}", - getRemoteHostPortString(), sessionVariable.getWaitTimeoutS()); + LOG.warn( + "kill wait timeout connection, connection type: {}, connectionId: {}, remote: {}, " + + "wait timeout: {}", + getConnectType(), connectionId, getRemoteHostPortString(), sessionVariable.getWaitTimeoutS()); killFlag = true; killConnection = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 16195469af915f9..5d431b386b7f113 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -186,64 +186,66 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con Preconditions.checkState(!query.isEmpty()); // After the previous query was executed, there was no getStreamStatement to take away the result. connectContext.getFlightSqlChannel().reset(); - final FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext); - - flightSQLConnectProcessor.handleQuery(query); - if (connectContext.getState().getStateType() == MysqlStateType.ERR) { - throw new RuntimeException("after executeQueryStatement handleQuery"); - } - - if (connectContext.isReturnResultFromLocal()) { - // set/use etc. stmt returns an OK result by default. - if (connectContext.getFlightSqlChannel().resultNum() == 0) { - // a random query id and add empty results - String queryId = UUID.randomUUID().toString(); - connectContext.getFlightSqlChannel().addOKResult(queryId, query); + try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) { + flightSQLConnectProcessor.handleQuery(query); + if (connectContext.getState().getStateType() == MysqlStateType.ERR) { + throw new RuntimeException("after executeQueryStatement handleQuery"); + } - final ByteString handle = ByteString.copyFromUtf8(peerIdentity + ":" + queryId); - TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) - .build(); - return getFlightInfoForSchema(ticketStatement, descriptor, - connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema()); + if (connectContext.isReturnResultFromLocal()) { + // set/use etc. stmt returns an OK result by default. + if (connectContext.getFlightSqlChannel().resultNum() == 0) { + // a random query id and add empty results + String queryId = UUID.randomUUID().toString(); + connectContext.getFlightSqlChannel().addOKResult(queryId, query); + + final ByteString handle = ByteString.copyFromUtf8(peerIdentity + ":" + queryId); + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(handle).build(); + return getFlightInfoForSchema(ticketStatement, descriptor, + connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot() + .getSchema()); + } else { + // A Flight Sql request can only contain one statement that returns result, + // otherwise expected thrown exception during execution. + Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1); + + // The tokens used for authentication between getStreamStatement and getFlightInfoStatement + // are different. So put the peerIdentity into the ticket and then getStreamStatement is used to + // find the correct ConnectContext. + // queryId is used to find query results. + final ByteString handle = ByteString.copyFromUtf8( + peerIdentity + ":" + DebugUtil.printId(connectContext.queryId())); + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(handle).build(); + return getFlightInfoForSchema(ticketStatement, descriptor, connectContext.getFlightSqlChannel() + .getResult(DebugUtil.printId(connectContext.queryId())).getVectorSchemaRoot() + .getSchema()); + } } else { - // A Flight Sql request can only contain one statement that returns result, - // otherwise expected thrown exception during execution. - Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1); - - // The tokens used for authentication between getStreamStatement and getFlightInfoStatement - // are different. So put the peerIdentity into the ticket and then getStreamStatement is used to - // find the correct ConnectContext. - // queryId is used to find query results. - final ByteString handle = ByteString.copyFromUtf8( - peerIdentity + ":" + DebugUtil.printId(connectContext.queryId())); + // Now only query stmt will pull results from BE. + final ByteString handle; + if (connectContext.getSessionVariable().enableParallelResultSink()) { + handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + query); + } else { + // only one instance + handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + query); + } + Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); + if (schema == null) { + throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null") + .toRuntimeException(); + } TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) .build(); - return getFlightInfoForSchema(ticketStatement, descriptor, - connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId())) - .getVectorSchemaRoot().getSchema()); - } - } else { - // Now only query stmt will pull results from BE. - final ByteString handle; - if (connectContext.getSessionVariable().enableParallelResultSink()) { - handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + query); - } else { - // only one instance - handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + query); - } - Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); - if (schema == null) { - throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); + Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + // TODO Support multiple endpoints. + Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, + connectContext.getResultFlightServerAddr().port); + List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); + // TODO Set in BE callback after query end, Client will not callback. + return new FlightInfo(schema, descriptor, endpoints, -1, -1); } - TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) - .build(); - Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); - // TODO Support multiple endpoints. - Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); - List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); - // TODO Set in BE callback after query end, Client will not callback. - return new FlightInfo(schema, descriptor, endpoints, -1, -1); } } catch (Exception e) { String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) @@ -296,8 +298,7 @@ private ActionCreatePreparedStatementResult buildCreatePreparedStatementResult(B final ByteString bytes = Objects.isNull(parameterSchema) ? ByteString.EMPTY : ByteString.copyFrom(serializeMetadata(parameterSchema)); return ActionCreatePreparedStatementResult.newBuilder() - .setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData))) - .setParameterSchema(bytes) + .setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData))).setParameterSchema(bytes) .setPreparedStatementHandle(handle).build(); } @@ -326,12 +327,11 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r Schema metaData = connectContext.getFlightSqlChannel() .createOneOneSchemaRoot("ResultMeta", "UNIMPLEMENTED").getSchema(); listener.onNext(new Result( - Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData)) - .toByteArray())); + Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData)).toByteArray())); } catch (Exception e) { - String errMsg = "create prepared statement failed, " + e.getMessage() + ", " - + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() - + ", error msg: " + connectContext.getState().getErrorMessage(); + String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage( + e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + + connectContext.getState().getErrorMessage(); LOG.warn(errMsg, e); listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index fe0648a0680ca12..b812bf81d8a5146 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -53,6 +53,8 @@ /** * Process one flgiht sql connection. + * + * Must use try-with-resources. */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); @@ -177,6 +179,7 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { @Override public void close() throws Exception { ctx.setCommand(MysqlCommand.COM_SLEEP); + ctx.clear(); // TODO support query profile for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) { asynExecutor.finalizeQuery(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java index 4badae03b3141e9..b90d7505923b713 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java @@ -63,6 +63,7 @@ protected void closeChannel() { if (flightSqlChannel != null) { flightSqlChannel.close(); } + connectScheduler.unregisterConnection(this); } // kill operation with no protect. @@ -72,8 +73,8 @@ public void kill(boolean killConnection) { if (killConnection) { isKilled = true; + // Close channel and break connection with client. closeChannel(); - connectScheduler.unregisterConnection(this); } // Now, cancel running query. cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query killed by user"));