From 2dc22e3c0e02cad0743266875d39dd14586e05ad Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 13 Sep 2023 16:39:50 +0800 Subject: [PATCH] 3 --- be/src/runtime/buffer_control_block.cpp | 7 ------- be/src/runtime/result_buffer_mgr.cpp | 8 ++------ .../arrow_flight/arrow_flight_batch_reader.cpp | 13 ++++--------- be/src/service/internal_service.cpp | 2 -- .../main/java/org/apache/doris/common/Config.java | 4 ++-- fe/fe-core/pom.xml | 12 ------------ .../glue/translator/PhysicalPlanTranslator.java | 3 ++- .../main/java/org/apache/doris/qe/QeService.java | 9 +++++---- .../main/java/org/apache/doris/qe/StmtExecutor.java | 6 +++--- .../service/arrowflight/FlightStatementContext.java | 2 +- 10 files changed, 19 insertions(+), 47 deletions(-) diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 869c24badfc45d4..03d05efd2a26b1e 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -208,23 +208,18 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { Status BufferControlBlock::get_arrow_batch(std::shared_ptr* result) { std::unique_lock l(_lock); - LOG(INFO) << "11111111 d " << _buffer_rows << ", " << _packet_num; if (!_status.ok()) { - LOG(INFO) << "11111111 h " << _buffer_rows << ", " << _packet_num; return _status; } if (_is_cancelled) { - LOG(INFO) << "11111111 g " << _buffer_rows << ", " << _packet_num; return Status::Cancelled("Cancelled"); } while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { - LOG(INFO) << "11111111 e " << _buffer_rows << ", " << _packet_num; _data_arrival.wait_for(l, std::chrono::seconds(1)); } if (_is_cancelled) { - LOG(INFO) << "11111111 f " << _buffer_rows << ", " << _packet_num; return Status::Cancelled("Cancelled"); } @@ -234,13 +229,11 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr* _buffer_rows -= (*result)->num_rows(); _data_removal.notify_one(); _packet_num++; - LOG(INFO) << "11111111 c " << _buffer_rows << ", " << _packet_num; return Status::OK(); } // normal path end if (_is_close) { - LOG(INFO) << "11111111 i " << _buffer_rows << ", " << _packet_num; return Status::OK(); } return Status::InternalError("Abnormal Ending"); diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 521ebb9a73562cf..c4d0f148ed2a235 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -85,7 +85,6 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size { std::unique_lock wlock(_buffer_map_lock); - LOG(INFO) << "11111111 9 " << query_id << ", " << print_id(query_id); _buffer_map.insert(std::make_pair(query_id, control_block)); // BufferControlBlock should destroy after max_timeout // for exceed max_timeout FE will return timeout to client @@ -101,7 +100,6 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size std::shared_ptr ResultBufferMgr::find_control_block(const TUniqueId& query_id) { std::shared_lock rlock(_buffer_map_lock); - LOG(INFO) << "11111111 8 " << query_id; BufferMap::iterator iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { @@ -144,9 +142,7 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr* result) { std::shared_ptr cb = find_control_block(finst_id); - LOG(INFO) << "11111111 7 " << finst_id; if (cb == nullptr) { - LOG(INFO) << "11111111 j " << finst_id; LOG(WARNING) << "no result for this query, id=" << print_id(finst_id); return Status::InternalError("no result for this query"); } @@ -173,13 +169,14 @@ Status ResultBufferMgr::cancel(const TUniqueId& query_id) { _row_descriptor_map.erase(row_desc_iter); } } + return Status::OK(); } Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) { std::lock_guard l(_timeout_lock); TimeoutMap::iterator iter = _timeout_map.find(cancel_time); - LOG(INFO) << "11111111 3 " << query_id << ", " << cancel_time; + if (_timeout_map.end() == iter) { _timeout_map.insert( std::pair>(cancel_time, std::vector())); @@ -212,7 +209,6 @@ void ResultBufferMgr::cancel_thread() { // cancel query for (int i = 0; i < query_to_cancel.size(); ++i) { - LOG(INFO) << "11111111 4 " << query_to_cancel[i]; cancel(query_to_cancel[i]); } } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index d3890238747a557..8a0f1e67859494c 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -46,7 +46,6 @@ arrow::Result> ArrowFlightBatchReader::C ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format( "Schema RowDescriptor Not Found, queryid: {}", print_id(statement_->query_id)))); } - LOG(INFO) << "11111111 2 " << statement_->query_id; std::shared_ptr schema; auto st = convert_to_arrow_schema(row_desc, &schema); if (UNLIKELY(!st.ok())) { @@ -58,20 +57,16 @@ arrow::Result> ArrowFlightBatchReader::C } arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr* out) { - // CHECK(*out == nullptr); // not nullptr - LOG(INFO) << "11111111 1 " << statement_->query_id; + // *out not nullptr *out = nullptr; auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out); if (UNLIKELY(!st.ok())) { - LOG(INFO) << "11111111 k " << statement_->query_id; - LOG(WARNING) << st.to_string(); + LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string(); ARROW_RETURN_NOT_OK(to_arrow_status(st)); } if (*out != nullptr) { - std::stringstream ss; - arrow_pretty_print(*(*out), &ss); - LOG(INFO) << "11111111 m " << (*out)->num_rows() << ", " << (*out)->num_columns(); - LOG(INFO) << "11111111 l " << ss.str(); + VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns(); } return arrow::Status::OK(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2b9163297f5da54..6e5e6ad2e62d121 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -709,7 +709,6 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro const PFetchArrowFlightSchemaRequest* request, PFetchArrowFlightSchemaResult* result, google::protobuf::Closure* done) { - LOG(INFO) << "fetch_arrow_flight_schema"; bool ret = _light_work_pool.try_offer([request, result, done]() { brpc::ClosureGuard closure_guard(done); RowDescriptor row_desc = ExecEnv::GetInstance()->result_mgr()->find_row_descriptor( @@ -931,7 +930,6 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - LOG(INFO) << "fetch_cache"; bool ret = _heavy_work_pool.try_offer([this, request, result, done]() { brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index bcd4708a4126e0a..7937358cbe075ff 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -398,8 +398,8 @@ public class Config extends ConfigBase { @ConfField(description = {"FE MySQL server 的端口号", "The port of FE MySQL server"}) public static int query_port = 9030; - @ConfField(description = {"FE Flight-SQL server 的端口号", "The port of FE Flight-SQL server"}) - public static int flight_sql_query_port = 9040; + @ConfField(description = {"FE Arrow-Flight-SQL server 的端口号", "The port of FE Arrow-Flight-SQ server"}) + public static int flight_sql_query_port = -1; @ConfField(description = {"MySQL 服务的 IO 线程数", "The number of IO threads in MySQL service"}) public static int mysql_service_io_threads_num = 4; diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 02722a175326672..0c00f1f4425a8dd 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -36,7 +36,6 @@ under the License. 2.17.257 3.1.1-hw-46 3.3.5 - 2.9.0 @@ -761,17 +760,6 @@ under the License. org.apache.arrow arrow-vector - - org.apache.commons - commons-dbcp2 - ${commons-dbcp2.version} - - - commons-logging - commons-logging - - - org.hamcrest hamcrest diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 9832221f8e9d411..8e2b52b06b482d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -332,7 +332,8 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d public PlanFragment visitPhysicalResultSink(PhysicalResultSink physicalResultSink, PlanTranslatorContext context) { PlanFragment planFragment = physicalResultSink.child().accept(this, context); - planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId())); + planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId(), + ConnectContext.get().getResultSinkType())); return planFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java index 70164e4afd6e13e..a4cd3d0475f1b0b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java @@ -62,10 +62,11 @@ public void start() throws Exception { LOG.error("mysql server start failed"); System.exit(-1); } - this.flightSqlService = new FlightSqlService(flightsqlPort); - if (!flightSqlService.start()) { - LOG.error("flightsql server start failed"); - System.exit(-1); + if (flightsqlPort != -1) { + this.flightSqlService = new FlightSqlService(flightsqlPort); + if (!flightSqlService.start()) { + System.exit(-1); + } } LOG.info("QE service start."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8f386c16cc27708..adfa3a8339925ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -187,7 +187,6 @@ import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; -import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -2571,7 +2570,8 @@ public List executeInternalQuery() { planner = new NereidsPlanner(statementContext); planner.plan(parsedStmt, context.getSessionVariable().toThrift()); } catch (Exception e) { - LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); + LOG.warn("Arrow Flight SQL fall back to legacy planner, because: {}", + e.getMessage(), e); parsedStmt = null; planner = null; context.getState().setNereids(false); @@ -2673,7 +2673,7 @@ public void executeArrowFlightQuery(FlightStatementContext flightStatementContex coord.exec(); } catch (Exception e) { queryScheduleSpan.recordException(e); - LOG.warn("Failed to coord exec, because: {}", e.getMessage(), e); + LOG.warn("Failed to coord exec Arrow Flight SQL, because: {}", e.getMessage(), e); throw new InternalQueryExecutionException(e.getMessage() + Util.getRootCauseMessage(e), e); } finally { queryScheduleSpan.end(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java index 86f335d22585cd7..1e28cd173ec2264 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementContext.java @@ -23,6 +23,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; +import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; @@ -132,7 +133,6 @@ public static AutoCloseConnectContext buildConnectContext() { ConnectContext connectContext = new ConnectContext(); SessionVariable sessionVariable = connectContext.getSessionVariable(); sessionVariable.internalSession = true; - sessionVariable.setEnableNereidsPlanner(false); // TODO sessionVariable.setEnablePipelineEngine(false); // TODO sessionVariable.setEnablePipelineXEngine(false); // TODO connectContext.setEnv(Env.getCurrentEnv());