Skip to content

Commit

Permalink
fix thread local3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 24, 2023
1 parent c083917 commit 5c82eb6
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 169 deletions.
4 changes: 2 additions & 2 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) {
return Status::InternalError("failed to stat file {}: {}", file.native(), err);
}
size_t file_len = statbuf.st_size;
CONSUME_THREAD_MEM_TRACKER(file_len);
CONSUME_MEM_TRACKER(file_len);
void* buf = mmap(nullptr, file_len, PROT_READ, MAP_SHARED, fd, 0);

unsigned char result[MD5_DIGEST_LENGTH];
MD5((unsigned char*)buf, file_len, result);
munmap(buf, file_len);
RELEASE_THREAD_MEM_TRACKER(file_len);
RELEASE_MEM_TRACKER(file_len);

std::stringstream ss;
for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) {
Expand Down
14 changes: 8 additions & 6 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ namespace doris {
// Memory Hook is counted in the memory tracker of the current thread.
class ThreadMemTrackerMgr {
public:
ThreadMemTrackerMgr() {}
ThreadMemTrackerMgr() = default;

~ThreadMemTrackerMgr() {
// if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
if (_init) flush_untracked_mem();
if (_init) {
flush_untracked_mem();
}
}

bool init();
Expand Down Expand Up @@ -76,7 +78,7 @@ class ThreadMemTrackerMgr {
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
// Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded.
void consume(int64_t size, bool large_memory_check = false);
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();

bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
Expand All @@ -91,7 +93,7 @@ class ThreadMemTrackerMgr {
}

void disable_wait_gc() { _wait_gc = false; }
bool wait_gc() { return _wait_gc; }
[[nodiscard]] bool wait_gc() const { return _wait_gc; }
void cancel_fragment(const std::string& exceed_msg);

std::string print_debug_string() {
Expand Down Expand Up @@ -160,7 +162,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
_consumer_tracker_stack.pop_back();
}

inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) {
inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
_untracked_mem += size;
if (!ExecEnv::GetInstance()->initialized()) {
return;
Expand All @@ -176,7 +178,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check)
}
// Large memory alloc should use allocator.h
// Direct malloc or new large memory, unable to catch std::bad_alloc, BE may OOM.
if (large_memory_check && size > doris::config::large_memory_check_bytes) {
if (skip_large_memory_check == 0 && size > doris::config::large_memory_check_bytes) {
_stop_consume = true;
LOG(WARNING) << fmt::format(
"malloc or new large memory: {}, looking forward to using Allocator, this is just "
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ Status RuntimeState::check_query_state(const std::string& msg) {
//
// If the thread MemTrackerLimiter exceeds the limit, an error status is returned.
// Usually used after SCOPED_ATTACH_TASK, during query execution.
if (thread_context()->thread_mem_tracker()->limit_exceeded() &&
if (thread_context_ptr_init && thread_context()->thread_mem_tracker()->limit_exceeded() &&
!config::enable_query_memory_overcommit) {
auto failed_msg = thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str(
thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str(),
Expand Down
27 changes: 6 additions & 21 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,27 @@
namespace doris {
class MemTracker;

ThreadContextPtr::ThreadContextPtr() {
if (thread_context_raw == nullptr) {
thread_context_raw = new ThreadContext;
_own_thread_context = true;
}
doris::thread_context_ptr_init = true;
}

ThreadContextPtr::~ThreadContextPtr() {
doris::thread_context_ptr_init = false;
if (_own_thread_context) {
delete thread_context_raw;
}
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
const TUniqueId& task_id, const TUniqueId& fragment_instance_id) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
signal::set_signal_task_id(task_id);
thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
}

AttachTask::AttachTask(RuntimeState* runtime_state) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
signal::set_signal_task_id(runtime_state->query_id());
thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(),
runtime_state->query_mem_tracker());
}

AttachTask::~AttachTask() {
thread_context()->detach_task();
SwitchBthreadLocal::switch_back_pthread_local();
ThreadLocalHandle::release_thread_local();
}

AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
if (mem_tracker) {
_need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
}
Expand All @@ -67,7 +52,7 @@ AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
const std::shared_ptr<MemTracker>& mem_tracker)
: _mem_tracker(mem_tracker) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
if (_mem_tracker) {
_need_pop =
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
Expand All @@ -78,7 +63,7 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
if (_need_pop) {
thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
}
SwitchBthreadLocal::switch_back_pthread_local();
ThreadLocalHandle::release_thread_local();
}

} // namespace doris
Loading

0 comments on commit 5c82eb6

Please sign in to comment.