From 809bf307f36428f4c22eb9d4c0edf2a1d39f614a Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 13 Sep 2023 18:09:56 +0800 Subject: [PATCH] 4 --- .../org/apache/doris/qe/StmtExecutor.java | 2 +- .../FlightServerBasicAuthValidator.java | 4 + ...java => FlightServerCookieMiddleware.java} | 18 ++-- .../service/arrowflight/FlightSqlService.java | 19 ++--- .../arrowflight/FlightSqlServiceImpl.java | 85 +++++++++---------- .../arrowflight/FlightStatementContext.java | 65 ++++++-------- 6 files changed, 93 insertions(+), 100 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/{ServerCookieMiddleware.java => FlightServerCookieMiddleware.java} (82%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index adfa3a8339925ab..2e0bd115b47d96e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2674,7 +2674,7 @@ public void executeArrowFlightQuery(FlightStatementContext flightStatementContex } catch (Exception e) { queryScheduleSpan.recordException(e); LOG.warn("Failed to coord exec Arrow Flight SQL, because: {}", e.getMessage(), e); - throw new InternalQueryExecutionException(e.getMessage() + Util.getRootCauseMessage(e), e); + throw new RuntimeException(e.getMessage() + Util.getRootCauseMessage(e), e); } finally { queryScheduleSpan.end(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightServerBasicAuthValidator.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightServerBasicAuthValidator.java index 4b46b85f3842ee4..5fe8dd8c8971ee1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightServerBasicAuthValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightServerBasicAuthValidator.java @@ -14,6 +14,9 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is copied from +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/ServerCookieMiddleware.java +// and modified by Doris package org.apache.doris.service.arrowflight; @@ -25,6 +28,7 @@ /** * authentication specialized implementation of BasicAuthValidator. Authenticates with provided * credentials. + * TODO */ public class FlightServerBasicAuthValidator implements BasicServerAuthHandler.BasicAuthValidator { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/ServerCookieMiddleware.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightServerCookieMiddleware.java similarity index 82% rename from fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/ServerCookieMiddleware.java rename to fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightServerCookieMiddleware.java index 3ccd3399805eb40..dfa2a639382abb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/ServerCookieMiddleware.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightServerCookieMiddleware.java @@ -14,6 +14,9 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is copied from +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/ServerCookieMiddleware.java +// and modified by Doris package org.apache.doris.service.arrowflight; @@ -30,16 +33,14 @@ /** * ServerCookieMiddleware allows a FlightServer to retrieve cookies from the request as well as set outgoing cookies + * TODO */ -public final class ServerCookieMiddleware implements FlightServerMiddleware { +public final class FlightServerCookieMiddleware implements FlightServerMiddleware { private RequestContext requestContext; private Map cookieValues; private final CallHeaders incomingHeaders; - /** - * Factory to construct @see com.dremio.service.flight.ServerCookieMiddlewares - */ - public static class Factory implements FlightServerMiddleware.Factory { + public static class Factory implements FlightServerMiddleware.Factory { /** * Construct a factory for receiving call headers. */ @@ -47,14 +48,15 @@ public Factory() { } @Override - public ServerCookieMiddleware onCallStarted(CallInfo callInfo, CallHeaders incomingHeaders, + public FlightServerCookieMiddleware onCallStarted(CallInfo callInfo, CallHeaders incomingHeaders, RequestContext context) { - return new ServerCookieMiddleware(callInfo, incomingHeaders, context); + return new FlightServerCookieMiddleware(callInfo, incomingHeaders, context); } } - private ServerCookieMiddleware(CallInfo callInfo, CallHeaders incomingHeaders, RequestContext requestContext) { + private FlightServerCookieMiddleware(CallInfo callInfo, CallHeaders incomingHeaders, + RequestContext requestContext) { this.incomingHeaders = incomingHeaders; this.requestContext = requestContext; this.cookieValues = new HashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java index 376e1bb1b5a5746..2157f1750b3fafb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java @@ -36,7 +36,7 @@ public class FlightSqlService { private final FlightServer flightServer; private volatile boolean running; public static final String FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE = "client-properties-middleware"; - public static final FlightServerMiddleware.Key FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY + public static final FlightServerMiddleware.Key FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY = FlightServerMiddleware.Key.of(FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE); public FlightSqlService(int port) { @@ -44,19 +44,19 @@ public FlightSqlService(int port) { Location location = Location.forGrpcInsecure("0.0.0.0", port); FlightSqlServiceImpl producer = new FlightSqlServiceImpl(location); flightServer = FlightServer.builder(allocator, location, producer) - .middleware(FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY, - new ServerCookieMiddleware.Factory()) - .authHandler(new BasicServerAuthHandler(new FlightServerBasicAuthValidator())).build(); + .middleware(FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY, + new FlightServerCookieMiddleware.Factory()) + .authHandler(new BasicServerAuthHandler(new FlightServerBasicAuthValidator())).build(); } - // start Flightsql protocol service - // return true if success, otherwise false + // start Arrow Flight SQL service, return true if success, otherwise false public boolean start() { try { flightServer.start(); - LOG.info("Flightsql network service is started."); + running = true; + LOG.info("Arrow Flight SQL service is started."); } catch (IOException e) { - LOG.warn("Open Flightsql network service failed.", e); + LOG.error("Start Arrow Flight SQL service failed.", e); return false; } return true; @@ -65,11 +65,10 @@ public boolean start() { public void stop() { if (running) { running = false; - // close server channel, make accept throw exception try { flightServer.close(); } catch (InterruptedException e) { - LOG.warn("close server channel failed.", e); + LOG.warn("close Arrow Flight SQL server failed.", e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java index 97a9f74c60df0af..8fb476e2172397e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java @@ -14,10 +14,12 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is copied from +// https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java +// and modified by Doris package org.apache.doris.service.arrowflight; -import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; @@ -59,11 +61,14 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; public class FlightSqlServiceImpl implements FlightSqlProducer, AutoCloseable { + private static final Logger LOG = LogManager.getLogger(FlightSqlServiceImpl.class); private final Location location; private final BufferAllocator rootAllocator = new RootAllocator(); private final SqlInfoBuilder sqlInfoBuilder; @@ -85,19 +90,19 @@ public FlightSqlServiceImpl(final Location location) { @Override public void getStreamPreparedStatement(final CommandPreparedStatementQuery command, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPreparedStatement unimplemented").toRuntimeException(); } @Override public void closePreparedStatement(final ActionClosePreparedStatementRequest request, final CallContext context, - final StreamListener listener) { + final StreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("closePreparedStatement unimplemented").toRuntimeException(); } @Override public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { try { final String query = request.getQuery(); final FlightStatementContext flightStatementContext = new FlightStatementContext(query); @@ -113,37 +118,29 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi flightStatementContext.getResultFlightServerAddr().port); List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); - Status status = new Status(); Schema schema; - try { - schema = flightStatementContext.fetchArrowFlightSchema(5000, status); - } catch (Exception e) { - throw CallStatus.INTERNAL.withDescription("failed to fetch Arrow Flight SQL schema. " - + Util.getRootCauseMessage(e)).toRuntimeException(); - } - if (!status.ok()) { - throw CallStatus.INTERNAL.withDescription(status.toString()).toRuntimeException(); - } + schema = flightStatementContext.fetchArrowFlightSchema(5000); if (schema == null) { - throw CallStatus.INTERNAL.withDescription("schema is null, status: " + status).toRuntimeException(); + throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); } return new FlightInfo(schema, descriptor, endpoints, -1, -1); } catch (Exception e) { - throw CallStatus.INTERNAL.withCause(e).toRuntimeException(); + LOG.warn("get flight info statement failed, " + e.getMessage(), e); + throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException(); } } @Override public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQuery command, - final CallContext context, - final FlightDescriptor descriptor) { + final CallContext context, + final FlightDescriptor descriptor) { throw CallStatus.UNIMPLEMENTED.withDescription("getFlightInfoPreparedStatement unimplemented") .toRuntimeException(); } @Override public SchemaResult getSchemaStatement(final CommandStatementQuery command, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { throw CallStatus.UNIMPLEMENTED.withDescription("getSchemaStatement unimplemented").toRuntimeException(); } @@ -159,7 +156,7 @@ public void listFlights(CallContext context, Criteria criteria, StreamListener listener) { + final StreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("createPreparedStatement unimplemented").toRuntimeException(); } @@ -170,52 +167,52 @@ public void doExchange(CallContext context, FlightStream reader, ServerStreamLis @Override public Runnable acceptPutStatement(CommandStatementUpdate command, - CallContext context, FlightStream flightStream, - StreamListener ackStream) { + CallContext context, FlightStream flightStream, + StreamListener ackStream) { throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutStatement unimplemented").toRuntimeException(); } @Override public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate command, CallContext context, - FlightStream flightStream, StreamListener ackStream) { + FlightStream flightStream, StreamListener ackStream) { throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutPreparedStatementUpdate unimplemented") .toRuntimeException(); } @Override public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, CallContext context, - FlightStream flightStream, StreamListener ackStream) { + FlightStream flightStream, StreamListener ackStream) { throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutPreparedStatementQuery unimplemented") .toRuntimeException(); } @Override public FlightInfo getFlightInfoSqlInfo(final CommandGetSqlInfo request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA); } @Override public void getStreamSqlInfo(final CommandGetSqlInfo command, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { this.sqlInfoBuilder.send(command.getInfoList(), listener); } @Override public FlightInfo getFlightInfoTypeInfo(CommandGetXdbcTypeInfo request, CallContext context, - FlightDescriptor descriptor) { + FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_TYPE_INFO_SCHEMA); } @Override public void getStreamTypeInfo(CommandGetXdbcTypeInfo request, CallContext context, - ServerStreamListener listener) { + ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTypeInfo unimplemented").toRuntimeException(); } @Override public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA); } @@ -226,19 +223,19 @@ public void getStreamCatalogs(final CallContext context, final ServerStreamListe @Override public FlightInfo getFlightInfoSchemas(final CommandGetDbSchemas request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_SCHEMAS_SCHEMA); } @Override public void getStreamSchemas(final CommandGetDbSchemas command, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamSchemas unimplemented").toRuntimeException(); } @Override public FlightInfo getFlightInfoTables(final CommandGetTables request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { Schema schemaToUse = Schemas.GET_TABLES_SCHEMA; if (!request.getIncludeSchema()) { schemaToUse = Schemas.GET_TABLES_SCHEMA_NO_SCHEMA; @@ -248,13 +245,13 @@ public FlightInfo getFlightInfoTables(final CommandGetTables request, final Call @Override public void getStreamTables(final CommandGetTables command, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables unimplemented").toRuntimeException(); } @Override public FlightInfo getFlightInfoTableTypes(final CommandGetTableTypes request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLE_TYPES_SCHEMA); } @@ -265,61 +262,61 @@ public void getStreamTableTypes(final CallContext context, final ServerStreamLis @Override public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_PRIMARY_KEYS_SCHEMA); } @Override public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys unimplemented").toRuntimeException(); } @Override public FlightInfo getFlightInfoExportedKeys(final CommandGetExportedKeys request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_EXPORTED_KEYS_SCHEMA); } @Override public void getStreamExportedKeys(final CommandGetExportedKeys command, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamExportedKeys unimplemented").toRuntimeException(); } @Override public FlightInfo getFlightInfoImportedKeys(final CommandGetImportedKeys request, final CallContext context, - final FlightDescriptor descriptor) { + final FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_KEYS_SCHEMA); } @Override public void getStreamImportedKeys(final CommandGetImportedKeys command, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamImportedKeys unimplemented").toRuntimeException(); } @Override public FlightInfo getFlightInfoCrossReference(CommandGetCrossReference request, CallContext context, - FlightDescriptor descriptor) { + FlightDescriptor descriptor) { return getFlightInfoForSchema(request, descriptor, Schemas.GET_CROSS_REFERENCE_SCHEMA); } @Override public void getStreamCrossReference(CommandGetCrossReference command, CallContext context, - ServerStreamListener listener) { + ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCrossReference unimplemented").toRuntimeException(); } @Override public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context, - final ServerStreamListener listener) { + final ServerStreamListener listener) { throw CallStatus.UNIMPLEMENTED.withDescription("getStreamStatement unimplemented").toRuntimeException(); } private FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor, - final Schema schema) { + final Schema schema) { final Ticket ticket = new Ticket(Any.pack(request).toByteArray()); // TODO Support multiple endpoints. final List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java index 1e28cd173ec2264..5bcbdc67230a74d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java @@ -14,6 +14,9 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is copied from +// https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java +// and modified by Doris package org.apache.doris.service.arrowflight; @@ -22,12 +25,10 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.Util; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.InternalQueryExecutionException; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.rpc.BackendServiceProxy; @@ -43,8 +44,6 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.io.ByteArrayInputStream; import java.util.ArrayList; @@ -57,8 +56,6 @@ import java.util.concurrent.TimeoutException; public final class FlightStatementContext { - private static final Logger LOG = LogManager.getLogger(FlightStatementContext.class); - private AutoCloseConnectContext acConnectContext; private final String query; private TUniqueId queryId; @@ -154,12 +151,11 @@ public void executeQuery() { acConnectContext.connectContext.setExecutor(stmtExecutor); stmtExecutor.executeArrowFlightQuery(this); } catch (Exception e) { - LOG.warn("Failed to coord exec, because: {}", e.getMessage(), e); - throw new InternalQueryExecutionException(e.getMessage() + Util.getRootCauseMessage(e), e); + throw new RuntimeException("Failed to coord exec", e); } } - public Schema fetchArrowFlightSchema(int timeoutMs, Status status) { + public Schema fetchArrowFlightSchema(int timeoutMs) { TNetworkAddress address = getResultInternalServiceAddr(); TUniqueId tid = getFinstId(); ArrayList resultOutputExprs = getResultOutputExprs(); @@ -175,14 +171,15 @@ public Schema fetchArrowFlightSchema(int timeoutMs, Status status) { InternalService.PFetchArrowFlightSchemaResult pResult; pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); if (pResult == null) { - LOG.warn("fetch arrow flight schema timeout, finstId={}", DebugUtil.printId(tid)); - status.setStatus("fetch arrow flight schema timeout"); - return null; + throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s", + DebugUtil.printId(tid))); } TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); if (code != TStatusCode.OK) { + Status status = null; status.setPstatus(pResult.getStatus()); - return null; + throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s", + DebugUtil.printId(tid), status)); } if (pResult.hasSchema() && pResult.getSchema().size() > 0) { RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); @@ -194,40 +191,34 @@ public Schema fetchArrowFlightSchema(int timeoutMs, Status status) { VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); List fieldVectors = root.getFieldVectors(); if (fieldVectors.size() != resultOutputExprs.size()) { - LOG.error("Schema size '{}' is not equal to arrow field size '{}'.", - fieldVectors.size(), resultOutputExprs.size()); - status.setStatus(new Status(TStatusCode.INTERNAL_ERROR, - "Load Doris data failed, schema size of fetch data is wrong.")); - return null; + throw new RuntimeException(String.format( + "Schema size %s' is not equal to arrow field size %s, finstId: %s.", + fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); } return root.getSchema(); } catch (Exception e) { - LOG.error("Read Arrow Flight Schema failed because: ", e); - status.setStatus(new Status(TStatusCode.INTERNAL_ERROR, - "Read Arrow Flight Schema failed because.")); - return null; + throw new RuntimeException("Read Arrow Flight Schema failed.", e); } } else { - LOG.info("finistId={}, get empty arrow flight schema", DebugUtil.printId(tid)); - return null; + throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s", + DebugUtil.printId(tid))); } } catch (RpcException e) { - LOG.warn("arrow flight schema fetch catch rpc exception, finstId {} backend {}", - DebugUtil.printId(tid), address.toString(), e); - status.setRpcStatus(e.getMessage()); + throw new RuntimeException(String.format( + "arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); } catch (InterruptedException e) { - LOG.warn("arrow flight schema future get interrupted exception, finstId {} backend {}", - DebugUtil.printId(tid), address.toString(), e); - status.setStatus("interrupted exception"); + throw new RuntimeException(String.format( + "arrow flight schema future get interrupted exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); } catch (ExecutionException e) { - LOG.warn("arrow flight schema future get execution exception, finstId {} backend {}", - DebugUtil.printId(tid), address.toString(), e); - status.setStatus("execution exception"); + throw new RuntimeException(String.format( + "arrow flight schema future get execution exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); } catch (TimeoutException e) { - LOG.warn("arrow flight schema fetch timeout, finstId {} backend {}", - DebugUtil.printId(tid), address.toString(), e); - status.setStatus("fetch timeout"); + throw new RuntimeException(String.format( + "arrow flight schema fetch timeout, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); } - return null; } }