From 5caf01d32053f245991a844ff8cfe852911ce411 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 25 Oct 2023 22:22:31 +0800 Subject: [PATCH] 3 --- .../org/apache/doris/qe/ConnectContext.java | 13 ++-- .../org/apache/doris/qe/ConnectProcessor.java | 6 +- .../java/org/apache/doris/qe/Coordinator.java | 10 ++- .../org/apache/doris/qe/StmtExecutor.java | 8 +- .../arrowflight/DorisFlightSqlProducer.java | 46 ++++++++---- .../FlightSQLConnectProcessor.java | 1 + .../arrowflight/MysqlChannelToFlightSql.java | 74 +++++++++++++++++++ 7 files changed, 129 insertions(+), 29 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/MysqlChannelToFlightSql.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index ae65b8984447edd..dd1404af9fac847 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -40,6 +40,7 @@ import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; +import org.apache.doris.service.arrowflight.MysqlChannelToFlightSql; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; import org.apache.doris.system.Backend; @@ -199,7 +200,7 @@ public enum ConnectType { private TNetworkAddress resultInternalServiceAddr; private ArrayList resultOutputExprs; private TUniqueId finstId; - private boolean waitQueryResult = true; + private boolean waitSyncQueryResult = true; public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { @@ -303,12 +304,12 @@ public TUniqueId getFinstId() { return finstId; } - public void setWaitQueryResult(boolean waitQueryResult) { - this.waitQueryResult = waitQueryResult; + public void setWaitSyncQueryResult(boolean waitSyncQueryResult) { + this.waitSyncQueryResult = waitSyncQueryResult; } - public boolean isWaitQueryResult() { - return waitQueryResult; + public boolean isWaitSyncQueryResult() { + return waitSyncQueryResult; } public static ConnectContext get() { @@ -350,7 +351,7 @@ public ConnectContext(String peerIdentity) { returnRows = 0; isKilled = false; sessionVariable = VariableMgr.newSessionVariable(); - mysqlChannel = new DummyMysqlChannel(); + mysqlChannel = new MysqlChannelToFlightSql(); command = MysqlCommand.COM_SLEEP; if (Config.use_fuzzy_session_variable) { sessionVariable.initFuzzyModeVariables(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 47aa700155c825a..9f1ee09b78b8cba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -253,12 +253,12 @@ protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) { parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); executor = new StmtExecutor(ctx, parsedStmt); ctx.setExecutor(executor); - if (!ctx.isWaitQueryResult()) { - asynQueryResultExecutor.add(executor); - } try { executor.execute(); + if (!ctx.isWaitSyncQueryResult()) { + asynQueryResultExecutor.add(executor); + } if (i != stmts.size() - 1) { if (connectType.equals(ConnectType.MYSQL)) { ctx.getState().mysqlServerStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 89b7e4bc1a41326..60640e8f395c272 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -604,10 +604,12 @@ public void exec() throws Exception { TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId, addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline); - context.setFinstId(topParams.instanceExecParams.get(0).instanceId); - context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr)); - context.setResultInternalServiceAddr(toBrpcHost(execBeAddr)); - context.setResultOutputExprs(fragments.get(0).getOutputExprs()); + if (!context.isWaitSyncQueryResult()) { + context.setFinstId(topParams.instanceExecParams.get(0).instanceId); + context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr)); + context.setResultInternalServiceAddr(toBrpcHost(execBeAddr)); + context.setResultOutputExprs(fragments.get(0).getOutputExprs()); + } if (LOG.isDebugEnabled()) { LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 01bcc123fff1ccf..8437d5ebe0a6e91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -135,6 +135,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; +import org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; @@ -653,7 +654,7 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception { throw e; } } finally { - if (context.isWaitQueryResult()) { + if (context.isWaitSyncQueryResult()) { finalizeQuery(); } } @@ -735,6 +736,9 @@ public void executeByLegacy(TUniqueId queryId) throws Exception { // sql/sqlHash block checkBlockRules(); if (parsedStmt instanceof QueryStmt) { + if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { + context.setWaitSyncQueryResult(false); + } handleQueryWithRetry(queryId); } else if (parsedStmt instanceof SetStmt) { handleSetStmt(); @@ -1451,7 +1455,7 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable } } - if (!context.isWaitQueryResult()) { + if (!context.isWaitSyncQueryResult()) { profile.getSummaryProfile().setTempStartTime(); if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index f27ca6a4a73cb2b..66fd75610631f03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -20,6 +20,7 @@ package org.apache.doris.service.arrowflight; +import org.apache.doris.catalog.Env; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; import org.apache.doris.mysql.MysqlCommand; @@ -29,6 +30,7 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Message; +import static java.util.Arrays.asList; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.Criteria; import org.apache.arrow.flight.FlightDescriptor; @@ -63,6 +65,9 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.types.pojo.ArrowType.Bool; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -111,27 +116,40 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi ConnectContext connectContext = null; try { connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); - connectContext.setWaitQueryResult(false); final String query = request.getQuery(); final FlightSQLConnectProcessor flightSQLConnectProcessor = new FlightSQLConnectProcessor(connectContext); flightSQLConnectProcessor.handleQuery(query); - TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() - .setStatementHandle(ByteString.copyFromUtf8( - DebugUtil.printId(connectContext.getFinstId()) + ":" + query)).build(); - final Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); - // TODO Support multiple endpoints. - Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); - List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); - + Ticket ticket; + Location location; Schema schema; - schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); - if (schema == null) { - throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); + if (connectContext.isWaitSyncQueryResult()) { + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(ByteString.copyFromUtf8(query)).build(); + ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + location = Location.forGrpcInsecure(Env.getCurrentEnv().getSelfNode().getHost(), + Env.getCurrentEnv().getSelfNode().getPort()); + schema = new Schema(asList( + new Field("i", new FieldType(true, new Bool(), null, null), null) + )); + } else { + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(ByteString.copyFromUtf8( + DebugUtil.printId(connectContext.getFinstId()) + ":" + query)).build(); + ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + // TODO Support multiple endpoints. + location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, + connectContext.getResultFlightServerAddr().port); + + schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); + if (schema == null) { + throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); + } } - // TODO Set in BE callback after query end, Client client will not callback by default. + + List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); + // TODO Set in BE callback after query end, Client will not callback. connectContext.setCommand(MysqlCommand.COM_SLEEP); return new FlightInfo(schema, descriptor, endpoints, -1, -1); } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSQLConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSQLConnectProcessor.java index 59c98ae73f49825..53bf2f08601d97e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSQLConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSQLConnectProcessor.java @@ -62,6 +62,7 @@ public FlightSQLConnectProcessor(ConnectContext context) { super(context); connectType = ConnectType.ARROW_FLIGHT_SQL; context.setThreadLocalInfo(); + context.setWaitSyncQueryResult(true); } public void prepare(MysqlCommand command) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/MysqlChannelToFlightSql.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/MysqlChannelToFlightSql.java new file mode 100644 index 000000000000000..f61cdff583dc0ac --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/MysqlChannelToFlightSql.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight; + +import org.apache.doris.mysql.MysqlChannel; +import org.apache.doris.mysql.MysqlSerializer; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class MysqlChannelToFlightSql extends MysqlChannel { + public MysqlChannelToFlightSql() { + this.serializer = MysqlSerializer.newInstance(); + } + + @Override + public String getRemoteIp() { + return ""; + } + + @Override + public String getRemoteHostPortString() { + return ""; + } + + @Override + public void close() { + } + + @Override + protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException { + return 0; + } + + @Override + public ByteBuffer fetchOnePacket() throws IOException { + return ByteBuffer.allocate(0); + } + + @Override + public void flush() throws IOException { + } + + @Override + public void sendOnePacket(ByteBuffer packet) throws IOException { + } + + @Override + public void sendAndFlush(ByteBuffer packet) throws IOException { + } + + @Override + public void reset() { + } + + public MysqlSerializer getSerializer() { + return serializer; + } +}