From 5d6e0bc282831cbf77a62608eeaa3649c2929475 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 24 Oct 2024 18:59:33 +0800 Subject: [PATCH] 1 --- be/src/runtime/fragment_mgr.cpp | 50 +++++++++++++++++------------- be/src/runtime/fragment_mgr.h | 1 + be/src/service/backend_service.cpp | 14 +++++++-- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 26fb098c76dfc5..33493be2d997d6 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -299,6 +299,10 @@ Status FragmentMgr::trigger_pipeline_context_report( // including the final status when execution finishes. void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.status.ok() || req.done); // if !status.ok() => done + if (req.coord_addr.hostname == "external") { + // External query (flink/spark read tablets) not need to report to FE. + return; + } Status exec_status = req.status; Status coord_status; FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr, @@ -797,30 +801,32 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, query_ctx->set_merge_controller_handler(handler); } - for (const auto& local_param : params.local_params) { - const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; + { + // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. std::lock_guard lock(_lock); - auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); - if (iter != _pipeline_map.end()) { - return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})", - params.fragment_id); + for (const auto& local_param : params.local_params) { + const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; + auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); + if (iter != _pipeline_map.end()) { + return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})", + params.fragment_id); + } + query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } - query_ctx->fragment_instance_ids.push_back(fragment_instance_id); - } - if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { - query_ctx->set_ready_to_execute_only(); - } + if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { + query_ctx->set_ready_to_execute_only(); + } - int64 now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - { - g_fragment_executing_count << 1; - g_fragment_last_active_time.set_value(now); - std::lock_guard lock(_lock); - // TODO: simplify this mapping - _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); + int64 now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + { + g_fragment_executing_count << 1; + g_fragment_last_active_time.set_value(now); + // TODO: simplify this mapping + _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); + } } query_ctx->set_pipeline_context(params.fragment_id, context); @@ -1031,6 +1037,7 @@ void FragmentMgr::debug(std::stringstream& ss) {} */ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, const TQueryPlanInfo& t_query_plan_info, + const TUniqueId& query_id, const TUniqueId& fragment_instance_id, std::vector* selected_columns) { // set up desc tbl @@ -1071,8 +1078,9 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, // assign the param used for executing of PlanFragment-self TPipelineInstanceParams fragment_exec_params; - exec_fragment_params.query_id = t_query_plan_info.query_id; + exec_fragment_params.query_id = query_id; fragment_exec_params.fragment_instance_id = fragment_instance_id; + exec_fragment_params.coord.hostname = "external"; std::map<::doris::TPlanNodeId, std::vector> per_node_scan_ranges; std::vector scan_ranges; std::vector tablet_ids = params.tablet_ids; diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 41b63db0b23ad9..20b2fd8cdc2063 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -112,6 +112,7 @@ class FragmentMgr : public RestMonitorIface { // execute external query, all query info are packed in TScanOpenParams Status exec_external_plan_fragment(const TScanOpenParams& params, const TQueryPlanInfo& t_query_plan_info, + const TUniqueId& query_id, const TUniqueId& fragment_instance_id, std::vector* selected_columns); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index d56aa49b19b1cf..e6fdfaa87657f8 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -802,6 +802,11 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status, void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) { TStatus t_status; TUniqueId fragment_instance_id = generate_uuid(); + // A query_id is randomly generated to replace t_query_plan_info.query_id. + // external query does not need to report anything to FE, so the query_id can be changed. + // Otherwise, multiple independent concurrent open tablet scanners have the same query_id. + // when one of the scanners ends, the other scanners will be canceled through FragmentMgr.cancel(query_id). + TUniqueId query_id = generate_uuid(); std::shared_ptr p_context; static_cast(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context)); p_context->fragment_instance_id = fragment_instance_id; @@ -838,13 +843,18 @@ void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenP << " deserialize error, should not be modified after returned Doris FE processed"; exec_st = Status::InvalidArgument(msg.str()); } - p_context->query_id = t_query_plan_info.query_id; + p_context->query_id = query_id; } std::vector selected_columns; if (exec_st.ok()) { // start the scan procedure + LOG(INFO) << fmt::format( + "exec external scanner, old_query_id = {}, new_query_id = {}, fragment_instance_id " + "= {}", + print_id(t_query_plan_info.query_id), print_id(query_id), + print_id(fragment_instance_id)); exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment( - params, t_query_plan_info, fragment_instance_id, &selected_columns); + params, t_query_plan_info, query_id, fragment_instance_id, &selected_columns); } exec_st.to_thrift(&t_status); //return status