From 94b56f181cdeca52aa50b808f5912447f7d103ee Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 5 Sep 2024 11:28:51 +0800 Subject: [PATCH] 3 --- be/src/runtime/exec_env.h | 2 - be/src/runtime/exec_env_init.cpp | 1 - .../runtime/memory/thread_mem_tracker_mgr.cpp | 2 - .../runtime/memory/thread_mem_tracker_mgr.h | 27 +++------ be/src/runtime/thread_context.h | 55 +++++++++---------- be/src/vec/common/allocator.cpp | 2 +- 6 files changed, 37 insertions(+), 52 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f751aeb5d82274..98d82f274e746d 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -173,7 +173,6 @@ class ExecEnv { std::vector mem_tracker_limiter_pool; void init_mem_tracker(); std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } - MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } MemTrackerLimiter* details_mem_tracker_set() { return _details_mem_tracker_set.get(); } std::shared_ptr page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; } MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); } @@ -355,7 +354,6 @@ class ExecEnv { // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; - MemTrackerLimiter* _orphan_mem_tracker_raw = nullptr; std::shared_ptr _details_mem_tracker_set; // page size not in cache, data page/index page/etc. std::shared_ptr _page_no_cache_mem_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 674d5ee5115d76..834e402ca1ebb4 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -587,7 +587,6 @@ void ExecEnv::init_mem_tracker() { _s_tracking_memory = true; _orphan_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); - _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); _details_mem_tracker_set = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "DetailsTrackerSet"); _page_no_cache_mem_tracker = diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index daa4954881974f..33dd0d41822ae1 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -55,7 +55,6 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( _untracked_mem = 0; } _limiter_tracker = mem_tracker; - _limiter_tracker_raw = mem_tracker.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker( @@ -67,7 +66,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( _reserved_mem = _reserved_mem_stack.back(); _reserved_mem_stack.pop_back(); _limiter_tracker = old_mem_tracker; - _limiter_tracker_raw = old_mem_tracker.get(); } void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 510a75e9943884..bb0091f2e6d6fb 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -64,9 +64,6 @@ class ThreadMemTrackerMgr { // Must be fast enough! Thread update_tracker may be called very frequently. bool push_consumer_tracker(MemTracker* mem_tracker); void pop_consumer_tracker(); - MemTracker* last_consumer_tracker() { - return _consumer_tracker_stack.empty() ? nullptr : _consumer_tracker_stack.back(); - } std::string last_consumer_tracker_label() { return _consumer_tracker_stack.empty() ? "" : _consumer_tracker_stack.back()->label(); } @@ -100,10 +97,6 @@ class ThreadMemTrackerMgr { CHECK(init()); return _limiter_tracker; } - MemTrackerLimiter* limiter_mem_tracker_raw() { - CHECK(init()); - return _limiter_tracker_raw; - } void enable_wait_gc() { _wait_gc = true; } void disable_wait_gc() { _wait_gc = false; } @@ -118,7 +111,7 @@ class ThreadMemTrackerMgr { return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, " "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>", - std::to_string(_untracked_mem), _limiter_tracker_raw->log_usage(), + std::to_string(_untracked_mem), _limiter_tracker->log_usage(), fmt::to_string(consumer_tracker_buf)); } @@ -143,7 +136,6 @@ class ThreadMemTrackerMgr { bool _wait_gc = false; std::shared_ptr _limiter_tracker; - MemTrackerLimiter* _limiter_tracker_raw = nullptr; std::vector _consumer_tracker_stack; std::weak_ptr _wg_wptr; @@ -159,7 +151,6 @@ inline bool ThreadMemTrackerMgr::init() { if (_init) return true; if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) { _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); _wait_gc = true; _init = true; return true; @@ -271,26 +262,26 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { return; } _stop_consume = true; - DCHECK(_limiter_tracker_raw); + DCHECK(_limiter_tracker); _old_untracked_mem = _untracked_mem; - _limiter_tracker_raw->consume(_old_untracked_mem); + _limiter_tracker->consume(_old_untracked_mem); _untracked_mem -= _old_untracked_mem; _stop_consume = false; } inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { - DCHECK(_limiter_tracker_raw); + DCHECK(_limiter_tracker); DCHECK(size >= 0); CHECK(init()); // if _reserved_mem not equal to 0, repeat reserve, // _untracked_mem store bytes that not synchronized to process reserved memory. flush_untracked_mem(); - if (!_limiter_tracker_raw->try_consume(size)) { + if (!_limiter_tracker->try_consume(size)) { auto err_msg = fmt::format( "reserve memory failed, size: {}, because memory tracker consumption: {}, limit: " "{}", - size, _limiter_tracker_raw->consumption(), _limiter_tracker_raw->limit()); + size, _limiter_tracker->consumption(), _limiter_tracker->limit()); return doris::Status::MemoryLimitExceeded(err_msg); } auto wg_ptr = _wg_wptr.lock(); @@ -298,14 +289,14 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) { auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, wg_ptr->memory_debug_string()); - _limiter_tracker_raw->release(size); // rollback + _limiter_tracker->release(size); // rollback return doris::Status::MemoryLimitExceeded(err_msg); } } if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, GlobalMemoryArbitrator::process_mem_log_str()); - _limiter_tracker_raw->release(size); // rollback + _limiter_tracker->release(size); // rollback if (wg_ptr) { wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback } @@ -319,7 +310,7 @@ inline void ThreadMemTrackerMgr::release_reserved() { if (_reserved_mem != 0) { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); - _limiter_tracker_raw->release(_reserved_mem); + _limiter_tracker->release(_reserved_mem); auto wg_ptr = _wg_wptr.lock(); if (wg_ptr) { wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 81499049ffbf26..0ee47a979628d8 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -100,8 +100,7 @@ #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker) #else -#define SCOPED_PEAK_MEM() \ - auto VARNAME_LINENUM(scoped_tls_pm) = doris::ScopedInitThreadContext() +#define SCOPED_PEAK_MEM() auto VARNAME_LINENUM(scoped_tls_pm) = doris::ScopedInitThreadContext() #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext() #endif @@ -235,7 +234,7 @@ class ThreadContext { // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr thread_mem_tracker_mgr; [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { - return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); + return thread_mem_tracker_mgr->limiter_mem_tracker().get(); } QueryThreadContext query_thread_context(); @@ -521,18 +520,18 @@ class ScopeSkipMemoryCheck { // Basic macros for mem tracker, usually do not need to be modified and used. #if defined(USE_MEM_TRACKER) && !defined(BE_TEST) // used to fix the tracking accuracy of caches. -#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - do { \ - ORPHAN_TRACKER_CHECK(); \ - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ - size, tracker); \ +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + do { \ + ORPHAN_TRACKER_CHECK(); \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to( \ + size, tracker); \ } while (0) -#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ - do { \ - ORPHAN_TRACKER_CHECK(); \ - tracker->transfer_to( \ - size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()); \ +#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ + do { \ + ORPHAN_TRACKER_CHECK(); \ + tracker->transfer_to( \ + size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); \ } while (0) // Mem Hook to consume thread mem tracker @@ -558,21 +557,21 @@ class ScopeSkipMemoryCheck { // if use mem hook, avoid repeated consume. // must call create_thread_local_if_not_exits() before use thread_context(). -#define CONSUME_THREAD_MEM_TRACKER(size) \ - do { \ - if (size == 0 || doris::use_mem_hook) { \ - break; \ - } \ - if (doris::pthread_context_ptr_init) { \ - DCHECK(bthread_self() == 0); \ - doris::thread_context_ptr->consume_memory(size); \ - } else if (bthread_self() != 0) { \ - static_cast(bthread_getspecific(doris::btls_key)) \ - ->consume_memory(size); \ - } else if (doris::ExecEnv::ready()) { \ - MEMORY_ORPHAN_CHECK(); \ - doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ - } \ +#define CONSUME_THREAD_MEM_TRACKER(size) \ + do { \ + if (size == 0 || doris::use_mem_hook) { \ + break; \ + } \ + if (doris::pthread_context_ptr_init) { \ + DCHECK(bthread_self() == 0); \ + doris::thread_context_ptr->consume_memory(size); \ + } else if (bthread_self() != 0) { \ + static_cast(bthread_getspecific(doris::btls_key)) \ + ->consume_memory(size); \ + } else if (doris::ExecEnv::ready()) { \ + MEMORY_ORPHAN_CHECK(); \ + doris::ExecEnv::GetInstance()->orphan_mem_tracker()->consume_no_update_peak(size); \ + } \ } while (0) #define RELEASE_THREAD_MEM_TRACKER(size) CONSUME_THREAD_MEM_TRACKER(-size) diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index ae5f27989b2922..c7dda2b4c19a06 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -90,7 +90,7 @@ void Allocator::sys_mem size, doris::thread_context()->thread_mem_tracker()->label(), doris::thread_context()->thread_mem_tracker()->peak_consumption(), doris::thread_context()->thread_mem_tracker()->consumption(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(), doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 &&