Skip to content

Commit

Permalink
5
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 18, 2023
1 parent a66a683 commit 2539004
Show file tree
Hide file tree
Showing 18 changed files with 59 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class BackendsProcDir implements ProcDirInterface {
private static final Logger LOG = LogManager.getLogger(BackendsProcDir.class);

public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("BackendId")
.add("Host").add("HeartbeatPort").add("BePort").add("HttpPort").add("BrpcPort").add("ArrowFlightPort")
.add("Host").add("HeartbeatPort").add("BePort").add("HttpPort").add("BrpcPort").add("ArrowFlightSQLPort")
.add("LastStartTime").add("LastHeartbeat").add("Alive").add("SystemDecommissioned").add("TabletNum")
.add("DataUsedCapacity").add("TrashUsedCapcacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct")
.add("MaxDiskUsedPct").add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status")
Expand Down Expand Up @@ -107,7 +107,7 @@ public static List<List<String>> getBackendInfos() {
backendInfo.add(String.valueOf(backend.getBePort()));
backendInfo.add(String.valueOf(backend.getHttpPort()));
backendInfo.add(String.valueOf(backend.getBrpcPort()));
backendInfo.add(String.valueOf(backend.getArrowFlightPort()));
backendInfo.add(String.valueOf(backend.getArrowFlightSQLPort()));
backendInfo.add(TimeUtils.longToTimeString(backend.getLastStartTime()));
backendInfo.add(TimeUtils.longToTimeString(backend.getLastUpdateMs()));
backendInfo.add(String.valueOf(backend.isAlive()));
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1663,10 +1663,10 @@ private TNetworkAddress toArrowFlightHost(TNetworkAddress host) throws Exception
if (backend == null) {
throw new UserException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
if (backend.getArrowFlightPort() < 0) {
if (backend.getArrowFlightSQLPort() < 0) {
return null;
}
return new TNetworkAddress(backend.getHost(), backend.getArrowFlightPort());
return new TNetworkAddress(backend.getHost(), backend.getArrowFlightSQLPort());
}

// estimate if this fragment contains UnionNode
Expand Down
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public void start() throws Exception {
if (!flightSqlService.start()) {
System.exit(-1);
}
} else {
LOG.info("No Arrow Flight SQL service that needs to be started.");
}
LOG.info("QE service start.");
}
Expand Down
12 changes: 6 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.FlightStatementContext;
import org.apache.doris.service.arrowflight.FlightStatementExecutor;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -2634,7 +2634,7 @@ public List<ResultRow> executeInternalQuery() {
}
}

public void executeArrowFlightQuery(FlightStatementContext flightStatementContext) {
public void executeArrowFlightQuery(FlightStatementExecutor flightStatementExecutor) {
LOG.debug("ARROW FLIGHT QUERY: " + originStmt.toString());
try {
try {
Expand Down Expand Up @@ -2686,10 +2686,10 @@ public void executeArrowFlightQuery(FlightStatementContext flightStatementContex
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); // TODO for query profile
}
flightStatementContext.setFinstId(coord.getFinstId());
flightStatementContext.setResultFlightServerAddr(coord.getResultFlightServerAddr());
flightStatementContext.setResultInternalServiceAddr(coord.getResultInternalServiceAddr());
flightStatementContext.setResultOutputExprs(coord.getResultOutputExprs());
flightStatementExecutor.setFinstId(coord.getFinstId());
flightStatementExecutor.setResultFlightServerAddr(coord.getResultFlightServerAddr());
flightStatementExecutor.setResultInternalServiceAddr(coord.getResultInternalServiceAddr());
flightStatementExecutor.setResultOutputExprs(coord.getResultOutputExprs());
}

private List<ResultRow> convertResultBatchToResultRows(TResultBatch batch) {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.doris.service.arrowflight;

import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightServerMiddleware;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.auth.BasicServerAuthHandler;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.logging.log4j.LogManager;
Expand All @@ -35,18 +33,12 @@ public class FlightSqlService {
private static final Logger LOG = LogManager.getLogger(FlightSqlService.class);
private final FlightServer flightServer;
private volatile boolean running;
public static final String FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE = "client-properties-middleware";
public static final FlightServerMiddleware.Key<FlightServerCookieMiddleware> FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY
= FlightServerMiddleware.Key.of(FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE);

public FlightSqlService(int port) {
BufferAllocator allocator = new RootAllocator();
Location location = Location.forGrpcInsecure("0.0.0.0", port);
FlightSqlServiceImpl producer = new FlightSqlServiceImpl(location);
flightServer = FlightServer.builder(allocator, location, producer)
.middleware(FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY,
new FlightServerCookieMiddleware.Factory())
.authHandler(new BasicServerAuthHandler(new FlightServerBasicAuthValidator())).build();
flightServer = FlightServer.builder(allocator, location, producer).build();
}

// start Arrow Flight SQL service, return true if success, otherwise false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,21 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi
final FlightDescriptor descriptor) {
try {
final String query = request.getQuery();
final FlightStatementContext flightStatementContext = new FlightStatementContext(query);
final FlightStatementExecutor flightStatementExecutor = new FlightStatementExecutor(query);

flightStatementContext.executeQuery();
flightStatementExecutor.executeQuery();

TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(ByteString.copyFromUtf8(
DebugUtil.printId(flightStatementContext.getFinstId()) + ":" + query)).build();
DebugUtil.printId(flightStatementExecutor.getFinstId()) + ":" + query)).build();
final Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(flightStatementContext.getResultFlightServerAddr().hostname,
flightStatementContext.getResultFlightServerAddr().port);
Location location = Location.forGrpcInsecure(flightStatementExecutor.getResultFlightServerAddr().hostname,
flightStatementExecutor.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));

Schema schema;
schema = flightStatementContext.fetchArrowFlightSchema(5000);
schema = flightStatementExecutor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class FlightStatementContext {
public final class FlightStatementExecutor {
private AutoCloseConnectContext acConnectContext;
private final String query;
private TUniqueId queryId;
Expand All @@ -64,7 +64,7 @@ public final class FlightStatementContext {
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;

public FlightStatementContext(final String query) {
public FlightStatementExecutor(final String query) {
this.query = query;
acConnectContext = buildConnectContext();
}
Expand Down Expand Up @@ -115,7 +115,7 @@ public ArrayList<Expr> getResultOutputExprs() {

@Override
public boolean equals(final Object other) {
if (!(other instanceof FlightStatementContext)) {
if (!(other instanceof FlightStatementExecutor)) {
return false;
}
return this == other;
Expand Down
18 changes: 9 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class Backend implements Writable {
private volatile int beRpcPort; // be rpc port
@SerializedName("brpcPort")
private volatile int brpcPort = -1;
@SerializedName("arrowFlightPort")
private volatile int arrowFlightPort = -1;
@SerializedName("arrowFlightSQLPort")
private volatile int arrowFlightSQLPort = -1;

@SerializedName("lastUpdateMs")
private volatile long lastUpdateMs;
Expand Down Expand Up @@ -206,8 +206,8 @@ public int getBrpcPort() {
return brpcPort;
}

public int getArrowFlightPort() {
return arrowFlightPort;
public int getArrowFlightSQLPort() {
return arrowFlightSQLPort;
}

public String getHeartbeatErrMsg() {
Expand Down Expand Up @@ -295,8 +295,8 @@ public void setBrpcPort(int brpcPort) {
this.brpcPort = brpcPort;
}

public void setArrowFlightPort(int arrowFlightPort) {
this.arrowFlightPort = arrowFlightPort;
public void setArrowFlightSQLPort(int arrowFlightSQLPort) {
this.arrowFlightSQLPort = arrowFlightSQLPort;
}

public void setCpuCores(int cpuCores) {
Expand Down Expand Up @@ -680,9 +680,9 @@ public boolean handleHbResponse(BackendHbResponse hbResponse, boolean isReplay)
this.brpcPort = hbResponse.getBrpcPort();
}

if (this.arrowFlightPort != hbResponse.getArrowFlightPort() && !FeConstants.runningUnitTest) {
if (this.arrowFlightSQLPort != hbResponse.getArrowFlightSQLPort() && !FeConstants.runningUnitTest) {
isChanged = true;
this.arrowFlightPort = hbResponse.getArrowFlightPort();
this.arrowFlightSQLPort = hbResponse.getArrowFlightSQLPort();
}

if (this.isShutDown.get() != hbResponse.isShutDown()) {
Expand Down Expand Up @@ -816,7 +816,7 @@ public TNetworkAddress getBrpcAddress() {
}

public TNetworkAddress getArrowFlightAddress() {
return new TNetworkAddress(getHost(), getArrowFlightPort());
return new TNetworkAddress(getHost(), getArrowFlightSQLPort());
}

public String getTagMapString() {
Expand Down
Loading

0 comments on commit 2539004

Please sign in to comment.