Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 5, 2023
1 parent 86de722 commit 5328810
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 5 deletions.
2 changes: 1 addition & 1 deletion be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> s
arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::Create(
const std::shared_ptr<QueryStatement>& statement_) {
// Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket
// to the ADBC client, so that the row_descriptor and control block can be found.
// to the ADBC client, so that the schema and control block can be found.
auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
if (schema == nullptr) {
ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public enum ConnectType {
// Timestamp when the connection is make
protected volatile long loginTime;
// for arrow flight
protected volatile FlightSqlChannel flightSqlChannel;
protected volatile String peerIdentity;
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1412,7 +1412,7 @@ private void handleQueryStmt() throws Exception {
// Sql and PartitionCache
CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
// TODO support arrow flight sql
if (context.getConnectType().equals(ConnectType.MYSQL) && cacheAnalyzer.enableCache() && !isOutfileQuery
if (channel != null && cacheAnalyzer.enableCache() && !isOutfileQuery
&& context.getSessionVariable().getSqlSelectLimit() < 0
&& context.getSessionVariable().getDefaultOrderByLimit() < 0) {
if (queryStmt instanceof QueryStmt || queryStmt instanceof LogicalPlanAdapter) {
Expand All @@ -1422,9 +1422,9 @@ private void handleQueryStmt() throws Exception {
}
}

// TODO support arrow flight sql
// handle select .. from xx limit 0
if (parsedStmt instanceof SelectStmt) {
// TODO support arrow flight sql
if (channel != null && parsedStmt instanceof SelectStmt) {
SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
if (parsedSelectStmt.getLimit() == 0) {
LOG.info("ignore handle limit 0 ,sql:{}", parsedSelectStmt.toSql());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class FlightSqlChannel {
private final BufferAllocator allocator;

public FlightSqlChannel() {
// The Stmt result is not picked up by the Client within 10 minutes and will be deleted.
resultCache =
CacheBuilder.newBuilder()
.maximumSize(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

public class FlightSqlConnectContext extends ConnectContext {
private static final Logger LOG = LogManager.getLogger(FlightSqlConnectContext.class);
protected volatile FlightSqlChannel flightSqlChannel;

public FlightSqlConnectContext(String peerIdentity) {
this.connectType = ConnectType.ARROW_FLIGHT_SQL;
Expand Down

0 comments on commit 5328810

Please sign in to comment.