Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 10, 2023
1 parent 6ca0f3f commit ebb4a93
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 120 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ DEFINE_mBool(disable_memory_gc, "false");

DEFINE_mInt64(large_memory_check_bytes, "2147483648");

// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
// The maximum time a thread waits for full GC. Currently only query will wait for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");

DEFINE_mInt64(pre_serialize_keys_limit_bytes, "16777216");
Expand Down
6 changes: 3 additions & 3 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ void Daemon::memory_gc_thread() {
auto proc_mem_no_allocator_cache = doris::MemInfo::proc_mem_no_allocator_cache();

// GC excess memory for resource groups that do not enable overcommit.
auto tg_free_mem = doris::MemInfo::tg_hard_memory_limit_gc();
auto tg_free_mem = doris::MemInfo::tg_not_enable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
proc_mem_no_allocator_cache -= tg_free_mem;

Expand All @@ -239,7 +239,7 @@ void Daemon::memory_gc_thread() {
// No longer full gc and minor gc during sleep.
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("Start Full GC, {}.",
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.",
MemTrackerLimiter::process_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_full_gc()) {
Expand All @@ -251,7 +251,7 @@ void Daemon::memory_gc_thread() {
proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("Start Minor GC, {}.",
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.",
MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_minor_gc()) {
Expand Down
202 changes: 132 additions & 70 deletions be/src/runtime/memory/mem_tracker_limiter.cpp

Large diffs are not rendered by default.

27 changes: 20 additions & 7 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class MemTrackerLimiter final : public MemTracker {
6 // Experimental memory statistics, usually inaccurate, used for debugging, and expect to add other types in the future.
};

// Identifies the kind of GC being performed. gc_type_string() renders the values as
// "process" and "work load group", and the enum is passed as the trailing `GCtype`
// argument of free_top_memory_query() and free_top_overcommit_query().
// TODO: The amount of GC code keeps growing; it should move into a separate manager class.
enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 };

struct TrackerLimiterGroup {
std::list<MemTrackerLimiter*> trackers;
std::mutex group_lock;
Expand Down Expand Up @@ -113,6 +116,19 @@ class MemTrackerLimiter final : public MemTracker {
__builtin_unreachable();
}

// Returns the human-readable name of a GC type.
// Known values map to a fixed name; any other value is reported through LOG(FATAL).
static std::string gc_type_string(GCType type) {
    if (type == GCType::PROCESS) {
        return "process";
    }
    if (type == GCType::WORK_LOAD_GROUP) {
        return "work load group";
    }
    // No enumerator matched: report the raw integer value of the unexpected type.
    LOG(FATAL) << "not match gc type:" << static_cast<int>(type);
    LOG(FATAL) << "__builtin_unreachable";
    __builtin_unreachable();
}

static bool sys_mem_exceed_limit_check(int64_t bytes);

void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; }
Expand Down Expand Up @@ -173,7 +189,7 @@ class MemTrackerLimiter final : public MemTracker {
static int64_t free_top_memory_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile);
RuntimeProfile* profile, GCType GCtype);

static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
Expand All @@ -191,7 +207,7 @@ class MemTrackerLimiter final : public MemTracker {
static int64_t free_top_overcommit_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile);
RuntimeProfile* profile, GCType GCtype);

static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
Expand Down Expand Up @@ -221,11 +237,7 @@ class MemTrackerLimiter final : public MemTracker {
static std::string process_limit_exceeded_errmsg_str();
static std::string process_soft_limit_exceeded_errmsg_str();
// Log the memory usage when memory limit is exceeded.
std::string query_tracker_limit_exceeded_str(const std::string& tracker_limit_exceeded,
const std::string& last_consumer_tracker,
const std::string& executing_msg);
std::string tracker_limit_exceeded_str();
std::string tracker_limit_exceeded_str(int64_t bytes);

std::string debug_string() override {
std::stringstream msg;
Expand Down Expand Up @@ -289,7 +301,8 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
return Status::OK();
}
if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
return Status::MemoryLimitExceeded(tracker_limit_exceeded_str(bytes));
return Status::MemoryLimitExceeded(fmt::format(
"failed alloc size {}, {}", print_bytes(bytes), tracker_limit_exceeded_str()));
}
return Status::OK();
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ Status RuntimeState::check_query_state(const std::string& msg) {
// Usually used after SCOPED_ATTACH_TASK, during query execution.
if (thread_context()->thread_mem_tracker()->limit_exceeded() &&
!config::enable_query_memory_overcommit) {
auto failed_msg = thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str(
thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str(),
thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), msg);
auto failed_msg =
fmt::format("{}, {}", msg,
thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str());
thread_context()->thread_mem_tracker()->print_log_usage(failed_msg);
log_error(failed_msg);
return Status::MemoryLimitExceeded(failed_msg);
Expand Down
94 changes: 69 additions & 25 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ bool MemInfo::process_minor_gc() {
je_purge_all_arena_dirty_pages();
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format("End Minor GC, Free Memory {}. cost(us): {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES),
watch.elapsed_time() / 1000, ss.str());
LOG(INFO) << fmt::format(
"[MemoryGC] end minor GC, free memory {}. cost(us): {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
ss.str());
}};

freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
Expand All @@ -137,14 +138,14 @@ bool MemInfo::process_minor_gc() {
}

RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem, tg_profile);
freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - freed_mem, tg_profile);
if (freed_mem > _s_process_minor_gc_size) {
return true;
}

if (config::enable_query_memory_overcommit) {
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
"Before free top memory overcommit query in Minor GC",
"[MemoryGC] before free top memory overcommit query in minor GC",
MemTrackerLimiter::Type::QUERY);
RuntimeProfile* toq_profile =
profile->create_child("FreeTopOvercommitMemoryQuery", true, true);
Expand Down Expand Up @@ -175,9 +176,10 @@ bool MemInfo::process_full_gc() {
je_purge_all_arena_dirty_pages();
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format("End Full GC, Free Memory {}. cost(us): {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES),
watch.elapsed_time() / 1000, ss.str());
LOG(INFO) << fmt::format(
"[MemoryGC] end full GC, free Memory {}. cost(us): {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), watch.elapsed_time() / 1000,
ss.str());
}};

freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
Expand All @@ -187,13 +189,13 @@ bool MemInfo::process_full_gc() {
}

RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem, tg_profile);
freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - freed_mem, tg_profile);
if (freed_mem > _s_process_full_gc_size) {
return true;
}

VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory query in Full GC",
MemTrackerLimiter::Type::QUERY);
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
"[MemoryGC] before free top memory query in full GC", MemTrackerLimiter::Type::QUERY);
RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true);
freed_mem += MemTrackerLimiter::free_top_memory_query(
_s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tmq_profile);
Expand All @@ -203,7 +205,8 @@ bool MemInfo::process_full_gc() {

if (config::enable_query_memory_overcommit) {
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
"Before free top memory overcommit load in Full GC", MemTrackerLimiter::Type::LOAD);
"[MemoryGC] before free top memory overcommit load in full GC",
MemTrackerLimiter::Type::LOAD);
RuntimeProfile* tol_profile =
profile->create_child("FreeTopMemoryOvercommitLoad", true, true);
freed_mem += MemTrackerLimiter::free_top_overcommit_load(
Expand All @@ -214,8 +217,8 @@ bool MemInfo::process_full_gc() {
}
}

VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory load in Full GC",
MemTrackerLimiter::Type::LOAD);
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
"[MemoryGC] before free top memory load in full GC", MemTrackerLimiter::Type::LOAD);
RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true);
freed_mem += MemTrackerLimiter::free_top_memory_load(
_s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tml_profile);
Expand All @@ -225,31 +228,39 @@ bool MemInfo::process_full_gc() {
return false;
}

int64_t MemInfo::tg_hard_memory_limit_gc() {
int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
std::unique_ptr<RuntimeProfile> tg_profile = std::make_unique<RuntimeProfile>("WorkloadGroup");
int64_t total_free_memory = 0;

ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return !task_group->enable_memory_overcommit();
},
&task_groups);
if (task_groups.empty()) {
return 0;
}

LOG(INFO) << fmt::format(
"[MemoryGC] start GC work load group that not enable overcommit, number of group: {}, "
"if it exceeds the limit, try free size = (group used - group limit).",
task_groups.size());

Defer defer {[&]() {
if (total_free_memory > 0) {
std::stringstream ss;
tg_profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
"End Task Group Overcommit Memory GC, Free Memory {}. cost(us): {}, "
"details: {}",
PrettyPrinter::print(total_free_memory, TUnit::BYTES),
"[MemoryGC] end GC work load group that not enable overcommit, number of "
"group: {}, free memory {}. cost(us): {}, details: {}",
task_groups.size(), PrettyPrinter::print(total_free_memory, TUnit::BYTES),
watch.elapsed_time() / 1000, ss.str());
}
}};

ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return !task_group->enable_memory_overcommit();
},
&task_groups);

for (const auto& task_group : task_groups) {
taskgroup::TaskGroupInfo tg_info;
task_group->task_group_info(&tg_info);
Expand All @@ -261,13 +272,19 @@ int64_t MemInfo::tg_hard_memory_limit_gc() {
return total_free_memory;
}

int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile) {
int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
RuntimeProfile* profile) {
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->enable_memory_overcommit();
},
&task_groups);
if (task_groups.empty()) {
return 0;
}

int64_t total_exceeded_memory = 0;
std::vector<int64_t> used_memorys;
Expand All @@ -283,6 +300,33 @@ int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimePro

int64_t total_free_memory = 0;
bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
std::string log_prefix = fmt::format(
"work load group that enable overcommit, number of group: {}, request_free_memory:{}, "
"total_exceeded_memory:{}",
task_groups.size(), request_free_memory, total_exceeded_memory);
if (gc_all_exceeded) {
LOG(INFO) << fmt::format(
"[MemoryGC] start GC {}, request more than exceeded, try free size = (group used - "
"group limit).",
log_prefix);
} else {
LOG(INFO) << fmt::format(
"[MemoryGC] start GC {}, request less than exceeded, try free size = ((group used "
"- group limit) / all group total_exceeded_memory) * request_free_memory.",
log_prefix);
}

Defer defer {[&]() {
if (total_free_memory > 0) {
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
"[MemoryGC] end GC {}, free memory {}. cost(us): {}, details: {}", log_prefix,
PrettyPrinter::print(total_free_memory, TUnit::BYTES),
watch.elapsed_time() / 1000, ss.str());
}
}};

for (int i = 0; i < task_groups.size(); ++i) {
if (exceeded_memorys[i] == 0) {
continue;
Expand Down
5 changes: 3 additions & 2 deletions be/src/util/mem_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ class MemInfo {
static bool process_minor_gc();
static bool process_full_gc();

static int64_t tg_hard_memory_limit_gc();
static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile);
static int64_t tg_not_enable_overcommit_group_gc();
static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
RuntimeProfile* profile);

// It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process,
// avoid multiple threads starting at the same time and causing OOM.
Expand Down
16 changes: 7 additions & 9 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
// Otherwise, if the external catch, directly throw bad::alloc.
auto err_msg = fmt::format(
"Allocator sys memory check failed: Cannot alloc:{}, consuming "
"tracker:<{}>, exec node:<{}>, {}.",
"tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.",
size, doris::thread_context()->thread_mem_tracker()->label(),
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
doris::thread_context()->thread_mem_tracker()->consumption(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc &&
Expand Down Expand Up @@ -121,21 +123,17 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz
if (doris::thread_context()->skip_memory_check) return;
auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size);
if (!st) {
auto err_msg =
doris::thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str(
st.to_string(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
"Allocator mem tracker check failed");
auto err_msg = fmt::format("Allocator mem tracker check failed, {}", st.to_string());
doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg);
// If an outer caller catches the exception, throw first so that the query cancels itself; otherwise cancel it asynchronously.
if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) {
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
if (!doris::enable_thread_catch_bad_alloc) {
LOG(INFO) << fmt::format("Query:{} canceled asyn, {}.",
LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.",
print_id(doris::thread_context()->task_id()), err_msg);
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
} else {
LOG(INFO) << fmt::format("Query:{} throw exception, {}.",
LOG(INFO) << fmt::format("query/load:{} throw exception, {}.",
print_id(doris::thread_context()->task_id()), err_msg);
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
}
Expand Down Expand Up @@ -170,7 +168,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
const std::string& err) const {
LOG(WARNING) << err
<< fmt::format(
" OS physical memory {}. Process memory usage {}, Sys available memory "
" os physical memory {}. process memory used {}, sys available memory "
"{}, Stacktrace: {}",
doris::PrettyPrinter::print(doris::MemInfo::physical_mem(),
doris::TUnit::BYTES),
Expand Down

0 comments on commit ebb4a93

Please sign in to comment.