From c5bfe741fcfe96241e90bdb025bd600c526dd92c Mon Sep 17 00:00:00 2001 From: Zou Xinyi Date: Tue, 19 Nov 2024 16:28:06 +0800 Subject: [PATCH] 1 --- .../org/apache/doris/qe/ConnectContext.java | 40 +---- .../java/org/apache/doris/qe/Coordinator.java | 28 ++-- .../arrowflight/DorisFlightSqlProducer.java | 71 ++++---- .../FlightSqlConnectProcessor.java | 154 +++++++++--------- .../results/FlightSqlEndpointsLocation.java | 65 ++++++++ 5 files changed, 201 insertions(+), 157 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 5e0716da7d767fc..dc6ecb420289962 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -19,7 +19,6 @@ import org.apache.doris.analysis.BoolLiteral; import org.apache.doris.analysis.DecimalLiteral; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FloatLiteral; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.LiteralExpr; @@ -63,11 +62,11 @@ import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; import org.apache.doris.system.Backend; import org.apache.doris.task.LoadTaskInfo; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TResultSinkType; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -134,10 +133,7 @@ public enum ConnectType { protected volatile String peerIdentity; private final Map preparedQuerys = new 
HashMap<>(); private String runningQuery; - private TNetworkAddress resultFlightServerAddr; - private TNetworkAddress resultInternalServiceAddr; - private ArrayList resultOutputExprs; - private TUniqueId finstId; + private final List flightSqlEndpointsLocations = Lists.newArrayList(); private boolean returnResultFromLocal = true; // mysql net protected volatile MysqlChannel mysqlChannel; @@ -730,36 +726,12 @@ public String getRunningQuery() { return runningQuery; } - public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) { - this.resultFlightServerAddr = resultFlightServerAddr; + public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) { + this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation); } - public TNetworkAddress getResultFlightServerAddr() { - return resultFlightServerAddr; - } - - public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) { - this.resultInternalServiceAddr = resultInternalServiceAddr; - } - - public TNetworkAddress getResultInternalServiceAddr() { - return resultInternalServiceAddr; - } - - public void setResultOutputExprs(ArrayList resultOutputExprs) { - this.resultOutputExprs = resultOutputExprs; - } - - public ArrayList getResultOutputExprs() { - return resultOutputExprs; - } - - public void setFinstId(TUniqueId finstId) { - this.finstId = finstId; - } - - public TUniqueId getFinstId() { - return finstId; + public List getFlightSqlEndpointsLocations() { + return flightSqlEndpointsLocations; } public void setReturnResultFromLocal(boolean returnResultFromLocal) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index dee130886ec8f55..54433844d18bcc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -83,6 +83,7 @@ import org.apache.doris.rpc.RpcException; 
import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
@@ -740,22 +741,21 @@ protected void execInternal() throws Exception {
                     continue;
                 }
                 addrs.add(param.host);
-                receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
-                        toBrpcHost(param.host), this.timeoutDeadline,
-                        context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
-            }
-
-            if (!context.isReturnResultFromLocal()) {
-                Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
-                if (enableParallelResultSink) {
-                    context.setFinstId(queryId);
+                if (context.isReturnResultFromLocal()) {
+                    receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
+                            toBrpcHost(param.host), this.timeoutDeadline,
+                            context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
                 } else {
-                    context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
+                    Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
+                    TUniqueId finstId; // one endpoint is registered per distinct result host (param.host)
+                    if (enableParallelResultSink) {
+                        finstId = queryId; // parallel result sink: endpoints are keyed by the query id
+                    } else {
+                        finstId = param.instanceId; // use this instance's own id, not instance 0's
+                    }
+                    context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
+                            toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
                 }
-                context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
-                context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
-                context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
-                context.setResultOutputExprs(fragments.get(0).getOutputExprs());
             }

             LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 9d44a55b0816457..977776b92470ea3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -25,11 +25,13 @@ import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -225,50 +227,47 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con } } else { // Now only query stmt will pull results from BE. - Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); - if (schema == null) { + flightSQLConnectProcessor.fetchArrowFlightSchema(5000); + if (flightSQLConnectProcessor.getArrowSchema() == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null") .toRuntimeException(); } - TUniqueId queryId = connectContext.queryId(); - if (!connectContext.getSessionVariable().enableParallelResultSink()) { - // only one instance - queryId = connectContext.getFinstId(); - } - // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. 
- final ByteString handle = ByteString.copyFromUtf8( - DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname - + "&" + connectContext.getResultInternalServiceAddr().port + "&" + query); - TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) - .build(); - Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); - // TODO Support multiple endpoints. - Location location; - if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) { - // In a production environment, it is often inconvenient to expose Doris BE nodes - // to the external network. - // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, - // and the external client will be randomly routed to a Doris BE node when connecting to nginx. - // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. - // If it is different from the Doris BE node randomly routed by nginx, - // data forwarding needs to be done inside the Doris BE node. - if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) { - location = Location.forGrpcInsecure( - flightSQLConnectProcessor.getPublicAccessAddr().hostname, - flightSQLConnectProcessor.getPublicAccessAddr().port); + List endpoints = Lists.newArrayList(); + for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) { + TUniqueId tid = endpointLoc.getFinstId(); + // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. 
+ final ByteString handle = ByteString.copyFromUtf8( + DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&" + + endpointLoc.getResultInternalServiceAddr().port + "&" + query); + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(handle).build(); + Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + Location location; + if (endpointLoc.getResultPublicAccessAddr().isSetHostname()) { + // In a production environment, it is often inconvenient to expose Doris BE nodes + // to the external network. + // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, + // and the external client will be randomly routed to a Doris BE node when connecting + // to nginx. + // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. + // If it is different from the Doris BE node randomly routed by nginx, + // data forwarding needs to be done inside the Doris BE node. 
+ if (endpointLoc.getResultPublicAccessAddr().isSetPort()) { + location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname, + endpointLoc.getResultPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname, + endpointLoc.getResultFlightServerAddr().port); + } } else { - location = Location.forGrpcInsecure( - flightSQLConnectProcessor.getPublicAccessAddr().hostname, - connectContext.getResultFlightServerAddr().port); + location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname, + endpointLoc.getResultFlightServerAddr().port); } - } else { - location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); + endpoints.add(new FlightEndpoint(ticket, location)); } - List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. 
- return new FlightInfo(schema, descriptor, endpoints, -1, -1); + return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1); } } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index db5213cb7d4d08f..3fba602a1c1e2f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -31,6 +31,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -58,7 +59,7 @@ */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); - private TNetworkAddress publicAccessAddr = new TNetworkAddress(); + private Schema arrowSchema; public FlightSqlConnectProcessor(ConnectContext context) { super(context); @@ -67,8 +68,8 @@ public FlightSqlConnectProcessor(ConnectContext context) { context.setReturnResultFromLocal(true); } - public TNetworkAddress getPublicAccessAddr() { - return publicAccessAddr; + public Schema getArrowSchema() { + return arrowSchema; } public void prepare(MysqlCommand command) { @@ -107,80 +108,87 @@ public void handleQuery(String query) throws ConnectionException { // handleFieldList(tableName); // } - public Schema fetchArrowFlightSchema(int timeoutMs) { - TNetworkAddress address = ctx.getResultInternalServiceAddr(); - TUniqueId tid; - if (ctx.getSessionVariable().enableParallelResultSink()) { - 
tid = ctx.queryId(); - } else { - // only one instance - tid = ctx.getFinstId(); + public void fetchArrowFlightSchema(int timeoutMs) { + if (ctx.getFlightSqlEndpointsLocations().isEmpty()) { + throw new RuntimeException("fetch arrow flight schema failed, no FlightSqlEndpointsLocations."); } - ArrayList resultOutputExprs = ctx.getResultOutputExprs(); - Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); - try { - InternalService.PFetchArrowFlightSchemaRequest request = - InternalService.PFetchArrowFlightSchemaRequest.newBuilder() - .setFinstId(queryId) - .build(); - - Future future - = BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request); - InternalService.PFetchArrowFlightSchemaResult pResult; - pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); - if (pResult == null) { - throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s", - DebugUtil.printId(tid))); - } - Status resultStatus = new Status(pResult.getStatus()); - if (resultStatus.getErrorCode() != TStatusCode.OK) { - throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", - DebugUtil.printId(tid), resultStatus)); - } - if (pResult.hasBeArrowFlightIp()) { - publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8()); - } - if (pResult.hasBeArrowFlightPort()) { - publicAccessAddr.setPort(pResult.getBeArrowFlightPort()); - } - if (pResult.hasSchema() && pResult.getSchema().size() > 0) { - RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); - ArrowStreamReader arrowStreamReader = new ArrowStreamReader( - new ByteArrayInputStream(pResult.getSchema().toByteArray()), - rootAllocator - ); - try { - VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); - List fieldVectors = root.getFieldVectors(); - if (fieldVectors.size() != resultOutputExprs.size()) { - throw new RuntimeException(String.format( - "Schema size %s' is not equal to 
arrow field size %s, queryId: %s.", - fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + for (FlightSqlEndpointsLocation endpointLoc : ctx.getFlightSqlEndpointsLocations()) { + TNetworkAddress address = endpointLoc.getResultInternalServiceAddr(); + TUniqueId tid = endpointLoc.getFinstId(); + ArrayList resultOutputExprs = endpointLoc.getResultOutputExprs(); + Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + try { + InternalService.PFetchArrowFlightSchemaRequest request + = InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build(); + + Future future = BackendServiceProxy.getInstance() + .fetchArrowFlightSchema(address, request); + InternalService.PFetchArrowFlightSchemaResult pResult; + pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); + if (pResult == null) { + throw new RuntimeException( + String.format("fetch arrow flight schema timeout, queryId: %s", DebugUtil.printId(tid))); + } + Status resultStatus = new Status(pResult.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { + throw new RuntimeException( + String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", + DebugUtil.printId(tid), resultStatus)); + } + + TNetworkAddress resultPublicAccessAddr = new TNetworkAddress(); + if (pResult.hasBeArrowFlightIp()) { + resultPublicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8()); + } + if (pResult.hasBeArrowFlightPort()) { + resultPublicAccessAddr.setPort(pResult.getBeArrowFlightPort()); + } + endpointLoc.setResultPublicAccessAddr(resultPublicAccessAddr); + if (pResult.hasSchema() && pResult.getSchema().size() > 0) { + RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); + ArrowStreamReader arrowStreamReader = new ArrowStreamReader( + new ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator); + try { + Schema schema; + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); 
+ List fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() != resultOutputExprs.size()) { + throw new RuntimeException( + String.format("Schema size %s' is not equal to arrow field size %s, queryId: %s.", + fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + } + schema = root.getSchema(); + if (arrowSchema == null) { + arrowSchema = schema; + } else if (!arrowSchema.equals(schema)) { + throw new RuntimeException(String.format( + "The schema returned by results BE is different, first schema: %s, " + + "new schema: %s, queryId: %s,backend: %s", arrowSchema, schema, + DebugUtil.printId(tid), address)); + } + } catch (Exception e) { + throw new RuntimeException("Read Arrow Flight Schema failed.", e); } - return root.getSchema(); - } catch (Exception e) { - throw new RuntimeException("Read Arrow Flight Schema failed.", e); + } else { + throw new RuntimeException( + String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid))); } - } else { - throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s", - DebugUtil.printId(tid))); + } catch (RpcException e) { + throw new RuntimeException( + String.format("arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (InterruptedException e) { + throw new RuntimeException( + String.format("arrow flight schema future get interrupted exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (ExecutionException e) { + throw new RuntimeException( + String.format("arrow flight schema future get execution exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (TimeoutException e) { + throw new RuntimeException(String.format("arrow flight schema fetch timeout, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); } - } catch (RpcException e) { - throw new RuntimeException(String.format( - "arrow flight schema fetch 
catch rpc exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (InterruptedException e) { - throw new RuntimeException(String.format( - "arrow flight schema future get interrupted exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (ExecutionException e) { - throw new RuntimeException(String.format( - "arrow flight schema future get execution exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (TimeoutException e) { - throw new RuntimeException(String.format( - "arrow flight schema fetch timeout, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java new file mode 100644 index 000000000000000..61adc797cc5dc4f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.service.arrowflight.results;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TUniqueId;
+
+import java.util.ArrayList;
+
+public class FlightSqlEndpointsLocation {
+    private final TUniqueId finstId; // id placed in the ticket and used to fetch the schema
+    private final TNetworkAddress resultFlightServerAddr; // Arrow Flight address of the result BE
+    private final TNetworkAddress resultInternalServiceAddr; // brpc address of the result BE
+    private TNetworkAddress resultPublicAccessAddr; // filled after schema fetch; fields unset if none
+    private final ArrayList<Expr> resultOutputExprs; // output exprs, checked against the arrow schema
+
+    public FlightSqlEndpointsLocation(TUniqueId finstId, TNetworkAddress resultFlightServerAddr,
+            TNetworkAddress resultInternalServiceAddr, ArrayList<Expr> resultOutputExprs) {
+        this.finstId = finstId;
+        this.resultFlightServerAddr = resultFlightServerAddr;
+        this.resultInternalServiceAddr = resultInternalServiceAddr;
+        this.resultPublicAccessAddr = new TNetworkAddress();
+        this.resultOutputExprs = resultOutputExprs;
+    }
+
+    public TUniqueId getFinstId() {
+        return finstId;
+    }
+
+    public TNetworkAddress getResultFlightServerAddr() {
+        return resultFlightServerAddr;
+    }
+
+    public TNetworkAddress getResultInternalServiceAddr() {
+        return resultInternalServiceAddr;
+    }
+
+    public void setResultPublicAccessAddr(TNetworkAddress resultPublicAccessAddr) {
+        this.resultPublicAccessAddr = resultPublicAccessAddr;
+    }
+
+    public TNetworkAddress getResultPublicAccessAddr() {
+        return resultPublicAccessAddr;
+    }
+
+    public ArrayList<Expr> getResultOutputExprs() {
+        return resultOutputExprs;
+    }
+}