Skip to content

Commit

Permalink
6
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 7, 2024
1 parent 2368f58 commit 5d137a0
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 113 deletions.
40 changes: 6 additions & 34 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
Expand Down Expand Up @@ -63,11 +62,11 @@
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -134,10 +133,7 @@ public enum ConnectType {
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
Expand Down Expand Up @@ -731,36 +727,12 @@ public String getRunningQuery() {
return runningQuery;
}

public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) {
this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
}

public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
}

public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}

public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}

public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}

public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}

public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}

public TUniqueId getFinstId() {
return finstId;
public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
return flightSqlEndpointsLocations;
}

public void setReturnResultFromLocal(boolean returnResultFromLocal) {
Expand Down
29 changes: 15 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
Expand Down Expand Up @@ -740,22 +741,22 @@ private void execInternal() throws Exception {
continue;
}
addrs.add(param.host);
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
}

if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
if (enableParallelResultSink) {
context.setFinstId(queryId);
if (context.isReturnResultFromLocal()) {
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
} else {
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
TUniqueId finstId;
if (enableParallelResultSink) {
finstId = queryId;
} else {
finstId = topParams.instanceExecParams.get(0).instanceId;
}
FlightSqlEndpointsLocation flightSqlEndpointsLocation = new FlightSqlEndpointsLocation(finstId,
toArrowFlightHost(execBeAddr), toBrpcHost(execBeAddr), fragments.get(0).getOutputExprs());
context.addFlightSqlEndpointsLocation(flightSqlEndpointsLocation);
}
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}

LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
Expand Down Expand Up @@ -231,35 +233,34 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
.toRuntimeException();
}

TUniqueId queryId = connectContext.queryId();
if (!connectContext.getSessionVariable().enableParallelResultSink()) {
// only one instance
queryId = connectContext.getFinstId();
}
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname
+ "&" + connectContext.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location;
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Lists.newArrayList();
for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) {
TUniqueId tid = endpointLoc.getFinstId();
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&"
+ endpointLoc.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
Location location;
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, and the
// external client will be randomly routed to a Doris BE node when connecting to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
endpoints.add(new FlightEndpoint(ticket, location));
}
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand Down Expand Up @@ -108,29 +110,23 @@ public void handleQuery(String query) throws ConnectionException {
// }

public Schema fetchArrowFlightSchema(int timeoutMs) {
TNetworkAddress address = ctx.getResultInternalServiceAddr();
TUniqueId tid;
if (ctx.getSessionVariable().enableParallelResultSink()) {
tid = ctx.queryId();
} else {
// only one instance
tid = ctx.getFinstId();
}
ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
Preconditions.checkState(!ctx.getFlightSqlEndpointsLocations().isEmpty());
FlightSqlEndpointsLocation flightSqlEndpointsLocation = ctx.getFlightSqlEndpointsLocations().get(0);
TNetworkAddress address = flightSqlEndpointsLocation.getResultInternalServiceAddr();
TUniqueId tid = flightSqlEndpointsLocation.getFinstId();
ArrayList<Expr> resultOutputExprs = flightSqlEndpointsLocation.getResultOutputExprs();
Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
try {
InternalService.PFetchArrowFlightSchemaRequest request =
InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
.setFinstId(queryId)
.build();
InternalService.PFetchArrowFlightSchemaRequest request
= InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build();

Future<InternalService.PFetchArrowFlightSchemaResult> future
= BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request);
Future<InternalService.PFetchArrowFlightSchemaResult> future = BackendServiceProxy.getInstance()
.fetchArrowFlightSchema(address, request);
InternalService.PFetchArrowFlightSchemaResult pResult;
pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (pResult == null) {
throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s",
DebugUtil.printId(tid)));
throw new RuntimeException(
String.format("fetch arrow flight schema timeout, queryId: %s", DebugUtil.printId(tid)));
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
Expand All @@ -144,41 +140,39 @@ public Schema fetchArrowFlightSchema(int timeoutMs) {
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
ArrowStreamReader arrowStreamReader = new ArrowStreamReader(
new ByteArrayInputStream(pResult.getSchema().toByteArray()),
rootAllocator
);
new ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator);
try {
VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
List<FieldVector> fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != resultOutputExprs.size()) {
throw new RuntimeException(String.format(
"Schema size %s' is not equal to arrow field size %s, queryId: %s.",
fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid)));
throw new RuntimeException(
String.format("Schema size %s' is not equal to arrow field size %s, queryId: %s.",
fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid)));
}
return root.getSchema();
} catch (Exception e) {
throw new RuntimeException("Read Arrow Flight Schema failed.", e);
}
} else {
throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s",
DebugUtil.printId(tid)));
throw new RuntimeException(
String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid)));
}
} catch (RpcException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
throw new RuntimeException(
String.format("arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (InterruptedException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get interrupted exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
throw new RuntimeException(
String.format("arrow flight schema future get interrupted exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get execution exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
throw new RuntimeException(
String.format("arrow flight schema future get execution exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch timeout, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
throw new RuntimeException(
String.format("arrow flight schema fetch timeout, queryId: %s,backend: %s", DebugUtil.printId(tid),
address), e);
}
}

Expand Down
Loading

0 comments on commit 5d137a0

Please sign in to comment.