From ebb4a933610b4e37a41c9ed69f1db3aa3e4bd51d Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 27 Sep 2023 17:59:22 +0800 Subject: [PATCH] 1 --- be/src/common/config.cpp | 2 +- be/src/common/daemon.cpp | 6 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 202 ++++++++++++------ be/src/runtime/memory/mem_tracker_limiter.h | 27 ++- be/src/runtime/runtime_state.cpp | 6 +- be/src/util/mem_info.cpp | 94 +++++--- be/src/util/mem_info.h | 5 +- be/src/vec/common/allocator.cpp | 16 +- 8 files changed, 238 insertions(+), 120 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 78b0a2c79dbf1c..6483ec14abe9b9 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -133,7 +133,7 @@ DEFINE_mBool(disable_memory_gc, "false"); DEFINE_mInt64(large_memory_check_bytes, "2147483648"); -// The maximum time a thread waits for a full GC. Currently only query will wait for full gc. +// The maximum time a thread waits for full GC. Currently only query will wait for full gc. DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000"); DEFINE_mInt64(pre_serialize_keys_limit_bytes, "16777216"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 43a344538246fc..3879ef9ff8b2e8 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -229,7 +229,7 @@ void Daemon::memory_gc_thread() { auto proc_mem_no_allocator_cache = doris::MemInfo::proc_mem_no_allocator_cache(); // GC excess memory for resource groups that not enable overcommit - auto tg_free_mem = doris::MemInfo::tg_hard_memory_limit_gc(); + auto tg_free_mem = doris::MemInfo::tg_not_enable_overcommit_group_gc(); sys_mem_available += tg_free_mem; proc_mem_no_allocator_cache -= tg_free_mem; @@ -239,7 +239,7 @@ void Daemon::memory_gc_thread() { // No longer full gc and minor gc during sleep. memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms; memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms; - LOG(INFO) << fmt::format("Start Full GC, {}.", + LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", MemTrackerLimiter::process_limit_exceeded_errmsg_str()); doris::MemTrackerLimiter::print_log_process_usage(); if (doris::MemInfo::process_full_gc()) { @@ -251,7 +251,7 @@ void Daemon::memory_gc_thread() { proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) { // No minor gc during sleep, but full gc is possible. memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms; - LOG(INFO) << fmt::format("Start Minor GC, {}.", + LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str()); doris::MemTrackerLimiter::print_log_process_usage(); if (doris::MemInfo::process_minor_gc()) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index d82202569c0a93..268b0ec38eaba2 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -31,6 +31,7 @@ #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/task_group/task_group.h" +#include "runtime/thread_context.h" #include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" @@ -40,6 +41,7 @@ namespace doris { bvar::Adder g_memtrackerlimiter_cnt("memtrackerlimiter_cnt"); +constexpr auto GC_MAX_SEEK_TRACKER = 1000; // Save all MemTrackerLimiters in use. // Each group corresponds to several MemTrackerLimiters and has a lock. @@ -301,7 +303,7 @@ bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) { std::string MemTrackerLimiter::process_mem_log_str() { return fmt::format( - "OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys " + "os physical memory {}. process memory used {}, limit {}, soft limit {}. sys " "available memory {}, low water mark {}, warning water mark {}. Refresh interval " "memory growth {} B", PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES), @@ -314,7 +316,7 @@ std::string MemTrackerLimiter::process_mem_log_str() { std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str() { return fmt::format( - "process memory used {} exceed limit {} or sys mem available {} less than low " + "process memory used {} exceed limit {} or sys available memory {} less than low " "water mark {}", PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(), @@ -323,36 +325,31 @@ std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str() { std::string MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str() { return fmt::format( - "process memory used {} exceed soft limit {} or sys mem available {} less than warning " - "water mark {}.", + "process memory used {} exceed soft limit {} or sys available memory {} less than " + "warning water mark {}.", PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(), PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES)); } -std::string MemTrackerLimiter::query_tracker_limit_exceeded_str( - const std::string& tracker_limit_exceeded, const std::string& last_consumer_tracker, - const std::string& executing_msg) { - return fmt::format( - "Memory limit exceeded:{}, exec node:<{}>, execute msg:{}. backend {} " - "process memory used {}, limit {}. Can `set " - "exec_mem_limit=8G` to change limit, details see be.INFO.", - tracker_limit_exceeded, last_consumer_tracker, executing_msg, - BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), - MemInfo::mem_limit_str()); -} - std::string MemTrackerLimiter::tracker_limit_exceeded_str() { - return fmt::format( - "exceeded tracker:<{}>, limit {}, peak " - "used {}, current used {}", - label(), print_bytes(limit()), print_bytes(_consumption->peak_value()), - print_bytes(_consumption->current_value())); -} - -std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) { - return fmt::format("failed alloc size {}, {}", print_bytes(bytes), - tracker_limit_exceeded_str()); + std::string err_msg = fmt::format( + "memory tracker limit exceeded, tracker label:{}, type:{}, limit " + "{}, peak used {}, current used {}. backend {} process memory used {}.", + label(), type_string(_type), print_bytes(limit()), + print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()), + BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str()); + if (_type == Type::QUERY || _type == Type::LOAD) { + err_msg += fmt::format( + " exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see " + "be.INFO.", + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker()); + } else if (_type == Type::SCHEMA_CHANGE) { + err_msg += fmt::format( + " can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to " + "change limit, details see be.INFO."); + } + return err_msg; } int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, @@ -364,9 +361,9 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption, const std::string& label) { return fmt::format( - "Process has no memory available, cancel top memory usage {}: " + "Process has no memory available, cancel top memory used {}: " "{} memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed limit {} or sys mem available {} " + "process memory used {} exceed limit {} or sys available memory {} " "less than low water mark {}. Execute again after enough memory, " "details see be.INFO.", type_string(type), type_string(type), label, print_bytes(mem_consumption), @@ -374,14 +371,14 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, mem_available_str, print_bytes(MemInfo::sys_mem_available_low_water_mark())); }, - profile); + profile, GCType::PROCESS); } template int64_t MemTrackerLimiter::free_top_memory_query( int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, - RuntimeProfile* profile) { + RuntimeProfile* profile, GCType GCtype) { using MemTrackerMinQueue = std::priority_queue, std::vector>, std::greater>>; @@ -397,9 +394,18 @@ int64_t MemTrackerLimiter::free_top_memory_query( COUNTER_SET(seek_tasks_counter, (int64_t)0); COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0); + std::string log_prefix = fmt::format("[MemoryGC] GC free {} top memory used {}, ", + gc_type_string(GCtype), type_string(type)); + LOG(INFO) << fmt::format("{}, start seek all {}, running query and load num: {}", log_prefix, + type_string(type), + ExecEnv::GetInstance()->fragment_mgr()->running_query_num()); + { SCOPED_TIMER(find_cost_time); for (unsigned i = 1; i < tracker_groups.size(); ++i) { + if (seek_num > GC_MAX_SEEK_TRACKER) { + break; + } std::lock_guard l(tracker_groups[i].group_lock); for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { @@ -436,9 +442,10 @@ int64_t MemTrackerLimiter::free_top_memory_query( COUNTER_UPDATE(seek_tasks_counter, seek_num); COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size()); - LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " seek finished, seek " - << seek_num << " tasks. among them, " << min_pq.size() << " tasks will be canceled, " - << prepare_free_mem << " memory size prepare free; " << canceling_task.size() + + LOG(INFO) << log_prefix << "seek finished, seek " << seek_num << " tasks. among them, " + << min_pq.size() << " tasks will be canceled, " << prepare_free_mem + << " memory size prepare free; " << canceling_task.size() << " tasks is being canceled and has not been completed yet;" << (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : ""); @@ -448,8 +455,8 @@ int64_t MemTrackerLimiter::free_top_memory_query( while (!min_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); if (cancelled_queryid == TUniqueId()) { - LOG(WARNING) << "GC Free Top Memory Usage " << type_string(type) - << ", Task ID parsing failed, label: " << min_pq.top().second; + LOG(WARNING) << log_prefix + << "Task ID parsing failed, label: " << min_pq.top().second; min_pq.pop(); continue; } @@ -459,15 +466,14 @@ int64_t MemTrackerLimiter::free_top_memory_query( COUNTER_UPDATE(freed_memory_counter, min_pq.top().first); COUNTER_UPDATE(cancel_tasks_counter, 1); - usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second, + usage_strings.push_back(fmt::format("{} memory used {} Bytes", min_pq.top().second, min_pq.top().first)); min_pq.pop(); } } profile->merge(free_top_memory_task_profile.get()); - LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " cancel finished, " - << cancel_tasks_counter->value() + LOG(INFO) << log_prefix << "cancel finished, " << cancel_tasks_counter->value() << " tasks canceled, memory size being freed: " << freed_memory_counter->value() << ", consist of: " << join(usage_strings, ","); return freed_memory_counter->value(); @@ -484,7 +490,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, return fmt::format( "Process has less memory, cancel top memory overcommit {}: " "{} memory tracker <{}> consumption {}, backend {} " - "process memory used {} exceed soft limit {} or sys mem available {} " + "process memory used {} exceed soft limit {} or sys available memory {} " "less than warning water mark {}. Execute again after enough memory, " "details see be.INFO.", type_string(type), type_string(type), label, print_bytes(mem_consumption), @@ -492,14 +498,14 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, mem_available_str, print_bytes(MemInfo::sys_mem_available_warning_water_mark())); }, - profile); + profile, GCType::PROCESS); } template int64_t MemTrackerLimiter::free_top_overcommit_query( int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, - RuntimeProfile* profile) { + RuntimeProfile* profile, GCType GCtype) { std::priority_queue> max_pq; std::unordered_map query_consumption; std::vector canceling_task; @@ -512,9 +518,18 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( COUNTER_SET(seek_tasks_counter, (int64_t)0); COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0); + std::string log_prefix = fmt::format("[MemoryGC] GC free {} top memory overcommit {}, ", + gc_type_string(GCtype), type_string(type)); + LOG(INFO) << fmt::format("{}, start seek all {}, running query and load num: {}", log_prefix, + type_string(type), + ExecEnv::GetInstance()->fragment_mgr()->running_query_num()); + { SCOPED_TIMER(find_cost_time); for (unsigned i = 1; i < tracker_groups.size(); ++i) { + if (seek_num > GC_MAX_SEEK_TRACKER) { + break; + } std::lock_guard l(tracker_groups[i].group_lock); for (auto tracker : tracker_groups[i].trackers) { if (tracker->type() == type) { @@ -542,22 +557,21 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( COUNTER_UPDATE(seek_tasks_counter, seek_num); COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size()); - LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << " seek finished, seek " - << seek_num << " tasks. among them, " << query_consumption.size() - << " tasks can be canceled; " << small_num << " small tasks that were skipped; " - << canceling_task.size() << " tasks is being canceled and has not been completed yet;" + + LOG(INFO) << log_prefix << "seek finished, seek " << seek_num << " tasks. among them, " + << query_consumption.size() << " tasks can be canceled; " << small_num + << " small tasks that were skipped; " << canceling_task.size() + << " tasks is being canceled and has not been completed yet;" << (canceling_task.size() > 0 ? " consist of: " + join(canceling_task, ",") : ""); // Minor gc does not cancel when there is only one query. if (query_consumption.size() == 0) { - LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) - << " finished, no task need be canceled."; + LOG(INFO) << log_prefix << "finished, no task need be canceled."; return 0; } if (query_consumption.size() == 1) { auto iter = query_consumption.begin(); - LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) - << " finished, only one task: " << iter->first + LOG(INFO) << log_prefix << "finished, only one task: " << iter->first << ", memory consumption: " << iter->second << ", no cancel."; return 0; } @@ -568,8 +582,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( while (!max_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); if (cancelled_queryid == TUniqueId()) { - LOG(WARNING) << "GC Free Top Memory Overcommit " << type_string(type) - << ", Task ID parsing failed, label: " << max_pq.top().second; + LOG(WARNING) << log_prefix + << "Task ID parsing failed, label: " << max_pq.top().second; max_pq.pop(); continue; } @@ -578,7 +592,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, cancel_msg(query_mem, max_pq.top().second)); - usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", + usage_strings.push_back(fmt::format("{} memory used {} Bytes, overcommit ratio: {}", max_pq.top().second, query_mem, max_pq.top().first)); COUNTER_UPDATE(freed_memory_counter, query_mem); @@ -591,8 +605,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( } profile->merge(free_top_memory_task_profile.get()); - LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << " cancel finished, " - << cancel_tasks_counter->value() + LOG(INFO) << log_prefix << "cancel finished, " << cancel_tasks_counter->value() << " tasks canceled, memory size being freed: " << freed_memory_counter->value() << ", consist of: " << join(usage_strings, ","); return freed_memory_counter->value(); @@ -607,28 +620,77 @@ int64_t MemTrackerLimiter::tg_memory_limit_gc( } int64_t freed_mem = 0; - constexpr auto query_type = MemTrackerLimiter::Type::QUERY; - auto cancel_str = [id, &name, memory_limit, used_memory](int64_t mem_consumption, - const std::string& label) { + + std::string cancel_str = fmt::format( + "work load group memory exceeded limit, group id:{}, name:{}, used:{}, limit:{}, " + "backend:{}.", + id, name, MemTracker::print_bytes(used_memory), MemTracker::print_bytes(memory_limit), + BackendOptions::get_localhost()); + auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, + const std::string& label) { + return fmt::format( + "{} cancel top memory overcommit tracker <{}> consumption {}. execute again after " + "enough memory, details see be.INFO.", + cancel_str, label, MemTracker::print_bytes(mem_consumption)); + }; + auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { return fmt::format( - "Resource group id:{}, name:{} memory exceeded limit, cancel top memory {}: " - "memory tracker <{}> consumption {}, backend {}, " - "resource group memory used {}, memory limit {}.", - id, name, MemTrackerLimiter::type_string(query_type), label, - MemTracker::print_bytes(mem_consumption), BackendOptions::get_localhost(), - MemTracker::print_bytes(used_memory), MemTracker::print_bytes(memory_limit)); + "{} cancel top memory used tracker <{}> consumption {}. execute again after " + "enough memory, details see be.INFO.", + cancel_str, label, MemTracker::print_bytes(mem_consumption)); }; + + LOG(INFO) << fmt::format( + "[MemoryGC] work load group start gc, id:{} name:{}, memory limit: {}, used: {}, " + "need_free_mem: {}.", + id, name, memory_limit, used_memory, need_free_mem); + Defer defer {[&]() { + LOG(INFO) << fmt::format( + "[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, used: " + "{}, need_free_mem: {}, freed memory: {}.", + id, name, memory_limit, used_memory, need_free_mem, freed_mem); + }}; + + // 1. free top overcommit query if (config::enable_query_memory_overcommit) { + RuntimeProfile* tmq_profile = profile->create_child( + fmt::format("FreeGroupTopOvercommitQuery:Name {}", name), true, true); freed_mem += MemTrackerLimiter::free_top_overcommit_query( - need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str, profile); + need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, tracker_limiter_groups, + cancel_top_overcommit_str, tmq_profile, GCType::WORK_LOAD_GROUP); } - if (freed_mem < need_free_mem) { - freed_mem += MemTrackerLimiter::free_top_memory_query( - need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str, profile); + if (freed_mem >= need_free_mem) { + return freed_mem; } - LOG(INFO) << fmt::format( - "task group {} finished gc, memory_limit: {}, used_memory: {}, freed_mem: {}.", name, - memory_limit, used_memory, freed_mem); + + // 2. free top usage query + RuntimeProfile* tmq_profile = + profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name {}", name), true, true); + freed_mem += MemTrackerLimiter::free_top_memory_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, tracker_limiter_groups, + cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP); + if (freed_mem >= need_free_mem) { + return freed_mem; + } + + // 3. free top overcommit load + if (config::enable_query_memory_overcommit) { + tmq_profile = profile->create_child(fmt::format("FreeGroupTopOvercommitLoad:Name {}", name), + true, true); + freed_mem += MemTrackerLimiter::free_top_overcommit_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, tracker_limiter_groups, + cancel_top_overcommit_str, tmq_profile, GCType::WORK_LOAD_GROUP); + if (freed_mem >= need_free_mem) { + return freed_mem; + } + } + + // 4. free top usage load + tmq_profile = + profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", name), true, true); + freed_mem += MemTrackerLimiter::free_top_memory_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, tracker_limiter_groups, + cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP); return freed_mem; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 41601494079a27..936faa1592130b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -70,6 +70,9 @@ class MemTrackerLimiter final : public MemTracker { 6 // Experimental memory statistics, usually inaccurate, used for debugging, and expect to add other types in the future. }; + // TODO There are more and more GC codes and there should be a separate manager class. + enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 }; + struct TrackerLimiterGroup { std::list trackers; std::mutex group_lock; @@ -113,6 +116,19 @@ class MemTrackerLimiter final : public MemTracker { __builtin_unreachable(); } + static std::string gc_type_string(GCType type) { + switch (type) { + case GCType::PROCESS: + return "process"; + case GCType::WORK_LOAD_GROUP: + return "work load group"; + default: + LOG(FATAL) << "not match gc type:" << static_cast(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + static bool sys_mem_exceed_limit_check(int64_t bytes); void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } @@ -173,7 +189,7 @@ class MemTrackerLimiter final : public MemTracker { static int64_t free_top_memory_query( int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, - RuntimeProfile* profile); + RuntimeProfile* profile, GCType GCtype); static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, @@ -191,7 +207,7 @@ class MemTrackerLimiter final : public MemTracker { static int64_t free_top_overcommit_query( int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, - RuntimeProfile* profile); + RuntimeProfile* profile, GCType GCtype); static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, @@ -221,11 +237,7 @@ class MemTrackerLimiter final : public MemTracker { static std::string process_limit_exceeded_errmsg_str(); static std::string process_soft_limit_exceeded_errmsg_str(); // Log the memory usage when memory limit is exceeded. - std::string query_tracker_limit_exceeded_str(const std::string& tracker_limit_exceeded, - const std::string& last_consumer_tracker, - const std::string& executing_msg); std::string tracker_limit_exceeded_str(); - std::string tracker_limit_exceeded_str(int64_t bytes); std::string debug_string() override { std::stringstream msg; @@ -289,7 +301,8 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { return Status::OK(); } if (_limit > 0 && _consumption->current_value() + bytes > _limit) { - return Status::MemoryLimitExceeded(tracker_limit_exceeded_str(bytes)); + return Status::MemoryLimitExceeded(fmt::format( + "failed alloc size {}, {}", print_bytes(bytes), tracker_limit_exceeded_str())); } return Status::OK(); } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index e43b8777c8671e..096712f1767a95 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -335,9 +335,9 @@ Status RuntimeState::check_query_state(const std::string& msg) { // Usually used after SCOPED_ATTACH_TASK, during query execution. if (thread_context()->thread_mem_tracker()->limit_exceeded() && !config::enable_query_memory_overcommit) { - auto failed_msg = thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str( - thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str(), - thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), msg); + auto failed_msg = + fmt::format("{}, {}", msg, + thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str()); thread_context()->thread_mem_tracker()->print_log_usage(failed_msg); log_error(failed_msg); return Status::MemoryLimitExceeded(failed_msg); diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index fc3b64a5a64bbb..38f9039816c095 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -125,9 +125,10 @@ bool MemInfo::process_minor_gc() { je_purge_all_arena_dirty_pages(); std::stringstream ss; profile->pretty_print(&ss); - LOG(INFO) << fmt::format("End Minor GC, Free Memory {}. cost(us): {}, details: {}", - PrettyPrinter::print(freed_mem, TUnit::BYTES), - watch.elapsed_time() / 1000, ss.str()); + LOG(INFO) << fmt::format( + "[MemoryGC] end minor GC, free memory {}. cost(us): {}, details: {}", + PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000, + ss.str()); }}; freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); @@ -137,14 +138,14 @@ bool MemInfo::process_minor_gc() { } RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); - freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem, tg_profile); + freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - freed_mem, tg_profile); if (freed_mem > _s_process_minor_gc_size) { return true; } if (config::enable_query_memory_overcommit) { VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "Before free top memory overcommit query in Minor GC", + "[MemoryGC] before free top memory overcommit query in minor GC", MemTrackerLimiter::Type::QUERY); RuntimeProfile* toq_profile = profile->create_child("FreeTopOvercommitMemoryQuery", true, true); @@ -175,9 +176,10 @@ bool MemInfo::process_full_gc() { je_purge_all_arena_dirty_pages(); std::stringstream ss; profile->pretty_print(&ss); - LOG(INFO) << fmt::format("End Full GC, Free Memory {}. cost(us): {}, details: {}", - PrettyPrinter::print(freed_mem, TUnit::BYTES), - watch.elapsed_time() / 1000, ss.str()); + LOG(INFO) << fmt::format( + "[MemoryGC] end full GC, free Memory {}. cost(us): {}, details: {}", + PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000, + ss.str()); }}; freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); @@ -187,13 +189,13 @@ bool MemInfo::process_full_gc() { } RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); - freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem, tg_profile); + freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - freed_mem, tg_profile); if (freed_mem > _s_process_full_gc_size) { return true; } - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory query in Full GC", - MemTrackerLimiter::Type::QUERY); + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "[MemoryGC] before free top memory query in full GC", MemTrackerLimiter::Type::QUERY); RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true); freed_mem += MemTrackerLimiter::free_top_memory_query( _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tmq_profile); @@ -203,7 +205,8 @@ bool MemInfo::process_full_gc() { if (config::enable_query_memory_overcommit) { VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "Before free top memory overcommit load in Full GC", MemTrackerLimiter::Type::LOAD); + "[MemoryGC] before free top memory overcommit load in full GC", + MemTrackerLimiter::Type::LOAD); RuntimeProfile* tol_profile = profile->create_child("FreeTopMemoryOvercommitLoad", true, true); freed_mem += MemTrackerLimiter::free_top_overcommit_load( @@ -214,8 +217,8 @@ bool MemInfo::process_full_gc() { } } - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory load in Full GC", - MemTrackerLimiter::Type::LOAD); + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "[MemoryGC] before free top memory load in full GC", MemTrackerLimiter::Type::LOAD); RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true); freed_mem += MemTrackerLimiter::free_top_memory_load( _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tml_profile); @@ -225,31 +228,39 @@ bool MemInfo::process_full_gc() { return false; } -int64_t MemInfo::tg_hard_memory_limit_gc() { +int64_t MemInfo::tg_not_enable_overcommit_group_gc() { MonotonicStopWatch watch; watch.start(); std::vector task_groups; std::unique_ptr tg_profile = std::make_unique("WorkloadGroup"); int64_t total_free_memory = 0; + ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( + [](const taskgroup::TaskGroupPtr& task_group) { + return !task_group->enable_memory_overcommit(); + }, + &task_groups); + if (task_groups.empty()) { + return 0; + } + + LOG(INFO) << fmt::format( + "[MemoryGC] start GC work load group that not enable overcommit, number of group: {}, " + "if it exceeds the limit, try free size = (group used - group limit).", + task_groups.size()); + Defer defer {[&]() { if (total_free_memory > 0) { std::stringstream ss; tg_profile->pretty_print(&ss); LOG(INFO) << fmt::format( - "End Task Group Overcommit Memory GC, Free Memory {}. cost(us): {}, " - "details: {}", - PrettyPrinter::print(total_free_memory, TUnit::BYTES), + "[MemoryGC] end GC work load group that not enable overcommit, number of " + "group: {}, free memory {}. cost(us): {}, details: {}", + task_groups.size(), PrettyPrinter::print(total_free_memory, TUnit::BYTES), watch.elapsed_time() / 1000, ss.str()); } }}; - ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( - [](const taskgroup::TaskGroupPtr& task_group) { - return !task_group->enable_memory_overcommit(); - }, - &task_groups); - for (const auto& task_group : task_groups) { taskgroup::TaskGroupInfo tg_info; task_group->task_group_info(&tg_info); @@ -261,13 +272,19 @@ int64_t MemInfo::tg_hard_memory_limit_gc() { return total_free_memory; } -int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile) { +int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, + RuntimeProfile* profile) { + MonotonicStopWatch watch; + watch.start(); std::vector task_groups; ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { return task_group->enable_memory_overcommit(); }, &task_groups); + if (task_groups.empty()) { + return 0; + } int64_t total_exceeded_memory = 0; std::vector used_memorys; @@ -283,6 +300,33 @@ int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimePro int64_t total_free_memory = 0; bool gc_all_exceeded = request_free_memory >= total_exceeded_memory; + std::string log_prefix = fmt::format( + "work load group that enable overcommit, number of group: {}, request_free_memory:{}, " + "total_exceeded_memory:{}", + task_groups.size(), request_free_memory, total_exceeded_memory); + if (gc_all_exceeded) { + LOG(INFO) << fmt::format( + "[MemoryGC] start GC {}, request more than exceeded, try free size = (group used - " + "group limit).", + log_prefix); + } else { + LOG(INFO) << fmt::format( + "[MemoryGC] start GC {}, request less than exceeded, try free size = ((group used " + "- group limit) / all group total_exceeded_memory) * request_free_memory.", + log_prefix); + } + + Defer defer {[&]() { + if (total_free_memory > 0) { + std::stringstream ss; + profile->pretty_print(&ss); + LOG(INFO) << fmt::format( + "[MemoryGC] end GC {}, free memory {}. cost(us): {}, details: {}", log_prefix, + PrettyPrinter::print(total_free_memory, TUnit::BYTES), + watch.elapsed_time() / 1000, ss.str()); + } + }}; + for (int i = 0; i < task_groups.size(); ++i) { if (exceeded_memorys[i] == 0) { continue; diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 1ccd918c4a3fda..3691934b800fff 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -184,8 +184,9 @@ class MemInfo { static bool process_minor_gc(); static bool process_full_gc(); - static int64_t tg_hard_memory_limit_gc(); - static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile); + static int64_t tg_not_enable_overcommit_group_gc(); + static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory, + RuntimeProfile* profile); // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, // avoid multiple threads starting at the same time and causing OOM. diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 089293b0f18c31..1c8085aac45d37 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -45,8 +45,10 @@ void Allocator::sys_memory_check(size_t // Otherwise, if the external catch, directly throw bad::alloc. auto err_msg = fmt::format( "Allocator sys memory check failed: Cannot alloc:{}, consuming " - "tracker:<{}>, exec node:<{}>, {}.", + "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.", size, doris::thread_context()->thread_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker()->peak_consumption(), + doris::thread_context()->thread_mem_tracker()->consumption(), doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc && @@ -121,21 +123,17 @@ void Allocator::memory_tracker_check(siz if (doris::thread_context()->skip_memory_check) return; auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); if (!st) { - auto err_msg = - doris::thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str( - st.to_string(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), - "Allocator mem tracker check failed"); + auto err_msg = fmt::format("Allocator mem tracker check failed, {}", st.to_string()); doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) { doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); if (!doris::enable_thread_catch_bad_alloc) { - LOG(INFO) << fmt::format("Query:{} canceled asyn, {}.", + LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.", print_id(doris::thread_context()->task_id()), err_msg); doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); } else { - LOG(INFO) << fmt::format("Query:{} throw exception, {}.", + LOG(INFO) << fmt::format("query/load:{} throw exception, {}.", print_id(doris::thread_context()->task_id()), err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } @@ -170,7 +168,7 @@ void Allocator::throw_bad_alloc( const std::string& err) const { LOG(WARNING) << err << fmt::format( - " OS physical memory {}. Process memory usage {}, Sys available memory " + " os physical memory {}. process memory used {}, sys available memory " "{}, Stacktrace: {}", doris::PrettyPrinter::print(doris::MemInfo::physical_mem(), doris::TUnit::BYTES),