From 2e1c6f8a395699c80a0e17e2fad928ca51891057 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Sun, 3 Nov 2024 22:09:50 +0800 Subject: [PATCH] 5 --- .../service/arrowflight/DorisFlightSqlProducer.java | 9 ++++++++- .../service/arrowflight/FlightSqlConnectProcessor.java | 9 +++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 817c35679c63522..342174c1f36dd0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -27,6 +27,7 @@ import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; +import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Preconditions; import com.google.protobuf.Any; @@ -247,7 +248,13 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con .build(); Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); // TODO Support multiple endpoints. - Location location = Location.forGrpcInsecure(remoteResultAddrHostName, remoteResultAddrPort); + Location location; + TNetworkAddress publicAccessAddr = flightSQLConnectProcessor.getPublicAccessAddr(); + if (publicAccessAddr.isSetHostname()) { + location = Location.forGrpcInsecure(publicAccessAddr.hostname, publicAccessAddr.port); + } else { + location = Location.forGrpcInsecure(resultAddrHostName, resultAddrPort); + } List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. return new FlightInfo(schema, descriptor, endpoints, -1, -1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index 840992a7a14a151..bebcfe11d5dcd3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -58,6 +58,7 @@ */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); + private TNetworkAddress publicAccessAddr; public FlightSqlConnectProcessor(ConnectContext context) { super(context); @@ -66,6 +67,10 @@ public FlightSqlConnectProcessor(ConnectContext context) { context.setReturnResultFromLocal(true); } + public TNetworkAddress getPublicAccessAddr() { + return publicAccessAddr; + } + public void prepare(MysqlCommand command) { // set status of query to OK. ctx.getState().reset(); @@ -133,10 +138,10 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { DebugUtil.printId(tid), resultStatus)); } if (pResult.hasBeArrowFlightIp()) { - ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8(); + publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8(); } if (pResult.hasBeArrowFlightPort()) { - ctx.getResultFlightServerAddr().port = pResult.getBeArrowFlightPort(); + publicAccessAddr.port = pResult.getBeArrowFlightPort(); } if (pResult.hasSchema() && pResult.getSchema().size() > 0) { RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);