Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 5, 2024
1 parent f84d6f5 commit 94b56f1
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 52 deletions.
2 changes: 0 additions & 2 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ class ExecEnv {
std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool;
void init_mem_tracker();
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
MemTrackerLimiter* details_mem_tracker_set() { return _details_mem_tracker_set.get(); }
std::shared_ptr<MemTracker> page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; }
MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); }
Expand Down Expand Up @@ -355,7 +354,6 @@ class ExecEnv {
// Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership",
// and the consumption of the orphan mem tracker is close to 0, but greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw = nullptr;
std::shared_ptr<MemTrackerLimiter> _details_mem_tracker_set;
// page size not in cache, data page/index page/etc.
std::shared_ptr<MemTracker> _page_no_cache_mem_tracker;
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,6 @@ void ExecEnv::init_mem_tracker() {
_s_tracking_memory = true;
_orphan_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan");
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
_details_mem_tracker_set =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "DetailsTrackerSet");
_page_no_cache_mem_tracker =
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
_untracked_mem = 0;
}
_limiter_tracker = mem_tracker;
_limiter_tracker_raw = mem_tracker.get();
}

void ThreadMemTrackerMgr::detach_limiter_tracker(
Expand All @@ -67,7 +66,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
_reserved_mem = _reserved_mem_stack.back();
_reserved_mem_stack.pop_back();
_limiter_tracker = old_mem_tracker;
_limiter_tracker_raw = old_mem_tracker.get();
}

void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
Expand Down
27 changes: 9 additions & 18 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class ThreadMemTrackerMgr {
// Must be fast enough! Thread update_tracker may be called very frequently.
bool push_consumer_tracker(MemTracker* mem_tracker);
void pop_consumer_tracker();
MemTracker* last_consumer_tracker() {
return _consumer_tracker_stack.empty() ? nullptr : _consumer_tracker_stack.back();
}
std::string last_consumer_tracker_label() {
return _consumer_tracker_stack.empty() ? "" : _consumer_tracker_stack.back()->label();
}
Expand Down Expand Up @@ -100,10 +97,6 @@ class ThreadMemTrackerMgr {
CHECK(init());
return _limiter_tracker;
}
MemTrackerLimiter* limiter_mem_tracker_raw() {
CHECK(init());
return _limiter_tracker_raw;
}

void enable_wait_gc() { _wait_gc = true; }
void disable_wait_gc() { _wait_gc = false; }
Expand All @@ -118,7 +111,7 @@ class ThreadMemTrackerMgr {
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, "
"_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
std::to_string(_untracked_mem), _limiter_tracker_raw->log_usage(),
std::to_string(_untracked_mem), _limiter_tracker->log_usage(),
fmt::to_string(consumer_tracker_buf));
}

Expand All @@ -143,7 +136,6 @@ class ThreadMemTrackerMgr {
bool _wait_gc = false;

std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
MemTrackerLimiter* _limiter_tracker_raw = nullptr;
std::vector<MemTracker*> _consumer_tracker_stack;
std::weak_ptr<WorkloadGroup> _wg_wptr;

Expand All @@ -159,7 +151,6 @@ inline bool ThreadMemTrackerMgr::init() {
if (_init) return true;
if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) {
_limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker();
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_wait_gc = true;
_init = true;
return true;
Expand Down Expand Up @@ -271,41 +262,41 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
return;
}
_stop_consume = true;
DCHECK(_limiter_tracker_raw);
DCHECK(_limiter_tracker);

_old_untracked_mem = _untracked_mem;
_limiter_tracker_raw->consume(_old_untracked_mem);
_limiter_tracker->consume(_old_untracked_mem);
_untracked_mem -= _old_untracked_mem;
_stop_consume = false;
}

inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
DCHECK(_limiter_tracker_raw);
DCHECK(_limiter_tracker);
DCHECK(size >= 0);
CHECK(init());
// if _reserved_mem not equal to 0, repeat reserve,
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
if (!_limiter_tracker_raw->try_consume(size)) {
if (!_limiter_tracker->try_consume(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because memory tracker consumption: {}, limit: "
"{}",
size, _limiter_tracker_raw->consumption(), _limiter_tracker_raw->limit());
size, _limiter_tracker->consumption(), _limiter_tracker->limit());
return doris::Status::MemoryLimitExceeded(err_msg);
}
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
wg_ptr->memory_debug_string());
_limiter_tracker_raw->release(size); // rollback
_limiter_tracker->release(size); // rollback
return doris::Status::MemoryLimitExceeded(err_msg);
}
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
GlobalMemoryArbitrator::process_mem_log_str());
_limiter_tracker_raw->release(size); // rollback
_limiter_tracker->release(size); // rollback
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
}
Expand All @@ -319,7 +310,7 @@ inline void ThreadMemTrackerMgr::release_reserved() {
if (_reserved_mem != 0) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
_untracked_mem);
_limiter_tracker_raw->release(_reserved_mem);
_limiter_tracker->release(_reserved_mem);
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
Expand Down
55 changes: 27 additions & 28 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker)
#else
#define SCOPED_PEAK_MEM() \
auto VARNAME_LINENUM(scoped_tls_pm) = doris::ScopedInitThreadContext()
#define SCOPED_PEAK_MEM() auto VARNAME_LINENUM(scoped_tls_pm) = doris::ScopedInitThreadContext()
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext()
#endif
Expand Down Expand Up @@ -235,7 +234,7 @@ class ThreadContext {
// is released somewhere, the hook is triggered to cause the crash.
std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
[[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const {
return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
return thread_mem_tracker_mgr->limiter_mem_tracker().get();
}

QueryThreadContext query_thread_context();
Expand Down Expand Up @@ -521,18 +520,18 @@ class ScopeSkipMemoryCheck {
// Basic macros for mem tracker, usually do not need to be modified and used.
#if defined(USE_MEM_TRACKER) && !defined(BE_TEST)
// used to fix the tracking accuracy of caches.
#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \
do { \
ORPHAN_TRACKER_CHECK(); \
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \
size, tracker); \
#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \
do { \
ORPHAN_TRACKER_CHECK(); \
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to( \
size, tracker); \
} while (0)

#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
do { \
ORPHAN_TRACKER_CHECK(); \
tracker->transfer_to( \
size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()); \
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
do { \
ORPHAN_TRACKER_CHECK(); \
tracker->transfer_to( \
size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); \
} while (0)

// Mem Hook to consume thread mem tracker
Expand All @@ -558,21 +557,21 @@ class ScopeSkipMemoryCheck {

// if use mem hook, avoid repeated consume.
// must call create_thread_local_if_not_exits() before use thread_context().
#define CONSUME_THREAD_MEM_TRACKER(size) \
do { \
if (size == 0 || doris::use_mem_hook) { \
break; \
} \
if (doris::pthread_context_ptr_init) { \
DCHECK(bthread_self() == 0); \
doris::thread_context_ptr->consume_memory(size); \
} else if (bthread_self() != 0) { \
static_cast<doris::ThreadContext*>(bthread_getspecific(doris::btls_key)) \
->consume_memory(size); \
} else if (doris::ExecEnv::ready()) { \
MEMORY_ORPHAN_CHECK(); \
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \
} \
#define CONSUME_THREAD_MEM_TRACKER(size) \
do { \
if (size == 0 || doris::use_mem_hook) { \
break; \
} \
if (doris::pthread_context_ptr_init) { \
DCHECK(bthread_self() == 0); \
doris::thread_context_ptr->consume_memory(size); \
} else if (bthread_self() != 0) { \
static_cast<doris::ThreadContext*>(bthread_getspecific(doris::btls_key)) \
->consume_memory(size); \
} else if (doris::ExecEnv::ready()) { \
MEMORY_ORPHAN_CHECK(); \
doris::ExecEnv::GetInstance()->orphan_mem_tracker()->consume_no_update_peak(size); \
} \
} while (0)
#define RELEASE_THREAD_MEM_TRACKER(size) CONSUME_THREAD_MEM_TRACKER(-size)

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem
size, doris::thread_context()->thread_mem_tracker()->label(),
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
doris::thread_context()->thread_mem_tracker()->consumption(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(),
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());

if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 &&
Expand Down

0 comments on commit 94b56f1

Please sign in to comment.