Skip to content

Commit

Permalink
5
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 3, 2024
1 parent 4c36c97 commit 2e1c6f8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.base.Preconditions;
import com.google.protobuf.Any;
Expand Down Expand Up @@ -247,7 +248,13 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(remoteResultAddrHostName, remoteResultAddrPort);
Location location;
TNetworkAddress publicAccessAddr = flightSQLConnectProcessor.getPublicAccessAddr();
if (publicAccessAddr.isSetHostname()) {
location = Location.forGrpcInsecure(publicAccessAddr.hostname, publicAccessAddr.port);
} else {
location = Location.forGrpcInsecure(resultAddrHostName, resultAddrPort);
}
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class);
private TNetworkAddress publicAccessAddr;

public FlightSqlConnectProcessor(ConnectContext context) {
super(context);
Expand All @@ -66,6 +67,10 @@ public FlightSqlConnectProcessor(ConnectContext context) {
context.setReturnResultFromLocal(true);
}

public TNetworkAddress getPublicAccessAddr() {
return publicAccessAddr;
}

public void prepare(MysqlCommand command) {
// set status of query to OK.
ctx.getState().reset();
Expand Down Expand Up @@ -133,10 +138,10 @@ public Schema fetchArrowFlightSchema(int timeoutMs) {
DebugUtil.printId(tid), resultStatus));
}
if (pResult.hasBeArrowFlightIp()) {
ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8();
publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8();
}
if (pResult.hasBeArrowFlightPort()) {
ctx.getResultFlightServerAddr().port = pResult.getBeArrowFlightPort();
publicAccessAddr.port = pResult.getBeArrowFlightPort();
}
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
Expand Down

0 comments on commit 2e1c6f8

Please sign in to comment.