xinyiZzz committed Jul 29, 2024
1 parent 0b43fa9 commit a88dac4
Showing 9 changed files with 92 additions and 96 deletions.
2 changes: 1 addition & 1 deletion be/src/common/daemon.cpp
@@ -396,7 +396,7 @@ void Daemon::wg_weighted_memory_ratio_refresh_thread() {
// Refresh weighted memory ratio of workload groups
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) {
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_ratio();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
}
}

15 changes: 6 additions & 9 deletions be/src/pipeline/pipeline_task.cpp
@@ -407,10 +407,9 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
}
return false;
} else if (is_wg_mem_low_water_mark) {
int64_t query_weighted_limit = 0;
int64_t query_weighted_consumption = 0;
query_ctx->get_weighted_memory(query_weighted_limit, query_weighted_consumption);
if (query_weighted_limit == 0 || query_weighted_consumption < query_weighted_limit) {
int64_t spill_threshold = query_ctx->spill_threshold();
int64_t memory_usage = query_ctx->query_mem_tracker->consumption();
if (spill_threshold == 0 || memory_usage < spill_threshold) {
return false;
}
auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num();
@@ -419,7 +418,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
if (0 == big_memory_operator_num) {
return false;
} else {
mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
mem_limit_of_op = spill_threshold / big_memory_operator_num;
}

LOG_EVERY_T(INFO, 1) << "query " << print_id(state->query_id())
@@ -428,10 +427,8 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
<< ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
<< ", min_revocable_mem_bytes: "
<< PrettyPrinter::print_bytes(min_revocable_mem_bytes)
<< ", query_weighted_consumption: "
<< PrettyPrinter::print_bytes(query_weighted_consumption)
<< ", query_weighted_limit: "
<< PrettyPrinter::print_bytes(query_weighted_limit)
<< ", memory_usage: " << PrettyPrinter::print_bytes(memory_usage)
<< ", spill_threshold: " << PrettyPrinter::print_bytes(spill_threshold)
<< ", big_memory_operator_num: " << big_memory_operator_num;
return (revocable_mem_bytes > mem_limit_of_op ||
revocable_mem_bytes > min_revocable_mem_bytes);
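The hunk above replaces the per-query weighted limit with a spill threshold read directly off the query context. Below is a minimal standalone sketch of the resulting decision rule; the free function and its plain parameters are simplified stand-ins for the real RuntimeState/QueryContext API, not the actual Doris signatures.

    #include <cstdint>
    #include <iostream>

    // Simplified stand-in for the branch of PipelineTask::should_revoke_memory that
    // runs once the workload group has passed its low water mark.
    bool should_revoke_memory(int64_t spill_threshold, int64_t memory_usage,
                              int64_t big_memory_operator_num,
                              int64_t revocable_mem_bytes, int64_t min_revocable_mem_bytes) {
        // No threshold published yet, or the query is still under it: do not spill.
        if (spill_threshold == 0 || memory_usage < spill_threshold) {
            return false;
        }
        // Nothing that is able to spill: do not spill.
        if (big_memory_operator_num == 0) {
            return false;
        }
        // Split the query's spill threshold evenly across the big-memory operators.
        int64_t mem_limit_of_op = spill_threshold / big_memory_operator_num;
        return revocable_mem_bytes > mem_limit_of_op ||
               revocable_mem_bytes > min_revocable_mem_bytes;
    }

    int main() {
        // Example: 8 GB threshold, 9 GB used, 4 big-memory operators -> 2 GB per operator.
        const int64_t GB = 1024LL * 1024 * 1024;
        std::cout << std::boolalpha
                  << should_revoke_memory(8 * GB, 9 * GB, 4, 3 * GB, 512LL * 1024 * 1024)
                  << std::endl; // true: 3 GB revocable exceeds the 2 GB per-operator share
    }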
4 changes: 2 additions & 2 deletions be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -40,7 +40,7 @@ std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;

bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) {
return false;
}
int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
@@ -50,7 +50,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (UNLIKELY(vm_rss_sub_allocator_cache() +
refresh_interval_memory_growth.load(std::memory_order_relaxed) +
new_reserved_mem >=
MemInfo::mem_limit())) {
MemInfo::soft_mem_limit())) {
return false;
}
} while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem,
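The reservation path keeps its lock-free compare_exchange loop; the commit only changes which limits it checks (warning water mark instead of low water mark, soft limit instead of hard limit). A self-contained sketch of the same pattern, with an assumed fixed soft limit standing in for MemInfo::soft_mem_limit() and a plain parameter standing in for the process-usage accessors:

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    // Hypothetical process-wide reservation counter, mirroring the CAS loop in
    // GlobalMemoryArbitrator::try_reserve_process_memory (the limit here is made up).
    std::atomic<int64_t> g_reserved_memory{0};
    constexpr int64_t kSoftMemLimit = 8LL * 1024 * 1024 * 1024; // assumed 8 GB soft limit

    bool try_reserve(int64_t bytes, int64_t current_process_usage) {
        int64_t old_reserved = g_reserved_memory.load(std::memory_order_relaxed);
        int64_t new_reserved;
        do {
            new_reserved = old_reserved + bytes;
            // Reject if current usage plus everything already reserved would cross the soft limit.
            if (current_process_usage + new_reserved >= kSoftMemLimit) {
                return false;
            }
            // On CAS failure old_reserved is reloaded, so the next iteration uses fresh values.
        } while (!g_reserved_memory.compare_exchange_weak(old_reserved, new_reserved,
                                                          std::memory_order_relaxed));
        return true;
    }

    int main() {
        std::cout << std::boolalpha;
        std::cout << try_reserve(1LL << 30, 6LL << 30) << "\n"; // true: 6 GB used + 1 GB reserved stays under 8 GB
        std::cout << try_reserve(2LL << 30, 6LL << 30) << "\n"; // false: 6 GB used + 3 GB reserved would cross it
    }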
2 changes: 1 addition & 1 deletion be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -294,7 +294,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
return false;
}
auto wg_ptr = _wg_wptr.lock();
if (!wg_ptr) {
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
_limiter_tracker_raw->release(size); // rollback
return false;
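The one-character condition fix above is the point of this hunk: previously the group branch only ran when the weak pointer had already expired, so a live workload group never accounted for the reserved bytes. A simplified sketch of the corrected reserve-then-rollback flow, with toy Tracker/Group types standing in for MemTrackerLimiter and WorkloadGroup:

    #include <cstdint>
    #include <memory>

    // Toy stand-ins; the real classes are MemTrackerLimiter and WorkloadGroup.
    struct Tracker {
        int64_t consumption = 0;
        void consume(int64_t n) { consumption += n; }
        void release(int64_t n) { consumption -= n; }
    };

    struct Group {
        int64_t growth = 0;
        int64_t budget;
        explicit Group(int64_t b) : budget(b) {}
        bool add_refresh_interval_memory_growth(int64_t n) {
            if (growth + n > budget) return false; // would exceed the group's limit
            growth += n;
            return true;
        }
    };

    // Mirrors the corrected branch of ThreadMemTrackerMgr::try_reserve: charge the
    // workload group only if it still exists, and roll back the tracker on rejection.
    bool try_reserve(Tracker& tracker, const std::weak_ptr<Group>& wg_wptr, int64_t size) {
        tracker.consume(size);
        if (auto wg_ptr = wg_wptr.lock()) {
            if (!wg_ptr->add_refresh_interval_memory_growth(size)) {
                tracker.release(size); // rollback
                return false;
            }
        }
        return true;
    }

    int main() {
        Tracker tracker;
        auto wg = std::make_shared<Group>(100);
        try_reserve(tracker, wg, 80); // accepted by the group
        try_reserve(tracker, wg, 50); // rejected and rolled back; tracker stays at 80
        return tracker.consumption == 80 ? 0 : 1;
    }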
18 changes: 3 additions & 15 deletions be/src/runtime/query_context.h
@@ -230,18 +230,8 @@ class QueryContext {
return _running_big_mem_op_num.load(std::memory_order_relaxed);
}

void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) {
std::lock_guard<std::mutex> l(_weighted_mem_lock);
_weighted_limit = weighted_limit;
_weighted_ratio = weighted_ratio;
}

void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) {
std::lock_guard<std::mutex> l(_weighted_mem_lock);
weighted_limit = _weighted_limit;
weighted_consumption = int64_t(query_mem_tracker->consumption() * _weighted_ratio);
}

void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
int64_t spill_threshold() { return _spill_threshold; }
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
std::string user;
@@ -311,9 +301,7 @@ class QueryContext {
std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
std::mutex _pipeline_map_write_lock;

std::mutex _weighted_mem_lock;
double _weighted_ratio = 0;
int64_t _weighted_limit = 0;
std::atomic<int64_t> _spill_threshold {0};

std::mutex _profile_mutex;

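This hunk swaps the mutex-guarded (weighted limit, ratio) pair on QueryContext for a single atomic spill threshold: one refresh thread publishes it and many pipeline tasks read it without locking. A minimal sketch of that access pattern; QueryCtxSketch is a stand-in, not the real QueryContext:

    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <thread>

    class QueryCtxSketch {
    public:
        // Published by the workload group refresh thread.
        void set_spill_threshold(int64_t v) { _spill_threshold.store(v, std::memory_order_relaxed); }
        // Read by pipeline tasks; 0 means "not published yet" and callers treat it as "do not spill".
        int64_t spill_threshold() const { return _spill_threshold.load(std::memory_order_relaxed); }

    private:
        std::atomic<int64_t> _spill_threshold{0};
    };

    int main() {
        QueryCtxSketch ctx;
        std::thread refresher([&] { ctx.set_spill_threshold(8LL << 30); });
        std::thread reader([&] { std::cout << "threshold: " << ctx.spill_threshold() << "\n"; });
        refresher.join();
        reader.join();
    }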
4 changes: 0 additions & 4 deletions be/src/runtime/workload_group/workload_group.cpp
@@ -152,10 +152,6 @@ void WorkloadGroup::refresh_memory(int64_t used_memory) {
_wg_refresh_interval_memory_growth.store(0.0);
}

void WorkloadGroup::set_weighted_memory_ratio(double ratio) {
_weighted_mem_ratio = ratio;
}

void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
40 changes: 20 additions & 20 deletions be/src/runtime/workload_group/workload_group.h
@@ -77,6 +77,12 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _memory_limit;
};

int64_t weighted_memory_limit() const { return _weighted_memory_limit; };

void set_weighted_memory_limit(int64_t weighted_memory_limit) {
_weighted_memory_limit = weighted_memory_limit;
}

// make memory snapshots and refresh total memory used at the same time.
int64_t make_memory_tracker_snapshots(
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
@@ -93,13 +99,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void set_weighted_memory_ratio(double ratio);
bool add_wg_refresh_interval_memory_growth(int64_t size) {
// `weighted_mem_used` is a rough memory usage in this group,
// because we can only get a precise memory usage by MemTracker which is not include page cache.
auto weighted_mem_used =
int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load() + size) *
_weighted_mem_ratio);
if ((weighted_mem_used > ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
if ((realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
return false;
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
}

void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
// `weighted_mem_used` is a rough memory usage in this group,
// because we can only get a precise memory usage by MemTracker which is not include page cache.
auto weighted_mem_used =
int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load()) *
_weighted_mem_ratio);
*is_low_wartermark =
(weighted_mem_used > ((double)_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark =
(weighted_mem_used > ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
}

std::string debug_string() const;
@@ -200,10 +199,11 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::string _name;
int64_t _version;
int64_t _memory_limit; // bytes
// `_weighted_memory_limit` is less than or equal to `_memory_limit`; it is calculated after excluding public memory.
// See `refresh_wg_weighted_memory_limit` for a more detailed description.
std::atomic<int64_t> _weighted_memory_limit {0};
// Last value of make_memory_tracker_snapshots, refreshed every time make_memory_tracker_snapshots is called.
std::atomic_int64_t _total_mem_used = 0; // bytes
// last value of refresh_wg_weighted_memory_ratio.
std::atomic<double> _weighted_mem_ratio = 0.0;
std::atomic_int64_t _wg_refresh_interval_memory_growth;
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
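With these changes, check_mem_used compares the group's real-time usage against the weighted memory limit scaled by the spill watermarks, instead of scaling usage by a ratio. A standalone sketch of the same comparison with example numbers; the percentages and sizes are assumptions, and the parameter names are simplified relative to the is_low_wartermark/is_high_wartermark flags in the real code:

    #include <cstdint>
    #include <iostream>

    // Same comparison as WorkloadGroup::check_mem_used, with plain parameters in
    // place of the group's atomics.
    void check_mem_used(int64_t realtime_total_mem_used, int64_t weighted_memory_limit,
                        int64_t spill_low_watermark_pct, int64_t spill_high_watermark_pct,
                        bool* is_low_watermark, bool* is_high_watermark) {
        *is_low_watermark = realtime_total_mem_used >
                            (double)weighted_memory_limit * spill_low_watermark_pct / 100;
        *is_high_watermark = realtime_total_mem_used >
                             (double)weighted_memory_limit * spill_high_watermark_pct / 100;
    }

    int main() {
        const int64_t GB = 1024LL * 1024 * 1024;
        bool low = false, high = false;
        // Weighted limit 24 GB, watermarks 50% / 80%, 20 GB used right now.
        check_mem_used(20 * GB, 24 * GB, 50, 80, &low, &high);
        std::cout << std::boolalpha << low << " " << high << "\n"; // true true: 20 GB > 12 GB and > 19.2 GB
    }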
101 changes: 58 additions & 43 deletions be/src/runtime/workload_group/workload_group_manager.cpp
@@ -153,86 +153,101 @@ struct WorkloadGroupMemInfo {
std::list<std::shared_ptr<MemTrackerLimiter>>();
};

void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() {
void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);

// 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time)
// and calculate total memory used of all queries.
int64_t all_queries_mem_used = 0;
int64_t all_workload_groups_mem_usage = 0;
std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
for (auto& [wg_id, wg] : _workload_groups) {
wgs_mem_info[wg_id].total_mem_used =
wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
all_queries_mem_used += wgs_mem_info[wg_id].total_mem_used;
all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
}
if (all_queries_mem_used <= 0) {
if (all_workload_groups_mem_usage <= 0) {
return;
}

// 2. calculate weighted ratio.
// process memory used is actually bigger than all_queries_mem_used,
// because memory of page cache, allocator cache, segment cache etc. are included
// in proc_vm_rss.
// we count these cache memories equally on workload groups.
// 2. calculate the weighted memory limit ratio.
// when a workload group is constructed, its mem_limit is set to (process_memory_limit * group_limit_percent),
// which assumes that the memory available to workload groups equals process_memory_limit.
//
// but process_memory_usage is actually bigger than all_workload_groups_mem_usage,
// because public memory such as the page cache, allocator cache, segment cache etc. is included in process_memory_usage.
// so the memory actually available to workload groups equals (process_memory_limit - public_memory).
//
// we exclude this public memory when calculating the workload group mem_limit,
// by computing a ratio that is multiplied into the mem_limit set at construction time.
auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage();
all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used);
double ratio = (double)process_memory_usage / (double)all_queries_mem_used;
if (ratio <= 1.25) {
std::string debug_msg =
fmt::format("\nProcess Memory Summary: {}, {}, all quries mem: {}",
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
LOG_EVERY_T(INFO, 10) << debug_msg;
auto process_memory_limit = MemInfo::mem_limit();
double weighted_memory_limit_ratio = 1;
// if all_workload_groups_mem_usage is greater than process_memory_usage, the memory statistics
// of the workload groups are inaccurate: what query/load/etc. trackers record is virtual memory,
// and that virtual memory has not actually been touched yet.
//
// in that case weighted_memory_limit_ratio stays 1 and the workload group mem_limit remains
// (process_memory_limit * group_limit_percent), which may make queries spill earlier than necessary;
// there is no good solution at present, since we cannot predict when this virtual memory will be used.
if (all_workload_groups_mem_usage < process_memory_usage) {
int64_t public_memory = process_memory_usage - all_workload_groups_mem_usage;
weighted_memory_limit_ratio = 1 - (double)public_memory / (double)process_memory_limit;
}

std::string debug_msg = fmt::format(
"\nProcess Memory Summary: {}, {}, all workload groups memory usage: {}, "
"weighted_memory_limit_ratio: {}",
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
weighted_memory_limit_ratio);
LOG_EVERY_T(INFO, 10) << debug_msg;

for (auto& wg : _workload_groups) {
// 3.1 calculate query weighted memory limit of task group
auto wg_mem_limit = wg.second->memory_limit();
auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
int64_t query_weighted_mem_limit =
wg_query_count ? (wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit;
// 3.1 calculate query spill threshold of task group
auto wg_weighted_mem_limit =
int64_t(wg.second->memory_limit() * weighted_memory_limit_ratio);
wg.second->set_weighted_memory_limit(wg_weighted_mem_limit);

// 3.2 set all workload groups weighted memory ratio and all query weighted memory limit and ratio.
wg.second->set_weighted_memory_ratio(ratio);
// 3.2 set workload groups weighted memory limit and all query spill threshold.
auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
int64_t query_spill_threshold =
wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count
: wg_weighted_mem_limit;
for (const auto& query : wg.second->queries()) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
query_ctx->set_weighted_memory(query_weighted_mem_limit, ratio);
query_ctx->set_spill_threshold(query_spill_threshold);
}

// 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark.
auto weighted_mem_used = int64_t(wgs_mem_info[wg.first].total_mem_used * ratio);
bool is_high_wartermark =
(weighted_mem_used >
((double)wg_mem_limit * wg.second->spill_threashold_high_water_mark() / 100));
bool is_low_wartermark =
(weighted_mem_used >
((double)wg_mem_limit * wg.second->spill_threshold_low_water_mark() / 100));
bool is_low_wartermark = false;
bool is_high_wartermark = false;
wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
std::string debug_msg;
if (is_high_wartermark || is_low_wartermark) {
debug_msg = fmt::format(
"\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used "
"ratio: {}, query "
"count: {}, query_weighted_mem_limit: {}",
wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES),
"\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, "
"used "
"ratio: {}, query count: {}, query spill threshold: {}",
wg.second->name(),
PrettyPrinter::print(wg.second->memory_limit(), TUnit::BYTES),
PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
PrettyPrinter::print(weighted_mem_used, TUnit::BYTES),
(double)weighted_mem_used / wg_mem_limit, wg_query_count,
PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES));
PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
(double)wgs_mem_info[wg.first].total_mem_used / wg_weighted_mem_limit,
wg_query_count, PrettyPrinter::print(query_spill_threshold, TUnit::BYTES));

debug_msg += "\n Query Memory Summary:";
// check whether queries need to revoke memory for task group
for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) {
debug_msg += fmt::format(
"\n MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, "
"\n MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, "
"Peak={}",
query_mem_tracker->label(), query_mem_tracker->parent_label(),
PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
PrettyPrinter::print(int64_t(query_mem_tracker->consumption() * ratio),
TUnit::BYTES),
PrettyPrinter::print(query_spill_threshold, TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
}
LOG_EVERY_T(INFO, 1) << debug_msg;
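The rewritten refresh derives one ratio from process-level numbers, applies it to every workload group's configured limit, and splits each group's weighted limit evenly across its queries. A worked numeric sketch of that arithmetic; every size below is an assumed example value, not a default:

    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t GB = 1024LL * 1024 * 1024;

        // Assumed process-level inputs (stand-ins for MemInfo::mem_limit() and
        // GlobalMemoryArbitrator::process_memory_usage()).
        int64_t process_memory_limit = 100 * GB;
        int64_t process_memory_usage = 60 * GB;
        int64_t all_workload_groups_mem_usage = 40 * GB;

        // Public memory (page cache, allocator cache, ...) is whatever the process uses
        // beyond what the workload group trackers account for.
        double weighted_memory_limit_ratio = 1;
        if (all_workload_groups_mem_usage < process_memory_usage) {
            int64_t public_memory = process_memory_usage - all_workload_groups_mem_usage; // 20 GB
            weighted_memory_limit_ratio =
                    1 - (double)public_memory / (double)process_memory_limit;             // 0.8
        }

        // A group configured with a 30 GB limit that currently runs 3 queries.
        int64_t wg_memory_limit = 30 * GB;
        int64_t wg_query_count = 3;
        int64_t wg_weighted_mem_limit = int64_t(wg_memory_limit * weighted_memory_limit_ratio); // 24 GB
        int64_t query_spill_threshold =
                wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count
                               : wg_weighted_mem_limit;                                          // ~8 GB per query

        std::cout << "ratio=" << weighted_memory_limit_ratio
                  << " wg_weighted_mem_limit=" << wg_weighted_mem_limit / GB << "GB"
                  << " query_spill_threshold=" << query_spill_threshold / GB << "GB\n";
    }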
2 changes: 1 addition & 1 deletion be/src/runtime/workload_group/workload_group_manager.h
@@ -54,7 +54,7 @@ class WorkloadGroupMgr {

bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }

void refresh_wg_weighted_memory_ratio();
void refresh_wg_weighted_memory_limit();

private:
std::shared_mutex _group_mutex;
