diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 33493be2d997d6..3b59394330aa0e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -801,33 +801,30 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, query_ctx->set_merge_controller_handler(handler); } - { + for (const auto& local_param : params.local_params) { + int64 now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. std::lock_guard lock(_lock); - for (const auto& local_param : params.local_params) { - const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; - auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); - if (iter != _pipeline_map.end()) { - return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})", - params.fragment_id); - } - query_ctx->fragment_instance_ids.push_back(fragment_instance_id); + auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); + if (iter != _pipeline_map.end()) { + return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})", + params.fragment_id); } + query_ctx->fragment_instance_ids.push_back(fragment_instance_id); - if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { - query_ctx->set_ready_to_execute_only(); - } + g_fragment_executing_count << 1; + g_fragment_last_active_time.set_value(now); + // TODO: simplify this mapping + _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); + } - int64 now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - { - g_fragment_executing_count << 1; - g_fragment_last_active_time.set_value(now); - // TODO: simplify this mapping - _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); - } + if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { + query_ctx->set_ready_to_execute_only(); } + query_ctx->set_pipeline_context(params.fragment_id, context); RETURN_IF_ERROR(context->submit());