Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed May 23, 2024
1 parent 9adf326 commit dffa495
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 20 deletions.
22 changes: 11 additions & 11 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,27 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
CHECK(init());
if (_reserved_mem == 0) {
flush_untracked_mem();
flush_untracked_mem();
_reserved_mem_stack.push_back(_reserved_mem);
if (_reserved_mem != 0) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
_reserved_mem = 0;
_untracked_mem = 0;
}
_limiter_tracker = mem_tracker;
_limiter_tracker_raw = mem_tracker.get();
_attach_level++;
}

void ThreadMemTrackerMgr::detach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
CHECK(init());
// If reserve memory is called in nested attach, release reserved in outermost detach.
if (_attach_level == 1) {
release_reserved();
}
if (_reserved_mem == 0) {
flush_untracked_mem();
}
flush_untracked_mem();
release_reserved();
DCHECK(!_reserved_mem_stack.empty());
_reserved_mem = _reserved_mem_stack.back();
_reserved_mem_stack.pop_back();
_limiter_tracker = old_mem_tracker;
_limiter_tracker_raw = old_mem_tracker.get();
_attach_level--;
}

void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
Expand Down
20 changes: 11 additions & 9 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ThreadMemTrackerMgr {
~ThreadMemTrackerMgr() {
// if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
if (_init) {
DCHECK(_reserved_mem == 0);
flush_untracked_mem();
}
}
Expand Down Expand Up @@ -132,6 +133,9 @@ class ThreadMemTrackerMgr {
int64_t _old_untracked_mem = 0;

int64_t _reserved_mem = 0;
// SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used,
// so `attach_limiter_tracker` may be nested.
std::vector<int64_t> _reserved_mem_stack;

bool _count_scope_mem = false;
int64_t _scope_mem = 0;
Expand All @@ -149,9 +153,6 @@ class ThreadMemTrackerMgr {
bool _stop_consume = false;
TUniqueId _query_id = TUniqueId();
bool _is_query_cancelled = false;
// SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be called,
// so `attach_limiter_tracker` may be nested, _attach_level is the number of nesting levels.
size_t _attach_level = 0;
};

inline bool ThreadMemTrackerMgr::init() {
Expand Down Expand Up @@ -181,6 +182,7 @@ inline bool ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {

inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
DCHECK(!_consumer_tracker_stack.empty());
flush_untracked_mem();
_consumer_tracker_stack.back()->consume(_untracked_mem);
_consumer_tracker_stack.back()->release(_reserved_mem);
_consumer_tracker_stack.pop_back();
Expand All @@ -196,7 +198,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
_reserved_mem -= size;
// store bytes that not synchronized to process reserved memory.
_untracked_mem += size;
if (_untracked_mem >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
// If _reserved_mem is added back, it still need synchronized to process reserved_memory.
if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
_untracked_mem = 0;
}
Expand All @@ -213,6 +216,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
}
}
_untracked_mem += size;
DCHECK(_reserved_mem == 0);
if (!_init && !ExecEnv::ready()) {
return;
}
Expand Down Expand Up @@ -243,7 +247,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the Memory Hook again, so suspend consumption to avoid falling into an infinite loop.
if (_untracked_mem == 0 || !init()) {
if (_untracked_mem == 0 || _reserved_mem != 0 || !init()) {
return;
}
_stop_consume = true;
Expand All @@ -267,9 +271,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
CHECK(init());
// if _reserved_mem not equal to 0, repeat reserve,
// _untracked_mem store bytes that not synchronized to process reserved memory.
if (_reserved_mem == 0) {
flush_untracked_mem();
}
flush_untracked_mem();
if (!_limiter_tracker_raw->try_consume(size)) {
return false;
}
Expand All @@ -288,7 +290,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
}

inline void ThreadMemTrackerMgr::release_reserved() {
if (_reserved_mem > 0) {
if (_reserved_mem != 0) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
_untracked_mem);
_limiter_tracker_raw->release(_reserved_mem);
Expand Down

0 comments on commit dffa495

Please sign in to comment.