diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 05dfde7dc0ce3ad..10526efb820ea3d 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -60,6 +60,10 @@ static RuntimeProfile::Counter* freed_memory_counter = ADD_COUNTER(free_top_memory_task_profile, "FreedMemory", TUnit::BYTES); static RuntimeProfile::Counter* cancel_tasks_counter = ADD_COUNTER(free_top_memory_task_profile, "CancelTasksNum", TUnit::UNIT); +static RuntimeProfile::Counter* seek_tasks_counter = + ADD_COUNTER(free_top_memory_task_profile, "SeekTasksNum", TUnit::UNIT); +static RuntimeProfile::Counter* previously_canceling_tasks_counter = + ADD_COUNTER(free_top_memory_task_profile, "PreviouslyCancelingTasksNum", TUnit::UNIT); MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) { DCHECK_GE(byte_limit, -1); @@ -81,7 +85,9 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_ } MemTrackerLimiter::~MemTrackerLimiter() { - if (_type == Type::GLOBAL) return; + if (_type == Type::GLOBAL) { + return; + } consume(_untracked_mem); // mem hook record tracker cannot guarantee that the final consumption is 0, // nor can it guarantee that the memory alloc and free are recorded in a one-to-one correspondence. @@ -178,6 +184,25 @@ void MemTrackerLimiter::make_type_snapshots(std::vector* s } } +void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots, + int top_num) { + std::priority_queue> max_pq; + // not include global type. + for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) { + std::lock_guard l(mem_tracker_limiter_pool[i].group_lock); + for (auto tracker : mem_tracker_limiter_pool[i].trackers) { + max_pq.emplace(tracker->consumption(), tracker); + } + } + + while (!max_pq.empty() && top_num > 0) { + auto tracker = max_pq.top().second; + (*snapshots).emplace_back(tracker->make_snapshot()); + top_num--; + max_pq.pop(); + } +} + std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) { return fmt::format( "MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", @@ -217,7 +242,9 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { for (const auto& snapshot : snapshots) { child_trackers_usage += "\n " + MemTracker::log_usage(snapshot); } - if (!child_trackers_usage.empty()) detail += child_trackers_usage; + if (!child_trackers_usage.empty()) { + detail += child_trackers_usage; + } LOG(WARNING) << detail; } @@ -229,6 +256,7 @@ std::string MemTrackerLimiter::log_process_usage_str() { std::vector snapshots; MemTrackerLimiter::make_process_snapshots(&snapshots); MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); + MemTrackerLimiter::make_top_consumption_snapshots(&snapshots); // Add additional tracker printed when memory exceeds limit. snapshots.emplace_back( @@ -361,40 +389,13 @@ int64_t MemTrackerLimiter::free_top_memory_query( // After greater than min_free_mem, will not be modified. int64_t prepare_free_mem = 0; std::vector canceling_task; + int seek_num = 0; COUNTER_SET(cancel_cost_time, (int64_t)0); COUNTER_SET(find_cost_time, (int64_t)0); COUNTER_SET(freed_memory_counter, (int64_t)0); COUNTER_SET(cancel_tasks_counter, (int64_t)0); - - auto cancel_top_query = [&cancel_msg, type, profile](auto& min_pq, - auto& canceling_task) -> int64_t { - std::vector usage_strings; - { - SCOPED_TIMER(cancel_cost_time); - while (!min_pq.empty()) { - TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); - if (cancelled_queryid == TUniqueId()) { - min_pq.pop(); - continue; - } - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - cancel_msg(min_pq.top().first, min_pq.top().second)); - - COUNTER_UPDATE(freed_memory_counter, min_pq.top().first); - COUNTER_UPDATE(cancel_tasks_counter, 1); - usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second, - min_pq.top().first)); - min_pq.pop(); - } - } - - profile->merge(free_top_memory_task_profile.get()); - LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": " - << join(usage_strings, ",") - << ". previous canceling task: " << join(canceling_task, ","); - return freed_memory_counter->value(); - }; + COUNTER_SET(seek_tasks_counter, (int64_t)0); + COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0); { SCOPED_TIMER(find_cost_time); @@ -402,15 +403,17 @@ int64_t MemTrackerLimiter::free_top_memory_query( std::lock_guard l(tracker_groups[i].group_lock); for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { + seek_num++; if (tracker->is_query_cancelled()) { canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption())); continue; } if (tracker->consumption() > min_free_mem) { - MemTrackerMinQueue min_pq_single; - min_pq_single.emplace(tracker->consumption(), tracker->label()); - return cancel_top_query(min_pq_single, canceling_task); + min_pq = MemTrackerMinQueue(); + min_pq.emplace(tracker->consumption(), tracker->label()); + prepare_free_mem = tracker->consumption(); + break; } else if (tracker->consumption() + prepare_free_mem < min_free_mem) { min_pq.emplace(tracker->consumption(), tracker->label()); prepare_free_mem += tracker->consumption(); @@ -424,9 +427,50 @@ int64_t MemTrackerLimiter::free_top_memory_query( } } } + if (prepare_free_mem > min_free_mem && min_pq.size() == 1) { + // Found a big task, short circuit seek. + break; + } } } - return cancel_top_query(min_pq, canceling_task); + + COUNTER_UPDATE(seek_tasks_counter, seek_num); + COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size()); + LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " seek finished, seek " + << seek_num << " tasks. among them, " << min_pq.size() << " tasks will be canceled, " + << prepare_free_mem << " memory size prepare free; " << canceling_task.size() + << " tasks is being canceled and has not been completed yet;" + << (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : ""); + + std::vector usage_strings; + { + SCOPED_TIMER(cancel_cost_time); + while (!min_pq.empty()) { + TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + LOG(WARNING) << "GC Free Top Memory Usage " << type_string(type) + << ", Task ID parsing failed, label: " << min_pq.top().second; + min_pq.pop(); + continue; + } + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + cancel_msg(min_pq.top().first, min_pq.top().second)); + + COUNTER_UPDATE(freed_memory_counter, min_pq.top().first); + COUNTER_UPDATE(cancel_tasks_counter, 1); + usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second, + min_pq.top().first)); + min_pq.pop(); + } + } + + profile->merge(free_top_memory_task_profile.get()); + LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " cancel finished, " + << cancel_tasks_counter->value() + << " tasks canceled, memory size being freed: " << freed_memory_counter->value() + << ", consist of: " << join(usage_strings, ","); + return freed_memory_counter->value(); } int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, @@ -459,10 +503,14 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( std::priority_queue> max_pq; std::unordered_map query_consumption; std::vector canceling_task; + int seek_num = 0; + int small_num = 0; COUNTER_SET(cancel_cost_time, (int64_t)0); COUNTER_SET(find_cost_time, (int64_t)0); COUNTER_SET(freed_memory_counter, (int64_t)0); COUNTER_SET(cancel_tasks_counter, (int64_t)0); + COUNTER_SET(seek_tasks_counter, (int64_t)0); + COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0); { SCOPED_TIMER(find_cost_time); @@ -470,9 +518,11 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( std::lock_guard l(tracker_groups[i].group_lock); for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { + seek_num++; // 32M small query does not cancel if (tracker->consumption() <= 33554432 || tracker->consumption() < tracker->limit()) { + small_num++; continue; } if (tracker->is_query_cancelled()) { @@ -490,8 +540,25 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( } } + COUNTER_UPDATE(seek_tasks_counter, seek_num); + COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size()); + LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << " seek finished, seek " + << seek_num << " tasks. among them, " << query_consumption.size() + << " tasks can be canceled; " << small_num << " small tasks that were skipped; " + << canceling_task.size() << " tasks is being canceled and has not been completed yet;" + << (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : ""); + // Minor gc does not cancel when there is only one query. - if (query_consumption.size() <= 1) { + if (query_consumption.size() == 0) { + LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) + << " finished, no task need be canceled."; + return 0; + } + if (query_consumption.size() == 1) { + auto iter = query_consumption.begin(); + LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) + << " finished, only one task: " << iter->first + << ", memory consumption: " << iter->second << ", no cancel."; return 0; } @@ -501,6 +568,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( while (!max_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); if (cancelled_queryid == TUniqueId()) { + LOG(WARNING) << "GC Free Top Memory Overcommit " << type_string(type) + << ", Task ID parsing failed, label: " << max_pq.top().second; max_pq.pop(); continue; } @@ -522,9 +591,10 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( } profile->merge(free_top_memory_task_profile.get()); - LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": " - << join(usage_strings, ",") - << ". previous canceling task: " << join(canceling_task, ","); + LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << " cancel finished, " + << cancel_tasks_counter->value() + << " tasks canceled, memory size being freed: " << freed_memory_counter->value() + << ", consist of: " << join(usage_strings, ","); return freed_memory_counter->value(); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 144a189721e6e13..16df39c59ca5a27 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -149,6 +149,8 @@ class MemTrackerLimiter final : public MemTracker { // Returns a list of all the valid tracker snapshots. static void make_process_snapshots(std::vector* snapshots); static void make_type_snapshots(std::vector* snapshots, Type type); + static void make_top_consumption_snapshots(std::vector* snapshots, + int top_num = 15); static std::string log_usage(MemTracker::Snapshot snapshot); std::string log_usage() { return log_usage(make_snapshot()); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index cb5193c2f7e0daf..e6fdfc802668020 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -174,7 +174,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) !_stop_consume) { flush_untracked_mem(); } - // Large memory alloc should use allocator.h + // Large memory alloc should use allocator.h and catch std::bad_alloc // Direct malloc or new large memory, unable to catch std::bad_alloc, BE may OOM. if (large_memory_check && size > doris::config::large_memory_check_bytes) { _stop_consume = true; diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index ed3a2440ee0dd12..4a168d82e67c8c2 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -177,7 +177,9 @@ void Allocator::throw_bad_alloc( template void* Allocator::alloc(size_t size, size_t alignment) { - doris::thread_context()->large_memory_check = false; + if (doris::enable_thread_catch_bad_alloc) { + doris::thread_context()->large_memory_check = false; + } DEFER({ doris::thread_context()->large_memory_check = true; }); return alloc_impl(size, alignment); } @@ -186,7 +188,9 @@ template void* Allocator::realloc(void* buf, size_t old_size, size_t new_size, size_t alignment) { - doris::thread_context()->large_memory_check = false; + if (doris::enable_thread_catch_bad_alloc) { + doris::thread_context()->large_memory_check = false; + } DEFER({ doris::thread_context()->large_memory_check = true; }); return realloc_impl(buf, old_size, new_size, alignment); }