diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5222100170e7a44..f66a7dd17c5e093 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -601,8 +601,7 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000"); // Sleep time in milliseconds between memtbale flush mgr refresh iterations DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5"); -// Sleep time in milliseconds between refresh iterations of workload group memory statistics -DEFINE_mInt64(wg_mem_refresh_interval_ms, "50"); +DEFINE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms, "50"); // percent of (active memtables size / all memtables size) when reach hard limit DEFINE_mInt32(memtable_hard_limit_active_percent, "50"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 53261ab2fb96020..fd38924f47e74ec 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -658,8 +658,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms); // Sleep time in milliseconds between memtbale flush mgr memory refresh iterations DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms); -// Sleep time in milliseconds between refresh iterations of workload group memory statistics -DECLARE_mInt64(wg_mem_refresh_interval_ms); +// Sleep time in milliseconds between refresh iterations of workload group weighted memory ratio +DECLARE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms); // percent of (active memtables size / all memtables size) when reach hard limit DECLARE_mInt32(memtable_hard_limit_active_percent); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index c97904f5677b44d..7667820b83f84fc 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -392,11 +392,11 @@ void Daemon::je_purge_dirty_pages_thread() const { } while (true); } -void Daemon::wg_mem_used_refresh_thread() { - // Refresh memory usage and limit of workload groups +void Daemon::wg_weighted_memory_ratio_refresh_thread() { + // Refresh weighted memory ratio of workload groups while (!_stop_background_threads_latch.wait_for( - std::chrono::milliseconds(config::wg_mem_refresh_interval_ms))) { - doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info(); + std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) { + doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_ratio(); } } @@ -441,7 +441,8 @@ void Daemon::start() { CHECK(st.ok()) << st; st = Thread::create( - "Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); }, + "Daemon", "wg_weighted_memory_ratio_refresh_thread", + [this]() { this->wg_weighted_memory_ratio_refresh_thread(); }, &_threads.emplace_back()); if (config::enable_be_proc_monitor) { diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 9dfb079b904ad46..2a8adf20e4681ae 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -44,7 +44,7 @@ class Daemon { void calculate_metrics_thread(); void je_purge_dirty_pages_thread() const; void report_runtime_query_statistics_thread(); - void wg_mem_used_refresh_thread(); + void wg_weighted_memory_ratio_refresh_thread(); void be_proc_monitor_thread(); CountDownLatch _stop_background_threads_latch; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 80b23d9401138c1..b0f115748e0d134 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -406,7 +406,8 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m } else if (is_wg_mem_low_water_mark) { int64_t query_weighted_limit = 0; int64_t query_weighted_consumption = 0; - query_ctx->get_weighted_mem_info(query_weighted_limit, query_weighted_consumption); + query_ctx->query_mem_tracker->get_weighted_memory(query_weighted_limit, + query_weighted_consumption); if (query_weighted_limit == 0 || query_weighted_consumption < query_weighted_limit) { return false; } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 5389bf2b7ec8622..0bf072e86945279 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -634,7 +634,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo WorkloadGroupPtr workload_group_ptr = _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); if (workload_group_ptr != nullptr) { - RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), tg_id); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index dd2b89029cb7d34..46ca5cb984d7968 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -202,6 +202,18 @@ class MemTrackerLimiter final : public MemTracker { return querytid; } + void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) { + std::lock_guard l(_weighted_mem_lock); + _weighted_limit = weighted_limit; + _weighted_ratio = weighted_ratio; + } + + void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) { + std::lock_guard l(_weighted_mem_lock); + weighted_limit = _weighted_limit; + weighted_consumption = int64_t(_consumption->current_value() * _weighted_ratio); + } + // Log the memory usage when memory limit is exceeded. std::string tracker_limit_exceeded_str(); @@ -220,8 +232,10 @@ class MemTrackerLimiter final : public MemTracker { return msg.str(); } + // only for work load group // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. - std::list>::iterator tg_tracker_limiter_group_it; + std::list>::iterator wg_tracker_limiter_group_it; + std::shared_ptr wg_refresh_interval_memory_growth; private: friend class ThreadMemTrackerMgr; @@ -245,8 +259,11 @@ class MemTrackerLimiter final : public MemTracker { // to avoid frequent calls to consume/release of MemTracker. std::atomic _untracked_mem = 0; - // query or load + // only for query or load std::atomic _is_query_cancelled = false; + std::mutex _weighted_mem_lock; + double _weighted_ratio = 0; + int64_t _weighted_limit = 0; // Avoid frequent printing. bool _enable_print_log_usage = false; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 9d36cd2d8078136..da260eb707af26c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -291,6 +291,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) { _limiter_tracker_raw->release(size); // rollback return false; } + _limiter_tracker_raw->wg_refresh_interval_memory_growth->fetch_add(size); if (_count_scope_mem) { _scope_mem += size; } @@ -306,6 +307,7 @@ inline void ThreadMemTrackerMgr::release_reserved() { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); _limiter_tracker_raw->release(_reserved_mem); + _limiter_tracker_raw->wg_refresh_interval_memory_growth->fetch_sub(_reserved_mem); if (_count_scope_mem) { _scope_mem -= _reserved_mem; } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index dd7cf4f55b87063..109ae5d05037b65 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -141,7 +141,6 @@ QueryContext::~QueryContext() { if (_workload_group) { group_id = _workload_group->id(); // before remove _workload_group->remove_mem_tracker_limiter(query_mem_tracker); - _workload_group->remove_query(_query_id); } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index b565214ef220824..4b4c5ffb2f03a30 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -230,17 +230,6 @@ class QueryContext { return _running_big_mem_op_num.load(std::memory_order_relaxed); } - void set_weighted_mem(int64_t weighted_limit, int64_t weighted_consumption) { - std::lock_guard l(_weighted_mem_lock); - _weighted_consumption = weighted_consumption; - _weighted_limit = weighted_limit; - } - void get_weighted_mem_info(int64_t& weighted_limit, int64_t& weighted_consumption) { - std::lock_guard l(_weighted_mem_lock); - weighted_limit = _weighted_limit; - weighted_consumption = _weighted_consumption; - } - DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; std::string user; @@ -310,10 +299,6 @@ class QueryContext { std::map> _fragment_id_to_pipeline_ctx; std::mutex _pipeline_map_write_lock; - std::mutex _weighted_mem_lock; - int64_t _weighted_consumption = 0; - int64_t _weighted_limit = 0; - std::mutex _profile_mutex; // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index f4d1e0d4f7eb956..b061f0fff710ce8 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -62,7 +62,9 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) _max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num), _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num), _spill_low_watermark(tg_info.spill_low_watermark), - _spill_high_watermark(tg_info.spill_high_watermark) {} + _spill_high_watermark(tg_info.spill_high_watermark) { + _refresh_interval_memory_growth = std::make_shared(0); +} std::string WorkloadGroup::debug_string() const { std::shared_lock rl {_mutex}; @@ -74,7 +76,7 @@ std::string WorkloadGroup::debug_string() const { _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(), _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num, - _spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size()); + _spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_num); } void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { @@ -107,41 +109,60 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { } } -int64_t WorkloadGroup::memory_used() { +int64_t WorkloadGroup::make_memory_tracker_snapshots( + std::list>* tracker_snapshots) { int64_t used_memory = 0; for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard l(mem_tracker_group.group_lock); for (const auto& trackerWptr : mem_tracker_group.trackers) { auto tracker = trackerWptr.lock(); CHECK(tracker != nullptr); + if (tracker_snapshots != nullptr) { + tracker_snapshots->insert(tracker_snapshots->end(), tracker); + } used_memory += tracker->consumption(); } } + // refresh total memory used. + _total_mem_used = used_memory; + // reserve memory is recorded in the query mem tracker + // and _total_mem_used already contains all the current reserve memory. + // so after refreshing _total_mem_used, reset _refresh_interval_memory_growth. + _refresh_interval_memory_growth->store(0.0); return used_memory; } -void WorkloadGroup::set_weighted_memory_used(int64_t wg_total_mem_used, double ratio) { - _weighted_mem_used.store(int64_t(wg_total_mem_used * ratio), std::memory_order_relaxed); +int64_t WorkloadGroup::memory_used() { + return make_memory_tracker_snapshots(nullptr); +} + +void WorkloadGroup::set_weighted_memory_ratio(double ratio) { + _weighted_mem_ratio = ratio; } void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { + std::unique_lock wlock(_mutex); auto group_num = mem_tracker_ptr->group_num(); std::lock_guard l(_mem_tracker_limiter_pool[group_num].group_lock); - mem_tracker_ptr->tg_tracker_limiter_group_it = + mem_tracker_ptr->wg_tracker_limiter_group_it = _mem_tracker_limiter_pool[group_num].trackers.insert( _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); + mem_tracker_ptr->wg_refresh_interval_memory_growth = _refresh_interval_memory_growth; + _query_num++; } void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { + std::unique_lock wlock(_mutex); auto group_num = mem_tracker_ptr->group_num(); std::lock_guard l(_mem_tracker_limiter_pool[group_num].group_lock); - if (mem_tracker_ptr->tg_tracker_limiter_group_it != + if (mem_tracker_ptr->wg_tracker_limiter_group_it != _mem_tracker_limiter_pool[group_num].trackers.end()) { _mem_tracker_limiter_pool[group_num].trackers.erase( - mem_tracker_ptr->tg_tracker_limiter_group_it); - mem_tracker_ptr->tg_tracker_limiter_group_it = + mem_tracker_ptr->wg_tracker_limiter_group_it); + mem_tracker_ptr->wg_tracker_limiter_group_it = _mem_tracker_limiter_pool[group_num].trackers.end(); } + _query_num--; } int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index a82efab09043a31..f490e56b98288ae 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -76,6 +76,10 @@ class WorkloadGroup : public std::enable_shared_from_this { return _memory_limit; }; + // make memory snapshots and refresh total memory used at the same time. + int64_t make_memory_tracker_snapshots( + std::list>* tracker_snapshots); + // call make_memory_tracker_snapshots, so also refresh total memory used. int64_t memory_used(); int spill_threshold_low_water_mark() const { @@ -85,10 +89,13 @@ class WorkloadGroup : public std::enable_shared_from_this { return _spill_high_watermark.load(std::memory_order_relaxed); } - void set_weighted_memory_used(int64_t wg_total_mem_used, double ratio); + void set_weighted_memory_ratio(double ratio); void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const { - auto weighted_mem_used = _weighted_mem_used.load(std::memory_order_relaxed); + // `weighted_mem_used` is a rough memory usage in this group, + // because we can only get a precise memory usage by MemTracker which is not include page cache. + auto weighted_mem_used = int64_t( + (_total_mem_used + _refresh_interval_memory_growth->load()) * _weighted_mem_ratio); *is_low_wartermark = (weighted_mem_used > ((double)_memory_limit * _spill_low_watermark.load(std::memory_order_relaxed) / 100)); @@ -112,24 +119,6 @@ class WorkloadGroup : public std::enable_shared_from_this { return _memory_limit > 0; } - Status add_query(TUniqueId query_id, std::shared_ptr query_ctx) { - std::unique_lock wlock(_mutex); - if (_is_shutdown) { - // If the workload group is set shutdown, then should not run any more, - // because the scheduler pool and other pointer may be released. - return Status::InternalError( - "Failed add query to wg {}, the workload group is shutdown. host: {}", _id, - BackendOptions::get_localhost()); - } - _query_ctxs.insert({query_id, query_ctx}); - return Status::OK(); - } - - void remove_query(TUniqueId query_id) { - std::unique_lock wlock(_mutex); - _query_ctxs.erase(query_id); - } - void shutdown() { std::unique_lock wlock(_mutex); _is_shutdown = true; @@ -137,12 +126,7 @@ class WorkloadGroup : public std::enable_shared_from_this { bool can_be_dropped() { std::shared_lock r_lock(_mutex); - return _is_shutdown && _query_ctxs.size() == 0; - } - - int query_num() { - std::shared_lock r_lock(_mutex); - return _query_ctxs.size(); + return _is_shutdown && _query_num == 0; } int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc); @@ -156,11 +140,6 @@ class WorkloadGroup : public std::enable_shared_from_this { void try_stop_schedulers(); - std::unordered_map> queries() { - std::shared_lock r_lock(_mutex); - return _query_ctxs; - } - std::string thread_debug_info(); private: @@ -169,9 +148,11 @@ class WorkloadGroup : public std::enable_shared_from_this { std::string _name; int64_t _version; int64_t _memory_limit; // bytes - // `_weighted_mem_used` is a rough memory usage in this group, - // because we can only get a precise memory usage by MemTracker which is not include page cache. - std::atomic_int64_t _weighted_mem_used = 0; // bytes + // last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called. + std::atomic_int64_t _total_mem_used = 0; // bytes + // last value of refresh_wg_weighted_memory_ratio. + std::atomic _weighted_mem_ratio = 0.0; + std::shared_ptr _refresh_interval_memory_growth; bool _enable_memory_overcommit; std::atomic _cpu_share; std::vector _mem_tracker_limiter_pool; @@ -186,7 +167,7 @@ class WorkloadGroup : public std::enable_shared_from_this { // new query can not submit // waiting running query to be cancelled or finish bool _is_shutdown = false; - std::unordered_map> _query_ctxs; + std::atomic _query_num; std::shared_mutex _task_sched_lock; std::unique_ptr _cgroup_cpu_ctl {nullptr}; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 6813bfd3b75130b..5e36570259d25fa 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -149,46 +149,33 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set used_wg_i struct WorkloadGroupMemInfo { int64_t total_mem_used = 0; - int64_t weighted_mem_used = 0; - bool is_low_wartermark = false; - bool is_high_wartermark = false; - double mem_used_ratio = 0; + std::list> tracker_snapshots = + std::list>(); }; -void WorkloadGroupMgr::refresh_wg_memory_info() { + +void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() { std::shared_lock r_lock(_group_mutex); - // workload group id -> workload group queries - std::unordered_map>> - all_wg_queries; - for (auto& [wg_id, wg] : _workload_groups) { - all_wg_queries.insert({wg_id, wg->queries()}); - } + // 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time) + // and calculate total memory used of all queries. int64_t all_queries_mem_used = 0; - - // calculate total memory used of each workload group and total memory used of all queries std::unordered_map wgs_mem_info; - for (auto& [wg_id, wg_queries] : all_wg_queries) { - int64_t wg_total_mem_used = 0; - for (const auto& [query_id, query_ctx_ptr] : wg_queries) { - if (auto query_ctx = query_ctx_ptr.lock()) { - wg_total_mem_used += query_ctx->query_mem_tracker->consumption(); - } - } - all_queries_mem_used += wg_total_mem_used; - wgs_mem_info[wg_id] = {wg_total_mem_used}; + for (auto& [wg_id, wg] : _workload_groups) { + wgs_mem_info[wg_id].total_mem_used = + wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots); + all_queries_mem_used += wgs_mem_info[wg_id].total_mem_used; } - - auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage(); if (all_queries_mem_used <= 0) { return; } - all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used); - + // 2. calculate weighted ratio. // process memory used is actually bigger than all_queries_mem_used, // because memory of page cache, allocator cache, segment cache etc. are included // in proc_vm_rss. // we count these cache memories equally on workload groups. + auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage(); + all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used); double ratio = (double)process_memory_usage / (double)all_queries_mem_used; if (ratio <= 1.25) { std::string debug_msg = @@ -200,66 +187,53 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { } for (auto& wg : _workload_groups) { + // 3.1 calculate query weighted memory limit of task group auto wg_mem_limit = wg.second->memory_limit(); - auto& wg_mem_info = wgs_mem_info[wg.first]; - wg_mem_info.weighted_mem_used = int64_t(wg_mem_info.total_mem_used * ratio); - wg_mem_info.mem_used_ratio = (double)wg_mem_info.weighted_mem_used / wg_mem_limit; - - wg.second->set_weighted_memory_used(wg_mem_info.total_mem_used, ratio); - - auto spill_low_water_mark = wg.second->spill_threshold_low_water_mark(); - auto spill_high_water_mark = wg.second->spill_threashold_high_water_mark(); - wg_mem_info.is_high_wartermark = (wg_mem_info.weighted_mem_used > - ((double)wg_mem_limit * spill_high_water_mark / 100)); - wg_mem_info.is_low_wartermark = (wg_mem_info.weighted_mem_used > - ((double)wg_mem_limit * spill_low_water_mark / 100)); - - // calculate query weighted memory limit of task group - const auto& wg_queries = all_wg_queries[wg.first]; - auto wg_query_count = wg_queries.size(); + auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size(); int64_t query_weighted_mem_limit = wg_query_count ? (wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit; + // 3.2 set all workload groups weighted memory ratio and all query weighted memory limit and ratio. + wg.second->set_weighted_memory_ratio(ratio); + for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) { + query_mem_tracker->set_weighted_memory(query_weighted_mem_limit, ratio); + } + + // 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark. + auto weighted_mem_used = int64_t(wgs_mem_info[wg.first].total_mem_used * ratio); + bool is_high_wartermark = + (weighted_mem_used > + ((double)wg_mem_limit * wg.second->spill_threashold_high_water_mark() / 100)); + bool is_low_wartermark = + (weighted_mem_used > + ((double)wg_mem_limit * wg.second->spill_threshold_low_water_mark() / 100)); std::string debug_msg; - if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) { + if (is_high_wartermark || is_low_wartermark) { debug_msg = fmt::format( "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used " "ratio: {}, query " "count: {}, query_weighted_mem_limit: {}", wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES), - PrettyPrinter::print(wg_mem_info.total_mem_used, TUnit::BYTES), - PrettyPrinter::print(wg_mem_info.weighted_mem_used, TUnit::BYTES), - wg_mem_info.mem_used_ratio, wg_query_count, + PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES), + PrettyPrinter::print(weighted_mem_used, TUnit::BYTES), + (double)weighted_mem_used / wg_mem_limit, wg_query_count, PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES)); debug_msg += "\n Query Memory Summary:"; - } else { - continue; - } - // check whether queries need to revoke memory for task group - for (const auto& query : wg_queries) { - auto query_ctx = query.second.lock(); - if (!query_ctx) { - continue; - } - auto query_consumption = query_ctx->query_mem_tracker->consumption(); - auto query_weighted_consumption = int64_t(query_consumption * ratio); - query_ctx->set_weighted_mem(query_weighted_mem_limit, query_weighted_consumption); - - if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) { + // check whether queries need to revoke memory for task group + for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) { debug_msg += fmt::format( "\n MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, " "Peak={}", - query_ctx->query_mem_tracker->label(), - query_ctx->query_mem_tracker->parent_label(), - PrettyPrinter::print(query_consumption, TUnit::BYTES), - PrettyPrinter::print(query_weighted_consumption, TUnit::BYTES), - PrettyPrinter::print(query_ctx->query_mem_tracker->peak_consumption(), - TUnit::BYTES)); + query_mem_tracker->label(), query_mem_tracker->parent_label(), + PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES), + PrettyPrinter::print(int64_t(query_mem_tracker->consumption() * ratio), + TUnit::BYTES), + PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES)); } - } - if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) { LOG_EVERY_T(INFO, 1) << debug_msg; + } else { + continue; } } } diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 8aeb8f988a30df5..37539ada8d85e66 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -54,7 +54,7 @@ class WorkloadGroupMgr { bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); } - void refresh_wg_memory_info(); + void refresh_wg_weighted_memory_ratio(); private: std::shared_mutex _group_mutex;