From a88dac47ee666ff61cec4e1839772cd4ce2f3d7f Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 29 Jul 2024 20:09:55 +0800 Subject: [PATCH] 1 --- be/src/common/daemon.cpp | 2 +- be/src/pipeline/pipeline_task.cpp | 15 ++- .../memory/global_memory_arbitrator.cpp | 4 +- .../runtime/memory/thread_mem_tracker_mgr.h | 2 +- be/src/runtime/query_context.h | 18 +--- .../runtime/workload_group/workload_group.cpp | 4 - .../runtime/workload_group/workload_group.h | 40 +++---- .../workload_group/workload_group_manager.cpp | 101 ++++++++++-------- .../workload_group/workload_group_manager.h | 2 +- 9 files changed, 92 insertions(+), 96 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 7667820b83f84f..3ab929a4c790f1 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -396,7 +396,7 @@ void Daemon::wg_weighted_memory_ratio_refresh_thread() { // Refresh weighted memory ratio of workload groups while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) { - doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_ratio(); + doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit(); } } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index b8a52575b97682..b44de629cd3a65 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -407,10 +407,9 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m } return false; } else if (is_wg_mem_low_water_mark) { - int64_t query_weighted_limit = 0; - int64_t query_weighted_consumption = 0; - query_ctx->get_weighted_memory(query_weighted_limit, query_weighted_consumption); - if (query_weighted_limit == 0 || query_weighted_consumption < query_weighted_limit) { + int64_t spill_threshold = query_ctx->spill_threshold(); + int64_t memory_usage = query_ctx->query_mem_tracker->consumption(); + if (spill_threshold == 0 || memory_usage < spill_threshold) { return false; } auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num(); @@ -419,7 +418,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m if (0 == big_memory_operator_num) { return false; } else { - mem_limit_of_op = query_weighted_limit / big_memory_operator_num; + mem_limit_of_op = spill_threshold / big_memory_operator_num; } LOG_EVERY_T(INFO, 1) << "query " << print_id(state->query_id()) @@ -428,10 +427,8 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op) << ", min_revocable_mem_bytes: " << PrettyPrinter::print_bytes(min_revocable_mem_bytes) - << ", query_weighted_consumption: " - << PrettyPrinter::print_bytes(query_weighted_consumption) - << ", query_weighted_limit: " - << PrettyPrinter::print_bytes(query_weighted_limit) + << ", memory_usage: " << PrettyPrinter::print_bytes(memory_usage) + << ", spill_threshold: " << PrettyPrinter::print_bytes(spill_threshold) << ", big_memory_operator_num: " << big_memory_operator_num; return (revocable_mem_bytes > mem_limit_of_op || revocable_mem_bytes > min_revocable_mem_bytes); diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index 35fa350987f34f..344bcbc59846d9 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -40,7 +40,7 @@ std::atomic GlobalMemoryArbitrator::_s_process_reserved_memory = 0; std::atomic GlobalMemoryArbitrator::refresh_interval_memory_growth = 0; bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { - if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) { + if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) { return false; } int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed); @@ -50,7 +50,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { if (UNLIKELY(vm_rss_sub_allocator_cache() + refresh_interval_memory_growth.load(std::memory_order_relaxed) + new_reserved_mem >= - MemInfo::mem_limit())) { + MemInfo::soft_mem_limit())) { return false; } } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem, diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 39e896d0f18d2d..141c54382cfd90 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -294,7 +294,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) { return false; } auto wg_ptr = _wg_wptr.lock(); - if (!wg_ptr) { + if (wg_ptr) { if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) { _limiter_tracker_raw->release(size); // rollback return false; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 0d7870a0e1d36a..1bbccfd33b7403 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -230,18 +230,8 @@ class QueryContext { return _running_big_mem_op_num.load(std::memory_order_relaxed); } - void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) { - std::lock_guard l(_weighted_mem_lock); - _weighted_limit = weighted_limit; - _weighted_ratio = weighted_ratio; - } - - void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) { - std::lock_guard l(_weighted_mem_lock); - weighted_limit = _weighted_limit; - weighted_consumption = int64_t(query_mem_tracker->consumption() * _weighted_ratio); - } - + void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; } + int64_t spill_threshold() { return _spill_threshold; } DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; std::string user; @@ -311,9 +301,7 @@ class QueryContext { std::map> _fragment_id_to_pipeline_ctx; std::mutex _pipeline_map_write_lock; - std::mutex _weighted_mem_lock; - double _weighted_ratio = 0; - int64_t _weighted_limit = 0; + std::atomic _spill_threshold {0}; std::mutex _profile_mutex; diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 6347193e319cd2..29c6fc7ae1b80d 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -152,10 +152,6 @@ void WorkloadGroup::refresh_memory(int64_t used_memory) { _wg_refresh_interval_memory_growth.store(0.0); } -void WorkloadGroup::set_weighted_memory_ratio(double ratio) { - _weighted_mem_ratio = ratio; -} - void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { std::unique_lock wlock(_mutex); auto group_num = mem_tracker_ptr->group_num(); diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 7605b9a17d8cf2..6b87a1cf7e5e69 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -77,6 +77,12 @@ class WorkloadGroup : public std::enable_shared_from_this { return _memory_limit; }; + int64_t weighted_memory_limit() const { return _weighted_memory_limit; }; + + void set_weighted_memory_limit(int64_t weighted_memory_limit) { + _weighted_memory_limit = weighted_memory_limit; + } + // make memory snapshots and refresh total memory used at the same time. int64_t make_memory_tracker_snapshots( std::list>* tracker_snapshots); @@ -93,13 +99,10 @@ class WorkloadGroup : public std::enable_shared_from_this { void set_weighted_memory_ratio(double ratio); bool add_wg_refresh_interval_memory_growth(int64_t size) { - // `weighted_mem_used` is a rough memory usage in this group, - // because we can only get a precise memory usage by MemTracker which is not include page cache. - auto weighted_mem_used = - int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load() + size) * - _weighted_mem_ratio); - if ((weighted_mem_used > ((double)_memory_limit * - _spill_high_watermark.load(std::memory_order_relaxed) / 100))) { + auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); + if ((realtime_total_mem_used > + ((double)_weighted_memory_limit * + _spill_high_watermark.load(std::memory_order_relaxed) / 100))) { return false; } else { _wg_refresh_interval_memory_growth.fetch_add(size); @@ -111,17 +114,13 @@ class WorkloadGroup : public std::enable_shared_from_this { } void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const { - // `weighted_mem_used` is a rough memory usage in this group, - // because we can only get a precise memory usage by MemTracker which is not include page cache. - auto weighted_mem_used = - int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load()) * - _weighted_mem_ratio); - *is_low_wartermark = - (weighted_mem_used > ((double)_memory_limit * - _spill_low_watermark.load(std::memory_order_relaxed) / 100)); - *is_high_wartermark = - (weighted_mem_used > ((double)_memory_limit * - _spill_high_watermark.load(std::memory_order_relaxed) / 100)); + auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); + *is_low_wartermark = (realtime_total_mem_used > + ((double)_weighted_memory_limit * + _spill_low_watermark.load(std::memory_order_relaxed) / 100)); + *is_high_wartermark = (realtime_total_mem_used > + ((double)_weighted_memory_limit * + _spill_high_watermark.load(std::memory_order_relaxed) / 100)); } std::string debug_string() const; @@ -200,10 +199,11 @@ class WorkloadGroup : public std::enable_shared_from_this { std::string _name; int64_t _version; int64_t _memory_limit; // bytes + // `weighted_memory_limit` less than or equal to _memory_limit, calculate after exclude public memory. + // more detailed description in `refresh_wg_weighted_memory_limit`. + std::atomic _weighted_memory_limit {0}; // // last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called. std::atomic_int64_t _total_mem_used = 0; // bytes - // last value of refresh_wg_weighted_memory_ratio. - std::atomic _weighted_mem_ratio = 0.0; std::atomic_int64_t _wg_refresh_interval_memory_growth; bool _enable_memory_overcommit; std::atomic _cpu_share; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 9e595841c6770c..6a196497e724a8 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -153,86 +153,101 @@ struct WorkloadGroupMemInfo { std::list>(); }; -void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() { +void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { std::shared_lock r_lock(_group_mutex); // 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time) // and calculate total memory used of all queries. - int64_t all_queries_mem_used = 0; + int64_t all_workload_groups_mem_usage = 0; std::unordered_map wgs_mem_info; for (auto& [wg_id, wg] : _workload_groups) { wgs_mem_info[wg_id].total_mem_used = wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots); - all_queries_mem_used += wgs_mem_info[wg_id].total_mem_used; + all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used; } - if (all_queries_mem_used <= 0) { + if (all_workload_groups_mem_usage <= 0) { return; } - // 2. calculate weighted ratio. - // process memory used is actually bigger than all_queries_mem_used, - // because memory of page cache, allocator cache, segment cache etc. are included - // in proc_vm_rss. - // we count these cache memories equally on workload groups. + // 2. calculate weighted memory limit ratio. + // when construct workload group, mem_limit is equal to (process_memory_limit * group_limit_percent), + // here, it is assumed that the available memory of workload groups is equal to process_memory_limit. + // + // but process_memory_usage is actually bigger than all_workload_groups_mem_usage, + // because public_memory of page cache, allocator cache, segment cache etc. are included in process_memory_usage. + // so actual available memory of the workload groups is equal to (process_memory_limit - public_memory) + // + // we will exclude this public_memory when calculate workload group mem_limit. + // so a ratio is calculated to multiply the workload group mem_limit from the previous construction. auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage(); - all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used); - double ratio = (double)process_memory_usage / (double)all_queries_mem_used; - if (ratio <= 1.25) { - std::string debug_msg = - fmt::format("\nProcess Memory Summary: {}, {}, all quries mem: {}", - doris::GlobalMemoryArbitrator::process_memory_used_details_str(), - doris::GlobalMemoryArbitrator::sys_mem_available_details_str(), - PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES)); - LOG_EVERY_T(INFO, 10) << debug_msg; + auto process_memory_limit = MemInfo::mem_limit(); + double weighted_memory_limit_ratio = 1; + // if all_workload_groups_mem_usage is greater than process_memory_usage, it means that the memory statistics + // of the workload group are inaccurate. + // the reason is that query/load/etc. tracked is virtual memory, and virtual memory is not used in time. + // + // At this time, weighted_memory_limit_ratio is equal to 1, and workload group mem_limit is still equal to + // (process_memory_limit * group_limit_percent), this may cause query spill to occur earlier, + // However, there is no good solution at present, but we cannot predict when these virtual memory will be used. + if (all_workload_groups_mem_usage < process_memory_usage) { + int64_t public_memory = process_memory_usage - all_workload_groups_mem_usage; + weighted_memory_limit_ratio = 1 - (double)public_memory / (double)process_memory_limit; } + std::string debug_msg = fmt::format( + "\nProcess Memory Summary: {}, {}, all workload groups memory usage: {}, " + "weighted_memory_limit_ratio: {}", + doris::GlobalMemoryArbitrator::process_memory_used_details_str(), + doris::GlobalMemoryArbitrator::sys_mem_available_details_str(), + PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES), + weighted_memory_limit_ratio); + LOG_EVERY_T(INFO, 10) << debug_msg; + for (auto& wg : _workload_groups) { - // 3.1 calculate query weighted memory limit of task group - auto wg_mem_limit = wg.second->memory_limit(); - auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size(); - int64_t query_weighted_mem_limit = - wg_query_count ? (wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit; + // 3.1 calculate query spill threshold of task group + auto wg_weighted_mem_limit = + int64_t(wg.second->memory_limit() * weighted_memory_limit_ratio); + wg.second->set_weighted_memory_limit(wg_weighted_mem_limit); - // 3.2 set all workload groups weighted memory ratio and all query weighted memory limit and ratio. - wg.second->set_weighted_memory_ratio(ratio); + // 3.2 set workload groups weighted memory limit and all query spill threshold. + auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size(); + int64_t query_spill_threshold = + wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count + : wg_weighted_mem_limit; for (const auto& query : wg.second->queries()) { auto query_ctx = query.second.lock(); if (!query_ctx) { continue; } - query_ctx->set_weighted_memory(query_weighted_mem_limit, ratio); + query_ctx->set_spill_threshold(query_spill_threshold); } // 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark. - auto weighted_mem_used = int64_t(wgs_mem_info[wg.first].total_mem_used * ratio); - bool is_high_wartermark = - (weighted_mem_used > - ((double)wg_mem_limit * wg.second->spill_threashold_high_water_mark() / 100)); - bool is_low_wartermark = - (weighted_mem_used > - ((double)wg_mem_limit * wg.second->spill_threshold_low_water_mark() / 100)); + bool is_low_wartermark = false; + bool is_high_wartermark = false; + wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark); std::string debug_msg; if (is_high_wartermark || is_low_wartermark) { debug_msg = fmt::format( - "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used " - "ratio: {}, query " - "count: {}, query_weighted_mem_limit: {}", - wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES), + "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, " + "used " + "ratio: {}, query count: {}, query spill threshold: {}", + wg.second->name(), + PrettyPrinter::print(wg.second->memory_limit(), TUnit::BYTES), PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES), - PrettyPrinter::print(weighted_mem_used, TUnit::BYTES), - (double)weighted_mem_used / wg_mem_limit, wg_query_count, - PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES)); + PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES), + (double)wgs_mem_info[wg.first].total_mem_used / wg_weighted_mem_limit, + wg_query_count, PrettyPrinter::print(query_spill_threshold, TUnit::BYTES)); debug_msg += "\n Query Memory Summary:"; // check whether queries need to revoke memory for task group for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) { debug_msg += fmt::format( - "\n MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, " + "\n MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, " "Peak={}", query_mem_tracker->label(), query_mem_tracker->parent_label(), PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES), - PrettyPrinter::print(int64_t(query_mem_tracker->consumption() * ratio), - TUnit::BYTES), + PrettyPrinter::print(query_spill_threshold, TUnit::BYTES), PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES)); } LOG_EVERY_T(INFO, 1) << debug_msg; diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 37539ada8d85e6..f7f02bf63e6997 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -54,7 +54,7 @@ class WorkloadGroupMgr { bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); } - void refresh_wg_weighted_memory_ratio(); + void refresh_wg_weighted_memory_limit(); private: std::shared_mutex _group_mutex;