Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 25, 2023
1 parent a50c383 commit 5caf01d
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 29 deletions.
13 changes: 7 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.MysqlChannelToFlightSql;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -199,7 +200,7 @@ public enum ConnectType {
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private boolean waitQueryResult = true;
private boolean waitSyncQueryResult = true;

public void setUserQueryTimeout(int queryTimeout) {
if (queryTimeout > 0) {
Expand Down Expand Up @@ -303,12 +304,12 @@ public TUniqueId getFinstId() {
return finstId;
}

public void setWaitQueryResult(boolean waitQueryResult) {
this.waitQueryResult = waitQueryResult;
public void setWaitSyncQueryResult(boolean waitSyncQueryResult) {
this.waitSyncQueryResult = waitSyncQueryResult;
}

public boolean isWaitQueryResult() {
return waitQueryResult;
public boolean isWaitSyncQueryResult() {
return waitSyncQueryResult;
}

public static ConnectContext get() {
Expand Down Expand Up @@ -350,7 +351,7 @@ public ConnectContext(String peerIdentity) {
returnRows = 0;
isKilled = false;
sessionVariable = VariableMgr.newSessionVariable();
mysqlChannel = new DummyMysqlChannel();
mysqlChannel = new MysqlChannelToFlightSql();
command = MysqlCommand.COM_SLEEP;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,12 @@ protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) {
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
executor = new StmtExecutor(ctx, parsedStmt);
ctx.setExecutor(executor);
if (!ctx.isWaitQueryResult()) {
asynQueryResultExecutor.add(executor);
}

try {
executor.execute();
if (!ctx.isWaitSyncQueryResult()) {
asynQueryResultExecutor.add(executor);
}
if (i != stmts.size() - 1) {
if (connectType.equals(ConnectType.MYSQL)) {
ctx.getState().mysqlServerStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
Expand Down
10 changes: 6 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,12 @@ public void exec() throws Exception {
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline);
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
if (!context.isWaitSyncQueryResult()) {
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}
if (LOG.isDebugEnabled()) {
LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
Expand Down Expand Up @@ -653,7 +654,7 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
throw e;
}
} finally {
if (context.isWaitQueryResult()) {
if (context.isWaitSyncQueryResult()) {
finalizeQuery();
}
}
Expand Down Expand Up @@ -735,6 +736,9 @@ public void executeByLegacy(TUniqueId queryId) throws Exception {
// sql/sqlHash block
checkBlockRules();
if (parsedStmt instanceof QueryStmt) {
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setWaitSyncQueryResult(false);
}
handleQueryWithRetry(queryId);
} else if (parsedStmt instanceof SetStmt) {
handleSetStmt();
Expand Down Expand Up @@ -1451,7 +1455,7 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable
}
}

if (!context.isWaitQueryResult()) {
if (!context.isWaitSyncQueryResult()) {
profile.getSummaryProfile().setTempStartTime();
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.doris.service.arrowflight;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.MysqlCommand;
Expand All @@ -29,6 +30,7 @@
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import static java.util.Arrays.asList;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
Expand Down Expand Up @@ -63,6 +65,9 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -111,27 +116,40 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi
ConnectContext connectContext = null;
try {
connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
connectContext.setWaitQueryResult(false);
final String query = request.getQuery();
final FlightSQLConnectProcessor flightSQLConnectProcessor = new FlightSQLConnectProcessor(connectContext);

flightSQLConnectProcessor.handleQuery(query);

TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(ByteString.copyFromUtf8(
DebugUtil.printId(connectContext.getFinstId()) + ":" + query)).build();
final Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));

Ticket ticket;
Location location;
Schema schema;
schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
if (connectContext.isWaitSyncQueryResult()) {
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(ByteString.copyFromUtf8(query)).build();
ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
location = Location.forGrpcInsecure(Env.getCurrentEnv().getSelfNode().getHost(),
Env.getCurrentEnv().getSelfNode().getPort());
schema = new Schema(asList(
new Field("i", new FieldType(true, new Bool(), null, null), null)
));
} else {
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(ByteString.copyFromUtf8(
DebugUtil.printId(connectContext.getFinstId()) + ":" + query)).build();
ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);

schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
}
}
// TODO Set in BE callback after query end, Client client will not callback by default.

List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
connectContext.setCommand(MysqlCommand.COM_SLEEP);
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public FlightSQLConnectProcessor(ConnectContext context) {
super(context);
connectType = ConnectType.ARROW_FLIGHT_SQL;
context.setThreadLocalInfo();
context.setWaitSyncQueryResult(true);
}

public void prepare(MysqlCommand command) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.service.arrowflight;

import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlSerializer;

import java.io.IOException;
import java.nio.ByteBuffer;

public class MysqlChannelToFlightSql extends MysqlChannel {
public MysqlChannelToFlightSql() {
this.serializer = MysqlSerializer.newInstance();
}

@Override
public String getRemoteIp() {
return "";
}

@Override
public String getRemoteHostPortString() {
return "";
}

@Override
public void close() {
}

@Override
protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException {
return 0;
}

@Override
public ByteBuffer fetchOnePacket() throws IOException {
return ByteBuffer.allocate(0);
}

@Override
public void flush() throws IOException {
}

@Override
public void sendOnePacket(ByteBuffer packet) throws IOException {
}

@Override
public void sendAndFlush(ByteBuffer packet) throws IOException {
}

@Override
public void reset() {
}

public MysqlSerializer getSerializer() {
return serializer;
}
}

0 comments on commit 5caf01d

Please sign in to comment.