diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 20b4a077b0292a2..adcd6478080c7ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -40,7 +40,7 @@ import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; -import org.apache.doris.service.arrowflight.FlightSqlChannel; +import org.apache.doris.service.arrowflight.results.FlightSqlChannel; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; import org.apache.doris.system.Backend; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ee5fd09911d8fa9..363dff5c244a7b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -444,6 +444,9 @@ public void execute() throws Exception { public void execute(TUniqueId queryId) throws Exception { SessionVariable sessionVariable = context.getSessionVariable(); Span executeSpan = context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan(); + if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { + context.setWaitSyncQueryResult(true); + } try (Scope scope = executeSpan.makeCurrent()) { if (parsedStmt instanceof LogicalPlanAdapter || (parsedStmt == null && sessionVariable.isEnableNereidsPlanner())) { @@ -581,6 +584,9 @@ private void executeByNereids(TUniqueId queryId) throws Exception { throw new NereidsException(new AnalysisException(e.getMessage(), e)); } profile.getSummaryProfile().setQueryPlanFinishTime(); + if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { + context.setWaitSyncQueryResult(false); + } handleQueryWithRetry(queryId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 9a65a4982aa7390..817bc332afa369c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -24,13 +24,13 @@ import org.apache.doris.common.util.Util; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.arrowflight.results.FlightSqlChannel; +import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Message; -import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; -import org.apache.arrow.adapter.jdbc.JdbcToArrow; import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.Criteria; @@ -66,16 +66,11 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; -import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.Calendar; import java.util.Collections; import java.util.List; @@ -149,10 +144,8 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() .setStatementHandle(handle).build(); return getFlightInfoForSchema(ticketStatement, descriptor, - JdbcToArrowUtils.jdbcToArrowSchema( flightSqlChannel.getResultSet(DebugUtil.printId(connectContext.queryId())) - .getResultSet() - .getMetaData(), DEFAULT_CALENDAR)); + .getVectorSchemaRoot().getSchema()); } else { final ByteString handle = ByteString.copyFromUtf8( DebugUtil.printId(connectContext.getFinstId()) + ":" + query); @@ -174,8 +167,13 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi } catch (Exception e) { if (null != connectContext) { connectContext.setCommand(MysqlCommand.COM_SLEEP); + String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage( + e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + + connectContext.getState().getErrorMessage(); + LOG.warn(errMsg, e); // context query state + throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); } - LOG.warn("get flight info statement failed, " + e.getMessage(), e); + LOG.warn("get flight info statement failed, " + e.getMessage(), e); // context query state throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException(); } } @@ -198,25 +196,12 @@ public SchemaResult getSchemaStatement(final CommandStatementQuery command, fina public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context, final ServerStreamListener listener) { final String handle = ticketStatementQuery.getStatementHandle().toStringUtf8(); - final FlightSqlResultSet flightSqlResultSet = + final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(flightSqlChannel.getResultSet(handle)); - try (final ResultSet resultSet = flightSqlResultSet.getResultSet()) { - final Schema schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR); - try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { - final VectorLoader loader = new VectorLoader(vectorSchemaRoot); - listener.start(vectorSchemaRoot); - - final ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, rootAllocator); - while (iterator.hasNext()) { - final VectorUnloader unloader = new VectorUnloader(iterator.next()); - loader.load(unloader.getRecordBatch()); - listener.putNext(); - vectorSchemaRoot.clear(); - } - - listener.putNext(); - } - } catch (SQLException | IOException e) { + try (final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot()) { + listener.start(vectorSchemaRoot); + listener.putNext(); + } catch (Exception e) { LOG.warn("Failed to getStreamStatement, " + e.getMessage(), e); listener.error(e); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlChannel.java deleted file mode 100644 index 9dbb99ffbec1687..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlChannel.java +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.service.arrowflight; - -import org.apache.doris.qe.ResultSet; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import org.apache.arrow.util.AutoCloseables; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.TimeUnit; - -public class FlightSqlChannel { - private final Cache resultCache; - - public FlightSqlChannel() { - resultCache = - CacheBuilder.newBuilder() - .maximumSize(100) - .expireAfterWrite(10, TimeUnit.MINUTES) - .removalListener(new ResultRemovalListener()) - .build(); - } - - // TODO - public String getRemoteIp() { - return "0.0.0.0"; - } - - // TODO - public String getRemoteHostPortString() { - return "0.0.0.0:0"; - } - - public void addResultSet(String queryId, String runningQuery, ResultSet resultSet) { - // connectcontext里存一个队列,把 relustSet 转成 arrow 后存到 队列里,然后 flight 哪里拿 - // handshake in packet with header and has encrypted, need to send in ssl format - // ssl mode in packet no header and no encrypted, need to encrypted and add header and send in ssl format - final FlightSqlResultSet flightSqlResultSet = new FlightSqlResultSet((java.sql.ResultSet) resultSet, - runningQuery); - resultCache.put(queryId, flightSqlResultSet); - } - - public FlightSqlResultSet getResultSet(String queryId) { - return resultCache.getIfPresent(queryId); - } - - public void invalidate(String handle) { - resultCache.invalidate(handle); - } - - private static class ResultRemovalListener implements RemovalListener { - @Override - public void onRemoval(@NotNull final RemovalNotification notification) { - try { - AutoCloseables.close(notification.getValue()); - } catch (final Exception e) { - // swallow - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlChannel.java new file mode 100644 index 000000000000000..c95fcc2e254a594 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlChannel.java @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight.results; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.ResultSet; +import org.apache.doris.qe.ResultSetMetaData; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.jetbrains.annotations.NotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class FlightSqlChannel { + private final Cache resultCache; + private final BufferAllocator allocator; + + public FlightSqlChannel() { + resultCache = + CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterWrite(10, TimeUnit.MINUTES) + .removalListener(new ResultRemovalListener()) + .build(); + allocator = new RootAllocator(Long.MAX_VALUE); + } + + // TODO + public String getRemoteIp() { + return "0.0.0.0"; + } + + // TODO + public String getRemoteHostPortString() { + return "0.0.0.0:0"; + } + + public void addResultSet(String queryId, String runningQuery, ResultSet resultSet) { + List schemaFields = new ArrayList<>(); + List dataFields = new ArrayList<>(); + List> resultData = resultSet.getResultRows(); + ResultSetMetaData metaData = resultSet.getMetaData(); + + // TODO: only support varchar type + for (Column col : metaData.getColumns()) { + schemaFields.add(new Field(col.getName(), FieldType.nullable(new Utf8()), null)); + VarCharVector varCharVector = new VarCharVector(col.getName(), allocator); + varCharVector.allocateNew(); + varCharVector.setValueCount(resultData.size()); + dataFields.add(varCharVector); + } + Schema schema = new Schema(schemaFields); + + for (int i = 0; i < resultData.size(); i++) { + List row = resultData.get(i); + for (int j = 0; j < row.size(); j++) { + String item = row.get(j); + if (item == null || item.equals(FeConstants.null_string)) { + dataFields.get(j).setNull(i); + } else { + ((VarCharVector) dataFields.get(j)).setSafe(i, item.getBytes()); + } + } + } + VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(schemaFields, dataFields); + final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new FlightSqlResultCacheEntry(vectorSchemaRoot, + runningQuery); + resultCache.put(queryId, flightSqlResultCacheEntry); + } + + public FlightSqlResultCacheEntry getResultSet(String queryId) { + return resultCache.getIfPresent(queryId); + } + + public void invalidate(String handle) { + resultCache.invalidate(handle); + } + + private static class ResultRemovalListener implements RemovalListener { + @Override + public void onRemoval(@NotNull final RemovalNotification notification) { + try { + AutoCloseables.close(notification.getValue()); + } catch (final Exception e) { + // swallow + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlResultSet.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlResultCacheEntry.java similarity index 63% rename from fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlResultSet.java rename to fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlResultCacheEntry.java index 4f55a643c8c7b9d..d17f8fea625d463 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlResultSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlResultCacheEntry.java @@ -15,24 +15,25 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.service.arrowflight; +package org.apache.doris.service.arrowflight.results; + +import org.apache.arrow.vector.VectorSchemaRoot; -import java.sql.ResultSet; import java.util.Objects; -public final class FlightSqlResultSet implements AutoCloseable { +public final class FlightSqlResultCacheEntry implements AutoCloseable { - private final ResultSet result; + private final VectorSchemaRoot vectorSchemaRoot; private final String query; - public FlightSqlResultSet(final ResultSet result, final String query) { - this.result = Objects.requireNonNull(result, "result cannot be null."); + public FlightSqlResultCacheEntry(final VectorSchemaRoot vectorSchemaRoot, final String query) { + this.vectorSchemaRoot = Objects.requireNonNull(vectorSchemaRoot, "result cannot be null."); this.query = query; } - public ResultSet getResultSet() { - return result; + public VectorSchemaRoot getVectorSchemaRoot() { + return vectorSchemaRoot; } public String getQuery() { @@ -48,15 +49,15 @@ public boolean equals(final Object other) { if (this == other) { return true; } - if (!(other instanceof ResultSet)) { + if (!(other instanceof VectorSchemaRoot)) { return false; } - final ResultSet that = (ResultSet) other; - return result.equals(that); + final VectorSchemaRoot that = (VectorSchemaRoot) other; + return vectorSchemaRoot.equals(that); } @Override public int hashCode() { - return Objects.hash(result); + return Objects.hash(vectorSchemaRoot); } }