diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 0305e886ef964a..820a007026f44c 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -254,7 +254,7 @@ add_compile_options(-g $<$:-Wnon-virtual-dtor>) add_compile_options(-Wno-unused-parameter - -Wno-sign-compare) + -Wno-sign-compare -Wno-missing-field-initializers) if (COMPILER_GCC) if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "11.1") diff --git a/be/src/clucene b/be/src/clucene index 48fa9cc4ec32b4..7cf6cf410d41d9 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit 48fa9cc4ec32b40bf3b02338d0a1b2cdbc6408cf +Subproject commit 7cf6cf410d41d95456edba263cc55b7b6f5ab027 diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 95d537320883e8..539a33ea3c487f 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -50,7 +50,10 @@ constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random"; io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state, const io::FileDescription& fd) { - io::FileReaderOptions opts {.file_size = fd.file_size, .mtime = fd.mtime}; + io::FileReaderOptions opts {}; + opts.file_size = fd.file_size; + opts.mtime = fd.mtime; + if (config::enable_file_cache && state != nullptr && state->query_options().__isset.enable_file_cache && state->query_options().enable_file_cache) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index cd8eabb06424c3..8a2f294e656389 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -269,12 +269,17 @@ void FragmentMgr::stop() { { std::lock_guard lock(_lock); _fragment_instance_map.clear(); - _query_ctx_map.clear(); for (auto& pipeline : _pipeline_map) { pipeline.second->close_sink(); } _pipeline_map.clear(); } + + { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.clear(); + } + _async_report_thread_pool->shutdown(); } @@ -610,11 +615,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr fragment_ex _fragment_instance_map.erase(fragment_executor->fragment_instance_id()); LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id())); - - if (all_done && query_ctx) { - _query_ctx_map.erase(query_ctx->query_id()); - LOG_INFO("Query {} finished", print_id(query_ctx->query_id())); - } + } + if (all_done && query_ctx) { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.erase(query_ctx->query_id()); + LOG_INFO("Query {} finished", print_id(query_ctx->query_id())); } // Callback after remove from this id @@ -700,10 +705,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) { std::shared_ptr q_ctx = nullptr; { - std::lock_guard lock(_lock); TUniqueId query_id; query_id.__set_hi(request->query_id().hi()); query_id.__set_lo(request->query_id().lo()); + std::shared_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search == _query_ctx_map.end()) { return Status::InternalError( @@ -720,22 +725,24 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r void FragmentMgr::remove_pipeline_context( std::shared_ptr f_context) { auto* q_context = f_context->get_query_ctx(); + bool all_done = false; + TUniqueId query_id = f_context->get_query_id(); { std::lock_guard lock(_lock); - auto query_id = f_context->get_query_id(); std::vector ins_ids; f_context->instance_ids(ins_ids); - bool all_done = q_context->countdown(ins_ids.size()); + all_done = q_context->countdown(ins_ids.size()); for (const auto& ins_id : ins_ids) { LOG_INFO("Removing query {} instance {}, all done? {}", print_id(query_id), print_id(ins_id), all_done); _pipeline_map.erase(ins_id); g_pipeline_fragment_instances_count << -1; } - if (all_done) { - LOG_INFO("Query {} finished", print_id(query_id)); - _query_ctx_map.erase(query_id); - } + } + if (all_done) { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.erase(query_id); + LOG_INFO("Query {} finished", print_id(query_id)); } } @@ -747,7 +754,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); if (params.is_simplified_param) { // Get common components from _query_ctx_map - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search == _query_ctx_map.end()) { return Status::InternalError( @@ -759,11 +766,13 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } else { // Find _query_ctx_map, in case some other request has already // create the query fragments context. - std::lock_guard lock(_lock); - auto search = _query_ctx_map.find(query_id); - if (search != _query_ctx_map.end()) { - query_ctx = search->second; - return Status::OK(); + { + std::shared_lock lock(_query_ctx_map_lock); + auto search = _query_ctx_map.find(query_id); + if (search != _query_ctx_map.end()) { + query_ctx = search->second; + return Status::OK(); + } } TNetworkAddress current_connect_fe_addr; @@ -826,7 +835,10 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } // There is some logic in query ctx's dctor, we could not check if exists and delete the // temp query ctx now. For example, the query id maybe removed from workload group's queryset. - _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); + { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); + } LOG(INFO) << "Register query/load memory tracker, query/load id: " << print_id(query_ctx->query_id()) << " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES); @@ -1158,7 +1170,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query } std::shared_ptr FragmentMgr::get_query_context(const TUniqueId& query_id) { - std::lock_guard state_lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto ctx = _query_ctx_map.find(query_id); if (ctx != _query_ctx_map.end()) { return ctx->second; @@ -1172,7 +1184,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan std::shared_ptr query_ctx; std::vector all_instance_ids; { - std::lock_guard state_lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto ctx_iter = _query_ctx_map.find(query_id); if (ctx_iter == _query_ctx_map.end()) { @@ -1239,7 +1251,7 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id, void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id, const PPlanFragmentCancelReason& reason, const std::string& msg) { - std::unique_lock lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto q_ctx_iter = _query_ctx_map.find(query_id); if (q_ctx_iter != _query_ctx_map.end()) { // Has to use value to keep the shared ptr not deconstructed. @@ -1301,6 +1313,9 @@ void FragmentMgr::cancel_worker() { pipeline_itr.second->clear_finished_tasks(); } } + } + { + std::unique_lock lock(_query_ctx_map_lock); for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { if (it->second->is_timeout(now)) { LOG_WARNING("Query {} is timeout", print_id(it->first)); @@ -1309,7 +1324,9 @@ void FragmentMgr::cancel_worker() { ++it; } } - + } + { + std::shared_lock lock(_query_ctx_map_lock); // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel @@ -1689,7 +1706,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished", @@ -1712,7 +1729,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); @@ -1735,7 +1752,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); @@ -1830,7 +1847,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag void FragmentMgr::get_runtime_query_info(std::vector* query_info_list) { { - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); for (const auto& q : _query_ctx_map) { WorkloadQueryInfo workload_query_info; workload_query_info.query_id = print_id(q.first); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 16ad368ae6108f..7e71933917e855 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -143,7 +143,7 @@ class FragmentMgr : public RestMonitorIface { std::shared_ptr get_query_context(const TUniqueId& query_id); int32_t running_query_num() { - std::unique_lock ctx_lock(_lock); + std::shared_lock ctx_lock(_query_ctx_map_lock); return _query_ctx_map.size(); } @@ -192,6 +192,7 @@ class FragmentMgr : public RestMonitorIface { std::unordered_map> _pipeline_map; + std::shared_mutex _query_ctx_map_lock; // query id -> QueryContext std::unordered_map> _query_ctx_map; std::unordered_map> _bf_size_map;