Skip to content

Commit

Permalink
fix thread context
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 26, 2023
1 parent bc60685 commit 0ccf813
Show file tree
Hide file tree
Showing 14 changed files with 216 additions and 369 deletions.
4 changes: 2 additions & 2 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,13 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) {
return Status::InternalError("failed to stat file {}: {}", file.native(), err);
}
size_t file_len = statbuf.st_size;
CONSUME_THREAD_MEM_TRACKER(file_len);
CONSUME_MEM_TRACKER(file_len);
void* buf = mmap(nullptr, file_len, PROT_READ, MAP_SHARED, fd, 0);

unsigned char result[MD5_DIGEST_LENGTH];
MD5((unsigned char*)buf, file_len, result);
munmap(buf, file_len);
RELEASE_THREAD_MEM_TRACKER(file_len);
RELEASE_MEM_TRACKER(file_len);

std::stringstream ss;
for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void MemTracker::bind_parent(MemTrackerLimiter* parent) {
if (parent) {
_parent_label = parent->label();
_parent_group_num = parent->group_num();
} else if (thread_context_ptr.init) {
} else if (doris::is_thread_context_init()) {
_parent_label = thread_context()->thread_mem_tracker()->label();
_parent_group_num = thread_context()->thread_mem_tracker()->group_num();
}
Expand Down
14 changes: 8 additions & 6 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ namespace doris {
// Memory Hook is counted in the memory tracker of the current thread.
class ThreadMemTrackerMgr {
public:
ThreadMemTrackerMgr() {}
ThreadMemTrackerMgr() = default;

~ThreadMemTrackerMgr() {
// if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
if (_init) flush_untracked_mem();
if (_init) {
flush_untracked_mem();
}
}

bool init();
Expand Down Expand Up @@ -77,7 +79,7 @@ class ThreadMemTrackerMgr {
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
// Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded.
void consume(int64_t size, bool large_memory_check = false);
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();

bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
Expand All @@ -92,7 +94,7 @@ class ThreadMemTrackerMgr {
}

void disable_wait_gc() { _wait_gc = false; }
bool wait_gc() { return _wait_gc; }
[[nodiscard]] bool wait_gc() const { return _wait_gc; }
void cancel_instance(const std::string& exceed_msg);

std::string print_debug_string() {
Expand Down Expand Up @@ -161,7 +163,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
_consumer_tracker_stack.pop_back();
}

inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) {
inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
_untracked_mem += size;
if (!ExecEnv::ready()) {
return;
Expand All @@ -176,7 +178,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check)
flush_untracked_mem();
}

if (large_memory_check && doris::config::large_memory_check_bytes > 0 &&
if (skip_large_memory_check == 0 && doris::config::large_memory_check_bytes > 0 &&
size > doris::config::large_memory_check_bytes) {
_stop_consume = true;
LOG(WARNING) << fmt::format(
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ Status RuntimeState::check_query_state(const std::string& msg) {
//
// If the thread MemTrackerLimiter exceeds the limit, an error status is returned.
// Usually used after SCOPED_ATTACH_TASK, during query execution.
if (thread_context()->thread_mem_tracker()->limit_exceeded() &&
if (is_thread_context_init() && thread_context()->thread_mem_tracker()->limit_exceeded() &&
!config::enable_query_memory_overcommit) {
auto failed_msg =
fmt::format("{}, {}", msg,
Expand Down
19 changes: 6 additions & 13 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,27 @@
namespace doris {
class MemTracker;

DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr);

ThreadContextPtr::ThreadContextPtr() {
INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr);
init = true;
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
const TUniqueId& task_id, const TUniqueId& fragment_instance_id) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
signal::set_signal_task_id(task_id);
thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
}

AttachTask::AttachTask(RuntimeState* runtime_state) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
signal::set_signal_task_id(runtime_state->query_id());
thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(),
runtime_state->query_mem_tracker());
}

AttachTask::~AttachTask() {
thread_context()->detach_task();
SwitchBthreadLocal::switch_back_pthread_local();
ThreadLocalHandle::release_thread_local();
}

AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
if (mem_tracker) {
_need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
}
Expand All @@ -59,7 +52,7 @@ AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
const std::shared_ptr<MemTracker>& mem_tracker)
: _mem_tracker(mem_tracker) {
SwitchBthreadLocal::switch_to_bthread_local();
ThreadLocalHandle::handle_thread_local();
if (_mem_tracker) {
_need_pop =
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
Expand All @@ -70,7 +63,7 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
if (_need_pop) {
thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
}
SwitchBthreadLocal::switch_back_pthread_local();
ThreadLocalHandle::release_thread_local();
}

} // namespace doris
Loading

0 comments on commit 0ccf813

Please sign in to comment.