Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 19, 2024
1 parent def0bc2 commit f1daf3a
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 169 deletions.
40 changes: 8 additions & 32 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
Expand Down Expand Up @@ -63,11 +62,11 @@
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -134,10 +133,7 @@ public enum ConnectType {
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
Expand Down Expand Up @@ -730,36 +726,16 @@ public String getRunningQuery() {
return runningQuery;
}

public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) {
this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
}

public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
return flightSqlEndpointsLocations;
}

public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}

public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}

public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}

public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}

public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}

public TUniqueId getFinstId() {
return finstId;
public void clearFlightSqlEndpointsLocations() {
flightSqlEndpointsLocations.clear();
}

public void setReturnResultFromLocal(boolean returnResultFromLocal) {
Expand Down
32 changes: 16 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
Expand Down Expand Up @@ -733,29 +734,27 @@ protected void execInternal() throws Exception {
enableParallelResultSink = queryOptions.isEnableParallelOutfile();
}

TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
Set<TNetworkAddress> addrs = new HashSet<>();
for (FInstanceExecParam param : topParams.instanceExecParams) {
if (addrs.contains(param.host)) {
continue;
}
addrs.add(param.host);
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
}

if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
if (enableParallelResultSink) {
context.setFinstId(queryId);
if (context.isReturnResultFromLocal()) {
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
} else {
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
TUniqueId finstId;
if (enableParallelResultSink) {
finstId = queryId;
} else {
finstId = topParams.instanceExecParams.get(0).instanceId;
}
context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
}
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}

LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
Expand All @@ -766,7 +765,8 @@ protected void execInternal() throws Exception {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
.getBroker(topResultFileSink.getBrokerName(),
topParams.instanceExecParams.get(0).host.getHostname());
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.qe.runtime.ThriftPlansBuilder;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TNetworkAddress;
Expand Down Expand Up @@ -90,7 +91,7 @@ public NereidsCoordinator(ConnectContext context, Analyzer analyzer,
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));

Preconditions.checkState(!planner.getFragments().isEmpty()
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
}

// broker load
Expand Down Expand Up @@ -431,18 +432,22 @@ private void setForArrowFlight(CoordinatorContext coordinatorContext, PipelineDi
if (dataSink instanceof ResultSink || dataSink instanceof ResultFileSink) {
if (connectContext != null && !connectContext.isReturnResultFromLocal()) {
Preconditions.checkState(connectContext.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));

AssignedJob firstInstance = topPlan.getInstanceJobs().get(0);
BackendWorker worker = (BackendWorker) firstInstance.getAssignedWorker();
Backend backend = worker.getBackend();

connectContext.setFinstId(firstInstance.instanceId());
if (backend.getArrowFlightSqlPort() < 0) {
throw new IllegalStateException("be arrow_flight_sql_port cannot be empty.");
for (AssignedJob instance : topPlan.getInstanceJobs()) {
BackendWorker worker = (BackendWorker) instance.getAssignedWorker();
Backend backend = worker.getBackend();
if (backend.getArrowFlightSqlPort() < 0) {
throw new IllegalStateException("be arrow_flight_sql_port cannot be empty.");
}
TUniqueId finstId;
if (connectContext.getSessionVariable().enableParallelResultSink()) {
finstId = getQueryId();
} else {
finstId = instance.instanceId();
}
connectContext.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
backend.getArrowFlightAddress(), backend.getBrpcAddress(),
topPlan.getFragmentJob().getFragment().getOutputExprs()));
}
connectContext.setResultFlightServerAddr(backend.getArrowFlightAddress());
connectContext.setResultInternalServiceAddr(backend.getBrpcAddress());
connectContext.setResultOutputExprs(topPlan.getFragmentJob().getFragment().getOutputExprs());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
Expand Down Expand Up @@ -187,6 +189,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
Preconditions.checkState(!query.isEmpty());
// The previous query may have finished without a getStreamStatement call taking away its result.
connectContext.getFlightSqlChannel().reset();
connectContext.clearFlightSqlEndpointsLocations();
try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) {
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
Expand Down Expand Up @@ -225,50 +228,52 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
}
} else {
// Now only query stmt will pull results from BE.
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (flightSQLConnectProcessor.getArrowSchema() == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
.toRuntimeException();
}

TUniqueId queryId = connectContext.queryId();
if (!connectContext.getSessionVariable().enableParallelResultSink()) {
// only one instance
queryId = connectContext.getFinstId();
}
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname
+ "&" + connectContext.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location;
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
List<FlightEndpoint> endpoints = Lists.newArrayList();
for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) {
TUniqueId tid = endpointLoc.getFinstId();
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&"
+ endpointLoc.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
Location location;
if (endpointLoc.getResultPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting
// to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (endpointLoc.getResultPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
connectContext.getResultFlightServerAddr().port);
location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
// By default, the query results of all BE nodes will be aggregated to one BE node.
// ADBC Client will only receive one endpoint and pull data from the BE node
// corresponding to this endpoint.
// Run `set global enable_parallel_result_sink=true;` to allow each BE to return query results
// separately. ADBC Client will receive multiple endpoints and pull data from each endpoint.
endpoints.add(new FlightEndpoint(ticket, location));
}
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO: set in the BE callback after the query ends; the client will not call back.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1);
}
}
} catch (Exception e) {
Expand Down
Loading

0 comments on commit f1daf3a

Please sign in to comment.