Skip to content

Commit

Permalink
mutex to shared_mutext by Happen Lee
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Nov 29, 2024
1 parent 2aaab82 commit d4d9fe2
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 32 deletions.
2 changes: 1 addition & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ add_compile_options(-g
$<$<COMPILE_LANGUAGE:CXX>:-Wnon-virtual-dtor>)

add_compile_options(-Wno-unused-parameter
-Wno-sign-compare)
-Wno-sign-compare -Wno-missing-field-initializers)

if (COMPILER_GCC)
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "11.1")
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random";

io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
const io::FileDescription& fd) {
io::FileReaderOptions opts {.file_size = fd.file_size, .mtime = fd.mtime};
io::FileReaderOptions opts {};
opts.file_size = fd.file_size;
opts.mtime = fd.mtime;

if (config::enable_file_cache && state != nullptr &&
state->query_options().__isset.enable_file_cache &&
state->query_options().enable_file_cache) {
Expand Down
73 changes: 45 additions & 28 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,17 @@ void FragmentMgr::stop() {
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.clear();
_query_ctx_map.clear();
for (auto& pipeline : _pipeline_map) {
pipeline.second->close_sink();
}
_pipeline_map.clear();
}

{
std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.clear();
}

_async_report_thread_pool->shutdown();
}

Expand Down Expand Up @@ -610,11 +615,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());

LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id()));

if (all_done && query_ctx) {
_query_ctx_map.erase(query_ctx->query_id());
LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}
}
if (all_done && query_ctx) {
std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.erase(query_ctx->query_id());
LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}

// Callback after remove from this id
Expand Down Expand Up @@ -700,10 +705,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
std::shared_ptr<QueryContext> q_ctx = nullptr;
{
std::lock_guard<std::mutex> lock(_lock);
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
return Status::InternalError(
Expand All @@ -720,22 +725,24 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
auto* q_context = f_context->get_query_ctx();
bool all_done = false;
TUniqueId query_id = f_context->get_query_id();
{
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
all_done = q_context->countdown(ins_ids.size());
for (const auto& ins_id : ins_ids) {
LOG_INFO("Removing query {} instance {}, all done? {}", print_id(query_id),
print_id(ins_id), all_done);
_pipeline_map.erase(ins_id);
g_pipeline_fragment_instances_count << -1;
}
if (all_done) {
LOG_INFO("Query {} finished", print_id(query_id));
_query_ctx_map.erase(query_id);
}
}
if (all_done) {
std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.erase(query_id);
LOG_INFO("Query {} finished", print_id(query_id));
}
}

Expand All @@ -747,7 +754,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
{ return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
return Status::InternalError(
Expand All @@ -759,11 +766,13 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
} else {
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
return Status::OK();
{
std::shared_lock lock(_query_ctx_map_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
return Status::OK();
}
}

TNetworkAddress current_connect_fe_addr;
Expand Down Expand Up @@ -826,7 +835,10 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
}
// There is some logic in query ctx's dctor, we could not check if exists and delete the
// temp query ctx now. For example, the query id maybe removed from workload group's queryset.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
{
std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
}
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
Expand Down Expand Up @@ -1158,7 +1170,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
}

std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) {
std::lock_guard<std::mutex> state_lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto ctx = _query_ctx_map.find(query_id);
if (ctx != _query_ctx_map.end()) {
return ctx->second;
Expand All @@ -1172,7 +1184,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
std::shared_ptr<QueryContext> query_ctx;
std::vector<TUniqueId> all_instance_ids;
{
std::lock_guard<std::mutex> state_lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto ctx_iter = _query_ctx_map.find(query_id);

if (ctx_iter == _query_ctx_map.end()) {
Expand Down Expand Up @@ -1239,7 +1251,7 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id,

void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
const PPlanFragmentCancelReason& reason, const std::string& msg) {
std::unique_lock<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto q_ctx_iter = _query_ctx_map.find(query_id);
if (q_ctx_iter != _query_ctx_map.end()) {
// Has to use value to keep the shared ptr not deconstructed.
Expand Down Expand Up @@ -1301,6 +1313,9 @@ void FragmentMgr::cancel_worker() {
pipeline_itr.second->clear_finished_tasks();
}
}
}
{
std::unique_lock lock(_query_ctx_map_lock);
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
if (it->second->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
Expand All @@ -1309,7 +1324,9 @@ void FragmentMgr::cancel_worker() {
++it;
}
}

}
{
std::shared_lock lock(_query_ctx_map_lock);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
// 1. If query's process uuid is zero, do not cancel
Expand Down Expand Up @@ -1689,7 +1706,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished",
Expand All @@ -1712,7 +1729,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}", queryid.to_string());
Expand All @@ -1735,7 +1752,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
auto iter = _query_ctx_map.find(query_id);
if (iter == _query_ctx_map.end()) {
return Status::InvalidArgument("query-id: {}", queryid.to_string());
Expand Down Expand Up @@ -1830,7 +1847,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag

void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
{
std::lock_guard<std::mutex> lock(_lock);
std::shared_lock lock(_query_ctx_map_lock);
for (const auto& q : _query_ctx_map) {
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(q.first);
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class FragmentMgr : public RestMonitorIface {
std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);

int32_t running_query_num() {
std::unique_lock<std::mutex> ctx_lock(_lock);
std::shared_lock ctx_lock(_query_ctx_map_lock);
return _query_ctx_map.size();
}

Expand Down Expand Up @@ -192,6 +192,7 @@ class FragmentMgr : public RestMonitorIface {

std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;

std::shared_mutex _query_ctx_map_lock;
// query id -> QueryContext
std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
Expand Down

0 comments on commit d4d9fe2

Please sign in to comment.