Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 20, 2023
1 parent 0fb79e4 commit d2c12e0
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 43 deletions.
150 changes: 110 additions & 40 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ static RuntimeProfile::Counter* freed_memory_counter =
ADD_COUNTER(free_top_memory_task_profile, "FreedMemory", TUnit::BYTES);
static RuntimeProfile::Counter* cancel_tasks_counter =
ADD_COUNTER(free_top_memory_task_profile, "CancelTasksNum", TUnit::UNIT);
static RuntimeProfile::Counter* seek_tasks_counter =
ADD_COUNTER(free_top_memory_task_profile, "SeekTasksNum", TUnit::UNIT);
static RuntimeProfile::Counter* previously_canceling_tasks_counter =
ADD_COUNTER(free_top_memory_task_profile, "PreviouslyCancelingTasksNum", TUnit::UNIT);

MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) {
DCHECK_GE(byte_limit, -1);
Expand All @@ -81,7 +85,9 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_
}

MemTrackerLimiter::~MemTrackerLimiter() {
if (_type == Type::GLOBAL) return;
if (_type == Type::GLOBAL) {
return;
}
consume(_untracked_mem);
// mem hook record tracker cannot guarantee that the final consumption is 0,
// nor can it guarantee that the memory alloc and free are recorded in a one-to-one correspondence.
Expand Down Expand Up @@ -178,6 +184,25 @@ void MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* s
}
}

void MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::Snapshot>* snapshots,
int top_num) {
std::priority_queue<std::pair<int64_t, MemTrackerLimiter*>> max_pq;
// not include global type.
for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
max_pq.emplace(tracker->consumption(), tracker);
}
}

while (!max_pq.empty() && top_num > 0) {
auto tracker = max_pq.top().second;
(*snapshots).emplace_back(tracker->make_snapshot());
top_num--;
max_pq.pop();
}
}

std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) {
return fmt::format(
"MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)",
Expand Down Expand Up @@ -217,7 +242,9 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
for (const auto& snapshot : snapshots) {
child_trackers_usage += "\n " + MemTracker::log_usage(snapshot);
}
if (!child_trackers_usage.empty()) detail += child_trackers_usage;
if (!child_trackers_usage.empty()) {
detail += child_trackers_usage;
}

LOG(WARNING) << detail;
}
Expand All @@ -229,6 +256,7 @@ std::string MemTrackerLimiter::log_process_usage_str() {
std::vector<MemTracker::Snapshot> snapshots;
MemTrackerLimiter::make_process_snapshots(&snapshots);
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL);
MemTrackerLimiter::make_top_consumption_snapshots(&snapshots);

// Add additional tracker printed when memory exceeds limit.
snapshots.emplace_back(
Expand Down Expand Up @@ -361,56 +389,31 @@ int64_t MemTrackerLimiter::free_top_memory_query(
// After greater than min_free_mem, will not be modified.
int64_t prepare_free_mem = 0;
std::vector<std::string> canceling_task;
int seek_num = 0;
COUNTER_SET(cancel_cost_time, (int64_t)0);
COUNTER_SET(find_cost_time, (int64_t)0);
COUNTER_SET(freed_memory_counter, (int64_t)0);
COUNTER_SET(cancel_tasks_counter, (int64_t)0);

auto cancel_top_query = [&cancel_msg, type, profile](auto& min_pq,
auto& canceling_task) -> int64_t {
std::vector<std::string> usage_strings;
{
SCOPED_TIMER(cancel_cost_time);
while (!min_pq.empty()) {
TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
if (cancelled_queryid == TUniqueId()) {
min_pq.pop();
continue;
}
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_msg(min_pq.top().first, min_pq.top().second));

COUNTER_UPDATE(freed_memory_counter, min_pq.top().first);
COUNTER_UPDATE(cancel_tasks_counter, 1);
usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
min_pq.top().first));
min_pq.pop();
}
}

profile->merge(free_top_memory_task_profile.get());
LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": "
<< join(usage_strings, ",")
<< ". previous canceling task: " << join(canceling_task, ",");
return freed_memory_counter->value();
};
COUNTER_SET(seek_tasks_counter, (int64_t)0);
COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0);

{
SCOPED_TIMER(find_cost_time);
for (unsigned i = 1; i < tracker_groups.size(); ++i) {
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
seek_num++;
if (tracker->is_query_cancelled()) {
canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(),
tracker->consumption()));
continue;
}
if (tracker->consumption() > min_free_mem) {
MemTrackerMinQueue min_pq_single;
min_pq_single.emplace(tracker->consumption(), tracker->label());
return cancel_top_query(min_pq_single, canceling_task);
min_pq = MemTrackerMinQueue();
min_pq.emplace(tracker->consumption(), tracker->label());
prepare_free_mem = tracker->consumption();
break;
} else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
min_pq.emplace(tracker->consumption(), tracker->label());
prepare_free_mem += tracker->consumption();
Expand All @@ -424,9 +427,50 @@ int64_t MemTrackerLimiter::free_top_memory_query(
}
}
}
if (prepare_free_mem > min_free_mem && min_pq.size() == 1) {
// Found a big task, short circuit seek.
break;
}
}
}
return cancel_top_query(min_pq, canceling_task);

COUNTER_UPDATE(seek_tasks_counter, seek_num);
COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size());
LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " seek finished, seek "
<< seek_num << " tasks. among them, " << min_pq.size() << " tasks will be canceled, "
<< prepare_free_mem << " memory size prepare free; " << canceling_task.size()
<< " tasks is being canceled and has not been completed yet;"
<< (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : "");

std::vector<std::string> usage_strings;
{
SCOPED_TIMER(cancel_cost_time);
while (!min_pq.empty()) {
TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
if (cancelled_queryid == TUniqueId()) {
LOG(WARNING) << "GC Free Top Memory Usage " << type_string(type)
<< ", Task ID parsing failed, label: " << min_pq.top().second;
min_pq.pop();
continue;
}
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_msg(min_pq.top().first, min_pq.top().second));

COUNTER_UPDATE(freed_memory_counter, min_pq.top().first);
COUNTER_UPDATE(cancel_tasks_counter, 1);
usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
min_pq.top().first));
min_pq.pop();
}
}

profile->merge(free_top_memory_task_profile.get());
LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " cancel finished, "
<< cancel_tasks_counter->value()
<< " tasks canceled, memory size being freed: " << freed_memory_counter->value()
<< ", consist of: " << join(usage_strings, ",");
return freed_memory_counter->value();
}

int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
Expand Down Expand Up @@ -459,20 +503,26 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
std::priority_queue<std::pair<int64_t, std::string>> max_pq;
std::unordered_map<std::string, int64_t> query_consumption;
std::vector<std::string> canceling_task;
int seek_num = 0;
int small_num = 0;
COUNTER_SET(cancel_cost_time, (int64_t)0);
COUNTER_SET(find_cost_time, (int64_t)0);
COUNTER_SET(freed_memory_counter, (int64_t)0);
COUNTER_SET(cancel_tasks_counter, (int64_t)0);
COUNTER_SET(seek_tasks_counter, (int64_t)0);
COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0);

{
SCOPED_TIMER(find_cost_time);
for (unsigned i = 1; i < tracker_groups.size(); ++i) {
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
seek_num++;
// 32M small query does not cancel
if (tracker->consumption() <= 33554432 ||
tracker->consumption() < tracker->limit()) {
small_num++;
continue;
}
if (tracker->is_query_cancelled()) {
Expand All @@ -490,8 +540,25 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
}
}

COUNTER_UPDATE(seek_tasks_counter, seek_num);
COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size());
LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << " seek finished, seek "
<< seek_num << " tasks. among them, " << query_consumption.size()
<< " tasks can be canceled; " << small_num << " small tasks that were skipped; "
<< canceling_task.size() << " tasks is being canceled and has not been completed yet;"
<< (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : "");

// Minor gc does not cancel when there is only one query.
if (query_consumption.size() <= 1) {
if (query_consumption.size() == 0) {
LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type)
<< " finished, no task need be canceled.";
return 0;
}
if (query_consumption.size() == 1) {
auto iter = query_consumption.begin();
LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type)
<< " finished, only one task: " << iter->first
<< ", memory consumption: " << iter->second << ", no cancel.";
return 0;
}

Expand All @@ -501,6 +568,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
while (!max_pq.empty()) {
TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second);
if (cancelled_queryid == TUniqueId()) {
LOG(WARNING) << "GC Free Top Memory Overcommit " << type_string(type)
<< ", Task ID parsing failed, label: " << max_pq.top().second;
max_pq.pop();
continue;
}
Expand All @@ -522,9 +591,10 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
}

profile->merge(free_top_memory_task_profile.get());
LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": "
<< join(usage_strings, ",")
<< ". previous canceling task: " << join(canceling_task, ",");
LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << " cancel finished, "
<< cancel_tasks_counter->value()
<< " tasks canceled, memory size being freed: " << freed_memory_counter->value()
<< ", consist of: " << join(usage_strings, ",");
return freed_memory_counter->value();
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ class MemTrackerLimiter final : public MemTracker {
// Returns a list of all the valid tracker snapshots.
static void make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots);
static void make_type_snapshots(std::vector<MemTracker::Snapshot>* snapshots, Type type);
static void make_top_consumption_snapshots(std::vector<MemTracker::Snapshot>* snapshots,
int top_num = 15);

static std::string log_usage(MemTracker::Snapshot snapshot);
std::string log_usage() { return log_usage(make_snapshot()); }
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check)
!_stop_consume) {
flush_untracked_mem();
}
// Large memory alloc should use allocator.h
// Large memory alloc should use allocator.h and catch std::bad_alloc
// Direct malloc or new large memory, unable to catch std::bad_alloc, BE may OOM.
if (large_memory_check && size > doris::config::large_memory_check_bytes) {
_stop_consume = true;
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, size_t alignment) {
doris::thread_context()->large_memory_check = false;
if (doris::enable_thread_catch_bad_alloc) {
doris::thread_context()->large_memory_check = false;
}
DEFER({ doris::thread_context()->large_memory_check = true; });
return alloc_impl(size, alignment);
}
Expand All @@ -186,7 +188,9 @@ template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf, size_t old_size,
size_t new_size,
size_t alignment) {
doris::thread_context()->large_memory_check = false;
if (doris::enable_thread_catch_bad_alloc) {
doris::thread_context()->large_memory_check = false;
}
DEFER({ doris::thread_context()->large_memory_check = true; });
return realloc_impl(buf, old_size, new_size, alignment);
}
Expand Down

0 comments on commit d2c12e0

Please sign in to comment.