Skip to content

Commit

Permalink
6
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 31, 2023
1 parent 02bcbe3 commit 3bb5139
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ public void execute() throws Exception {
public void execute(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
Span executeSpan = context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setWaitSyncQueryResult(true);
}
try (Scope scope = executeSpan.makeCurrent()) {
if (parsedStmt instanceof LogicalPlanAdapter
|| (parsedStmt == null && sessionVariable.isEnableNereidsPlanner())) {
Expand Down Expand Up @@ -581,6 +584,9 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
throw new NereidsException(new AnalysisException(e.getMessage(), e));
}
profile.getSummaryProfile().setQueryPlanFinishTime();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setWaitSyncQueryResult(false);
}
handleQueryWithRetry(queryId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
Expand Down Expand Up @@ -66,16 +66,11 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -149,10 +144,8 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
return getFlightInfoForSchema(ticketStatement, descriptor,
JdbcToArrowUtils.jdbcToArrowSchema(
flightSqlChannel.getResultSet(DebugUtil.printId(connectContext.queryId()))
.getResultSet()
.getMetaData(), DEFAULT_CALENDAR));
.getVectorSchemaRoot().getSchema());
} else {
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
Expand All @@ -174,8 +167,13 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi
} catch (Exception e) {
if (null != connectContext) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(
e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e); // context query state
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
LOG.warn("get flight info statement failed, " + e.getMessage(), e);
LOG.warn("get flight info statement failed, " + e.getMessage(), e); // context query state
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
}
}
Expand All @@ -198,25 +196,12 @@ public SchemaResult getSchemaStatement(final CommandStatementQuery command, fina
public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context,
final ServerStreamListener listener) {
final String handle = ticketStatementQuery.getStatementHandle().toStringUtf8();
final FlightSqlResultSet flightSqlResultSet =
final FlightSqlResultCacheEntry flightSqlResultCacheEntry =
Objects.requireNonNull(flightSqlChannel.getResultSet(handle));
try (final ResultSet resultSet = flightSqlResultSet.getResultSet()) {
final Schema schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR);
try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) {
final VectorLoader loader = new VectorLoader(vectorSchemaRoot);
listener.start(vectorSchemaRoot);

final ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, rootAllocator);
while (iterator.hasNext()) {
final VectorUnloader unloader = new VectorUnloader(iterator.next());
loader.load(unloader.getRecordBatch());
listener.putNext();
vectorSchemaRoot.clear();
}

listener.putNext();
}
} catch (SQLException | IOException e) {
try (final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot()) {
listener.start(vectorSchemaRoot);
listener.putNext();
} catch (Exception e) {
LOG.warn("Failed to getStreamStatement, " + e.getMessage(), e);
listener.error(e);
} finally {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.service.arrowflight.results;

import org.apache.doris.catalog.Column;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class FlightSqlChannel {
private final Cache<String, FlightSqlResultCacheEntry> resultCache;
private final BufferAllocator allocator;

public FlightSqlChannel() {
resultCache =
CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(10, TimeUnit.MINUTES)
.removalListener(new ResultRemovalListener())
.build();
allocator = new RootAllocator(Long.MAX_VALUE);
}

// TODO
public String getRemoteIp() {
return "0.0.0.0";
}

// TODO
public String getRemoteHostPortString() {
return "0.0.0.0:0";
}

public void addResultSet(String queryId, String runningQuery, ResultSet resultSet) {
List<Field> schemaFields = new ArrayList<>();
List<FieldVector> dataFields = new ArrayList<>();
List<List<String>> resultData = resultSet.getResultRows();
ResultSetMetaData metaData = resultSet.getMetaData();

// TODO: only support varchar type
for (Column col : metaData.getColumns()) {
schemaFields.add(new Field(col.getName(), FieldType.nullable(new Utf8()), null));
VarCharVector varCharVector = new VarCharVector(col.getName(), allocator);
varCharVector.allocateNew();
varCharVector.setValueCount(resultData.size());
dataFields.add(varCharVector);
}
Schema schema = new Schema(schemaFields);

for (int i = 0; i < resultData.size(); i++) {
List<String> row = resultData.get(i);
for (int j = 0; j < row.size(); j++) {
String item = row.get(j);
if (item == null || item.equals(FeConstants.null_string)) {
dataFields.get(j).setNull(i);
} else {
((VarCharVector) dataFields.get(j)).setSafe(i, item.getBytes());
}
}
}
VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(schemaFields, dataFields);
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new FlightSqlResultCacheEntry(vectorSchemaRoot,
runningQuery);
resultCache.put(queryId, flightSqlResultCacheEntry);
}

public FlightSqlResultCacheEntry getResultSet(String queryId) {
return resultCache.getIfPresent(queryId);
}

public void invalidate(String handle) {
resultCache.invalidate(handle);
}

private static class ResultRemovalListener implements RemovalListener<String, FlightSqlResultCacheEntry> {
@Override
public void onRemoval(@NotNull final RemovalNotification<String, FlightSqlResultCacheEntry> notification) {
try {
AutoCloseables.close(notification.getValue());
} catch (final Exception e) {
// swallow
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,25 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.service.arrowflight;
package org.apache.doris.service.arrowflight.results;

import org.apache.arrow.vector.VectorSchemaRoot;

import java.sql.ResultSet;
import java.util.Objects;


public final class FlightSqlResultSet implements AutoCloseable {
public final class FlightSqlResultCacheEntry implements AutoCloseable {

private final ResultSet result;
private final VectorSchemaRoot vectorSchemaRoot;
private final String query;

public FlightSqlResultSet(final ResultSet result, final String query) {
this.result = Objects.requireNonNull(result, "result cannot be null.");
public FlightSqlResultCacheEntry(final VectorSchemaRoot vectorSchemaRoot, final String query) {
this.vectorSchemaRoot = Objects.requireNonNull(vectorSchemaRoot, "result cannot be null.");
this.query = query;
}

public ResultSet getResultSet() {
return result;
public VectorSchemaRoot getVectorSchemaRoot() {
return vectorSchemaRoot;
}

public String getQuery() {
Expand All @@ -48,15 +49,15 @@ public boolean equals(final Object other) {
if (this == other) {
return true;
}
if (!(other instanceof ResultSet)) {
if (!(other instanceof VectorSchemaRoot)) {
return false;
}
final ResultSet that = (ResultSet) other;
return result.equals(that);
final VectorSchemaRoot that = (VectorSchemaRoot) other;
return vectorSchemaRoot.equals(that);
}

@Override
public int hashCode() {
return Objects.hash(result);
return Objects.hash(vectorSchemaRoot);
}
}

0 comments on commit 3bb5139

Please sign in to comment.