From fab0bca008e4c4a76009dd49ff1cf79d07cb6c7c Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 23 Oct 2023 02:21:52 +0800 Subject: [PATCH] fix thread context --- be/src/io/fs/local_file_system.cpp | 4 +- be/src/runtime/memory/mem_tracker.cpp | 2 +- .../runtime/memory/thread_mem_tracker_mgr.h | 14 +- be/src/runtime/runtime_state.cpp | 2 +- be/src/runtime/thread_context.cpp | 19 +- be/src/runtime/thread_context.h | 285 ++++++++++-------- be/src/runtime/threadlocal.cc | 84 ------ be/src/runtime/threadlocal.h | 129 -------- be/src/service/doris_main.cpp | 3 + be/src/util/ref_count_closure.h | 1 - be/src/util/thread.cpp | 6 + be/src/vec/common/allocator.cpp | 34 ++- be/src/vec/sink/vtablet_sink.cpp | 1 - be/test/testutil/run_all_tests.cpp | 1 + 14 files changed, 216 insertions(+), 369 deletions(-) delete mode 100644 be/src/runtime/threadlocal.cc delete mode 100644 be/src/runtime/threadlocal.h diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 83e1554fe6b48f3..6ba402b5eff38f3 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -274,13 +274,13 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) { return Status::InternalError("failed to stat file {}: {}", file.native(), err); } size_t file_len = statbuf.st_size; - CONSUME_THREAD_MEM_TRACKER(file_len); + CONSUME_MEM_TRACKER(file_len); void* buf = mmap(nullptr, file_len, PROT_READ, MAP_SHARED, fd, 0); unsigned char result[MD5_DIGEST_LENGTH]; MD5((unsigned char*)buf, file_len, result); munmap(buf, file_len); - RELEASE_THREAD_MEM_TRACKER(file_len); + RELEASE_MEM_TRACKER(file_len); std::stringstream ss; for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) { diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index df8e9d80a4ad345..9b35b2a7d849ee4 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -47,7 +47,7 @@ void MemTracker::bind_parent(MemTrackerLimiter* parent) { if (parent) { _parent_label = parent->label(); _parent_group_num = parent->group_num(); - } else if (thread_context_ptr.init) { + } else if (doris::is_thread_context_init()) { _parent_label = thread_context()->thread_mem_tracker()->label(); _parent_group_num = thread_context()->thread_mem_tracker()->group_num(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 487f82ac5494a21..c6849bdf15e81ad 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -40,11 +40,13 @@ namespace doris { // Memory Hook is counted in the memory tracker of the current thread. class ThreadMemTrackerMgr { public: - ThreadMemTrackerMgr() {} + ThreadMemTrackerMgr() = default; ~ThreadMemTrackerMgr() { // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. - if (_init) flush_untracked_mem(); + if (_init) { + flush_untracked_mem(); + } } bool init(); @@ -77,7 +79,7 @@ class ThreadMemTrackerMgr { // such as calling LOG/iostream/sstream/stringstream/etc. related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, // Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded. - void consume(int64_t size, bool large_memory_check = false); + void consume(int64_t size, int skip_large_memory_check = 0); void flush_untracked_mem(); bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } @@ -92,7 +94,7 @@ class ThreadMemTrackerMgr { } void disable_wait_gc() { _wait_gc = false; } - bool wait_gc() { return _wait_gc; } + [[nodiscard]] bool wait_gc() const { return _wait_gc; } void cancel_instance(const std::string& exceed_msg); std::string print_debug_string() { @@ -161,7 +163,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() { _consumer_tracker_stack.pop_back(); } -inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) { +inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) { _untracked_mem += size; if (!ExecEnv::ready()) { return; @@ -176,7 +178,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) flush_untracked_mem(); } - if (large_memory_check && doris::config::large_memory_check_bytes > 0 && + if (skip_large_memory_check == 0 && doris::config::large_memory_check_bytes > 0 && size > doris::config::large_memory_check_bytes) { _stop_consume = true; LOG(WARNING) << fmt::format( diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 7a24b86621bbfb0..d269da7dd5e49fd 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -334,7 +334,7 @@ Status RuntimeState::check_query_state(const std::string& msg) { // // If the thread MemTrackerLimiter exceeds the limit, an error status is returned. // Usually used after SCOPED_ATTACH_TASK, during query execution. - if (thread_context()->thread_mem_tracker()->limit_exceeded() && + if (is_thread_context_init() && thread_context()->thread_mem_tracker()->limit_exceeded() && !config::enable_query_memory_overcommit) { auto failed_msg = fmt::format("{}, {}", msg, diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index a092385f410bc54..9055b722b939826 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -23,22 +23,15 @@ namespace doris { class MemTracker; -DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr); - -ThreadContextPtr::ThreadContextPtr() { - INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr); - init = true; -} - AttachTask::AttachTask(const std::shared_ptr& mem_tracker, const TUniqueId& task_id, const TUniqueId& fragment_instance_id) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::handle_thread_local(); signal::set_signal_task_id(task_id); thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker); } AttachTask::AttachTask(RuntimeState* runtime_state) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::handle_thread_local(); signal::set_signal_task_id(runtime_state->query_id()); thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(), runtime_state->query_mem_tracker()); @@ -46,11 +39,11 @@ AttachTask::AttachTask(RuntimeState* runtime_state) { AttachTask::~AttachTask() { thread_context()->detach_task(); - SwitchBthreadLocal::switch_back_pthread_local(); + ThreadLocalHandle::release_thread_local(); } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::handle_thread_local(); if (mem_tracker) { _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); } @@ -59,7 +52,7 @@ AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer( const std::shared_ptr& mem_tracker) : _mem_tracker(mem_tracker) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::handle_thread_local(); if (_mem_tracker) { _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); @@ -70,7 +63,7 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { if (_need_pop) { thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); } - SwitchBthreadLocal::switch_back_pthread_local(); + ThreadLocalHandle::release_thread_local(); } } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index a5c9473d665a0c6..fb4679c9224720f 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -33,7 +33,6 @@ #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" -#include "runtime/threadlocal.h" #include "util/defer_op.h" // IWYU pragma: keep // Used to observe the memory usage of the specified code segment @@ -86,11 +85,26 @@ #define SCOPED_TRACK_MEMORY_TO_UNKNOWN() (void)0 #endif -#define SKIP_MEMORY_CHECK(...) \ - do { \ - doris::thread_context()->skip_memory_check++; \ - DEFER({ doris::thread_context()->skip_memory_check--; }); \ - __VA_ARGS__; \ +#define SKIP_MEMORY_CHECK(...) \ + do { \ + doris::ThreadLocalHandle::handle_thread_local(); \ + doris::thread_context()->skip_memory_check++; \ + DEFER({ \ + doris::thread_context()->skip_memory_check--; \ + doris::ThreadLocalHandle::release_thread_local(); \ + }); \ + __VA_ARGS__; \ + } while (0) + +#define SKIP_LARGE_MEMORY_CHECK(...) \ + do { \ + doris::ThreadLocalHandle::handle_thread_local(); \ + doris::thread_context()->skip_large_memory_check++; \ + DEFER({ \ + doris::thread_context()->skip_large_memory_check--; \ + doris::ThreadLocalHandle::release_thread_local(); \ + }); \ + __VA_ARGS__; \ } while (0) namespace doris { @@ -101,63 +115,37 @@ class RuntimeState; extern bthread_key_t btls_key; -// Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error, -// see https://github.com/apache/doris/pull/7911 -// -// If we want to avoid this error, -// 1. For non-trivial variables in thread_local, such as std::string, you need to store them as pointers to -// ensure that thread_local is trivial, these non-trivial pointers will uniformly call destructors elsewhere. -// 2. The default destructor of the thread_local variable cannot be overridden. -// -// This is difficult to implement. Because the destructor is not overwritten, it means that the outside cannot -// be notified when the thread terminates, and the non-trivial pointers in thread_local cannot be released in time. -// The func provided by pthread and std::thread doesn't help either. -// -// So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by -// Thread-scoped thread local + Class-scoped thread local. -// -// This may look very trick, but it's the best way I can find. -// -// refer to: -// https://gcc.gnu.org/onlinedocs/gcc-3.3.1/gcc/Thread-Local.html -// https://stackoverflow.com/questions/12049684/ -// https://sourceware.org/glibc/wiki/Destructor%20support%20for%20thread_local%20variables -// https://www.jianshu.com/p/756240e837dd -// https://man7.org/linux/man-pages/man3/pthread_tryjoin_np.3.html -class ThreadContextPtr { -public: - ThreadContextPtr(); - // Cannot add destructor `~ThreadContextPtr`, otherwise it will no longer be of type POD, the reason is as above. - - // TCMalloc hook is triggered during ThreadContext construction, which may lead to deadlock. - bool init = false; - - DECLARE_STATIC_THREAD_LOCAL(ThreadContext, _ptr); -}; - -inline thread_local ThreadContextPtr thread_context_ptr; - +// Is true after ThreadContext construction. +inline thread_local bool pthread_context_ptr_init = false; +inline thread_local constinit ThreadContext* thread_context_ptr; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS -// in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. -inline thread_local ThreadContext* bthread_context; +// cache the key and value of bthread TLS in pthread TLS. +inline thread_local constinit ThreadContext* bthread_context = nullptr; inline thread_local bthread_t bthread_id; // The thread context saves some info about a working thread. // 2 required info: // 1. thread_id: Current thread id, Auto generated. -// 2. type: The type is a enum value indicating which type of task current thread is running. +// 2. type(abolished): The type is a enum value indicating which type of task current thread is running. // For example: QUERY, LOAD, COMPACTION, ... // 3. task id: A unique id to identify this task. maybe query id, load job id, etc. +// 4. ThreadMemTrackerMgr // // There may be other optional info to be added later. class ThreadContext { public: ThreadContext() { thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr()); - if (ExecEnv::ready()) thread_mem_tracker_mgr->init(); + if (ExecEnv::ready()) { + thread_mem_tracker_mgr->init(); + } } - ~ThreadContext() { thread_context_ptr.init = false; } + ~ThreadContext() { + pthread_context_ptr_init = false; + bthread_context = nullptr; + bthread_id = bthread_self(); // Avoid CONSUME_MEM_TRACKER call pthread_getspecific. + } void attach_task(const TUniqueId& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { @@ -179,47 +167,55 @@ class ThreadContext { thread_mem_tracker_mgr->detach_limiter_tracker(); } - const TUniqueId& task_id() const { return _task_id; } - const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } + [[nodiscard]] const TUniqueId& task_id() const { return _task_id; } + [[nodiscard]] const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } - std::string get_thread_id() { + static std::string get_thread_id() { std::stringstream ss; ss << std::this_thread::get_id(); return ss.str(); } - // After thread_mem_tracker_mgr is initialized, the current thread TCMalloc Hook starts to + // After thread_mem_tracker_mgr is initialized, the current thread Hook starts to // consume/release mem_tracker. // Note that the use of shared_ptr will cause a crash. The guess is that there is an // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal // to nullptr, but the object it points to is not initialized. At this time, when the memory - // is released somewhere, the TCMalloc hook is triggered to cause the crash. + // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr thread_mem_tracker_mgr; - MemTrackerLimiter* thread_mem_tracker() { + [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); } void consume_memory(const int64_t size) const { - thread_mem_tracker_mgr->consume(size, large_memory_check); + thread_mem_tracker_mgr->consume(size, skip_large_memory_check); } - int switch_bthread_local_count = 0; + int handle_thread_local_count = 0; int skip_memory_check = 0; - bool large_memory_check = true; + int skip_large_memory_check = 0; private: TUniqueId _task_id; TUniqueId _fragment_instance_id; }; -// Switch thread context from pthread local to bthread local context. -// Cache the pointer of bthread local in pthead local, -// Avoid calling bthread_getspecific frequently to get bthread local, which has performance problems. -class SwitchBthreadLocal { +class ThreadLocalHandle { public: - static void switch_to_bthread_local() { - if (bthread_self() != 0) { - // Very frequent bthread_getspecific will slow, but switch_to_bthread_local is not expected to be much. + static void handle_thread_local() { + if (bthread_self() == 0) { + if (!pthread_context_ptr_init) { + DCHECK(bthread_equal(0, bthread_id)); // Not used in bthread before. + thread_context_ptr = new ThreadContext(); + pthread_context_ptr_init = true; + } + DCHECK(thread_context_ptr != nullptr); + thread_context_ptr->handle_thread_local_count++; + } else { + // Avoid calling bthread_getspecific frequently to get bthread local. + // Very frequent bthread_getspecific will slow, but handle_thread_local is not expected to be much. + // Cache the pointer of bthread local in pthead local. + bthread_id = bthread_self(); bthread_context = static_cast(bthread_getspecific(btls_key)); if (bthread_context == nullptr) { // A new bthread starts, two scenarios: @@ -230,67 +226,92 @@ class SwitchBthreadLocal { // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment. // So tracker call reset 0 like reuses btls. // during this period, stop the use of thread_context. - thread_context_ptr.init = false; bthread_context = new ThreadContext; // The brpc server should respond as quickly as possible. bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit); - thread_context_ptr.init = true; } - bthread_id = bthread_self(); - bthread_context->switch_bthread_local_count++; + DCHECK(bthread_context != nullptr); + bthread_context->handle_thread_local_count++; } } - // `switch_to_bthread_local` and `switch_back_pthread_local` should be used in pairs, - // `switch_to_bthread_local` should only be called if `switch_to_bthread_local` returns true - static void switch_back_pthread_local() { - if (bthread_self() != 0) { + // `handle_thread_local` and `handle_thread_local` should be used in pairs, + // `release_thread_local` should only be called if `handle_thread_local` returns true + static void release_thread_local() { + if (bthread_self() == 0) { + DCHECK(pthread_context_ptr_init); + thread_context_ptr->handle_thread_local_count--; + if (thread_context_ptr->handle_thread_local_count == 0) { + pthread_context_ptr_init = false; + delete doris::thread_context_ptr; + thread_context_ptr = nullptr; + } + } else { if (!bthread_equal(bthread_self(), bthread_id)) { bthread_id = bthread_self(); bthread_context = static_cast(bthread_getspecific(btls_key)); DCHECK(bthread_context != nullptr); } - bthread_context->switch_bthread_local_count--; - if (bthread_context->switch_bthread_local_count == 0) { - bthread_context = thread_context_ptr._ptr; + bthread_context->handle_thread_local_count--; + if (bthread_context->handle_thread_local_count == 0) { + bthread_id = 0; + bthread_context = nullptr; } } } }; -// Note: All use of thread_context() in bthread requires the use of SwitchBthreadLocal. -static ThreadContext* thread_context() { - if (bthread_self() != 0) { +static bool is_thread_context_init() { + if (pthread_context_ptr_init) { + // in pthread + DCHECK(thread_context_ptr != nullptr); + DCHECK(bthread_equal(0, bthread_id)); // Not used in bthread before. + return true; + } else if (bthread_self() != 0) { // in bthread + DCHECK(!pthread_context_ptr_init); if (!bthread_equal(bthread_self(), bthread_id)) { // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. bthread_id = bthread_self(); bthread_context = static_cast(bthread_getspecific(btls_key)); - // if nullptr, a new bthread task start and no reusable bthread local, - // or bthread switch pthread but not call switch_to_bthread_local, use pthread local context - // else, bthread switch pthread and called switch_to_bthread_local, use bthread local context. - if (bthread_context == nullptr) { - bthread_context = thread_context_ptr._ptr; - } } - return bthread_context; + if (doris::bthread_context == nullptr) { + return false; + } else { + return true; + } } else { + return false; + } +} + +// must call handle_thread_local() and is_thread_context_init() before use thread_context(). +static ThreadContext* thread_context() { + if (bthread_self() == 0) { // in pthread - return thread_context_ptr._ptr; + DCHECK(pthread_context_ptr_init); + DCHECK(thread_context_ptr != nullptr); + return thread_context_ptr; + } else { + // in bthread + DCHECK(bthread_context != nullptr); + return bthread_context; } } class ScopeMemCount { public: explicit ScopeMemCount(int64_t* scope_mem) { + ThreadLocalHandle::handle_thread_local(); _scope_mem = scope_mem; thread_context()->thread_mem_tracker_mgr->start_count_scope_mem(); } ~ScopeMemCount() { *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem(); + ThreadLocalHandle::release_thread_local(); } private: @@ -311,14 +332,14 @@ class AttachTask { class SwitchThreadMemTrackerLimiter { public: explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker) { - SwitchBthreadLocal::switch_to_bthread_local(); + ThreadLocalHandle::handle_thread_local(); _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId()); } ~SwitchThreadMemTrackerLimiter() { thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - SwitchBthreadLocal::switch_back_pthread_local(); + ThreadLocalHandle::release_thread_local(); } private: @@ -328,24 +349,29 @@ class SwitchThreadMemTrackerLimiter { class TrackMemoryToUnknown { public: explicit TrackMemoryToUnknown() { - if (bthread_self() != 0) { - _tid = std::this_thread::get_id(); // save pthread id + if (is_thread_context_init()) { + if (bthread_self() != 0) { + _tid = std::this_thread::get_id(); // save pthread id + } + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( + ExecEnv::GetInstance()->orphan_mem_tracker(), TUniqueId()); } - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( - ExecEnv::GetInstance()->orphan_mem_tracker(), TUniqueId()); } ~TrackMemoryToUnknown() { - if (bthread_self() != 0) { - // make sure pthread is not switch, if switch, mem tracker will be wrong, but not crash in release - DCHECK(_tid == std::this_thread::get_id()); + if (is_thread_context_init()) { + DCHECK(_old_mem_tracker != nullptr); + if (bthread_self() != 0) { + // make sure pthread is not switch, if switch, mem tracker will be wrong, but not crash in release + DCHECK(_tid == std::this_thread::get_id()); + } + thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); } - thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); } private: - std::shared_ptr _old_mem_tracker; + std::shared_ptr _old_mem_tracker = nullptr; std::thread::id _tid; }; @@ -367,43 +393,60 @@ class AddThreadMemTrackerConsumer { // Basic macros for mem tracker, usually do not need to be modified and used. #ifdef USE_MEM_TRACKER -// For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap. -#define CONSUME_THREAD_MEM_TRACKER(size) doris::thread_context()->consume_memory(size) -#define RELEASE_THREAD_MEM_TRACKER(size) doris::thread_context()->consume_memory(-size) - // used to fix the tracking accuracy of caches. -#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ - size, tracker) -#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ - tracker->transfer_to( \ - size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()) +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + do { \ + if (is_thread_context_init()) { \ + doris::thread_context() \ + ->thread_mem_tracker_mgr->limiter_mem_tracker_raw() \ + ->transfer_to(size, tracker); \ + } else { \ + doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->transfer_to(size, tracker); \ + } \ + } while (0) + +#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ + do { \ + if (is_thread_context_init()) { \ + tracker->transfer_to( \ + size, \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()); \ + } else { \ + tracker->transfer_to(size, doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()); \ + } \ + } while (0) // Mem Hook to consume thread mem tracker // TODO: In the original design, the MemTracker consume method is called before the memory is allocated. // If the consume succeeds, the memory is actually allocated, otherwise an exception is thrown. -// But the statistics of memory through TCMalloc new/delete Hook are after the memory is actually allocated, +// But the statistics of memory through new/delete Hook are after the memory is actually allocated, // which is different from the previous behavior. #define CONSUME_MEM_TRACKER(size) \ do { \ - if (doris::thread_context_ptr.init) { \ - doris::thread_context()->consume_memory(size); \ + if (doris::pthread_context_ptr_init) { \ + DCHECK(bthread_self() == 0); \ + DCHECK(doris::thread_context_ptr != nullptr); \ + DCHECK(bthread_equal(0, doris::bthread_id)); \ + doris::thread_context_ptr->consume_memory(size); \ + } else if (bthread_self() != 0) { \ + DCHECK(!doris::pthread_context_ptr_init); \ + if (!bthread_equal(bthread_self(), doris::bthread_id)) { \ + doris::bthread_id = bthread_self(); \ + doris::bthread_context = \ + static_cast(bthread_getspecific(doris::btls_key)); \ + } \ + if (doris::bthread_context == nullptr) { \ + doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak( \ + size); \ + } else { \ + doris::bthread_context->consume_memory(size); \ + } \ } else if (doris::ExecEnv::ready()) { \ doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ } \ } while (0) -#define RELEASE_MEM_TRACKER(size) \ - do { \ - if (doris::thread_context_ptr.init) { \ - doris::thread_context()->consume_memory(-size); \ - } else if (doris::ExecEnv::ready()) { \ - doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak( \ - -size); \ - } \ - } while (0) +#define RELEASE_MEM_TRACKER(size) CONSUME_MEM_TRACKER(-size) #else -#define CONSUME_THREAD_MEM_TRACKER(size) (void)0 -#define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 #define CONSUME_MEM_TRACKER(size) (void)0 diff --git a/be/src/runtime/threadlocal.cc b/be/src/runtime/threadlocal.cc deleted file mode 100644 index 69b57607730c750..000000000000000 --- a/be/src/runtime/threadlocal.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#include "runtime/threadlocal.h" - -#include - -#include -#include -#include -#include - -#include "common/logging.h" -#include "util/errno.h" - -namespace doris { - -// One key used by the entire process to attach destructors on thread exit. -static pthread_key_t destructors_key; - -// The above key must only be initialized once per process. -static std::once_flag once; - -namespace { - -// List of destructors for all thread locals instantiated on a given thread. -struct PerThreadDestructorList { - void (*destructor)(void*); - void* arg; - PerThreadDestructorList* next; -}; - -} // anonymous namespace - -// Call all the destructors associated with all THREAD_LOCAL instances in this -// thread. -static void invoke_destructors(void* t) { - PerThreadDestructorList* d = reinterpret_cast(t); - while (d != nullptr) { - d->destructor(d->arg); - PerThreadDestructorList* next = d->next; - delete d; - d = next; - } -} - -// This key must be initialized only once. -static void create_key() { - int ret = pthread_key_create(&destructors_key, &invoke_destructors); - // Linux supports up to 1024 keys, we will use only one for all thread locals. - CHECK_EQ(0, ret) << "pthread_key_create() failed, cannot add destructor to thread: " - << "error " << ret << ": " << errno_to_string(ret); -} - -// Adds a destructor to the list. -void add_destructor(void (*destructor)(void*), void* arg) { - std::call_once(once, create_key); - - // Returns NULL if nothing is set yet. - std::unique_ptr p(new PerThreadDestructorList()); - p->destructor = destructor; - p->arg = arg; - p->next = reinterpret_cast(pthread_getspecific(destructors_key)); - int ret = pthread_setspecific(destructors_key, p.release()); - // The only time this check should fail is if we are out of memory, or if - // somehow key creation failed, which should be caught by the above CHECK. - CHECK_EQ(0, ret) << "pthread_setspecific() failed, cannot update destructor list: " - << "error " << ret << ": " << errno_to_string(ret); -} - -} // namespace doris diff --git a/be/src/runtime/threadlocal.h b/be/src/runtime/threadlocal.h deleted file mode 100644 index 3842b2fcad6f755..000000000000000 --- a/be/src/runtime/threadlocal.h +++ /dev/null @@ -1,129 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Reference from kudu, Solve the problem of gcc11 compiling -// non-trivial thread_local variables on lower versions of GLIBC. -// see https://github.com/apache/doris/pull/7911 -// -// Block-scoped static thread local implementation. -// -// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro -// defines a thread-local pointer to the specified type, which is lazily -// instantiated by any thread entering the block for the first time. The -// constructor for the type T is invoked at macro execution time, as expected, -// and its destructor is invoked when the corresponding thread's Runnable -// returns, or when the thread exits. -// -// Inspired by Poco , -// Andrew Tomazos , and -// the C++11 thread_local API. -// -// Example usage: -// -// // Invokes a 3-arg constructor on SomeClass: -// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3); -// instance->DoSomething(); -// - -#pragma once - -#include - -#include "gutil/port.h" - -#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...) \ - static __thread T* t; \ - do { \ - if (PREDICT_FALSE(t == NULL)) { \ - t = new T(__VA_ARGS__); \ - add_destructor(destroy, t); \ - } \ - } while (false) - -// Class-scoped static thread local implementation. -// -// Very similar in implementation to the above block-scoped version, but -// requires a bit more syntax and vigilance to use properly. -// -// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the -// class header, as usual for variable declarations. -// -// Because these variables are static, they must also be defined in the impl -// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_), -// which is very much like defining any static member, i.e. int Foo::member_. -// -// Finally, each thread must initialize the instance before using it by calling -// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap -// call, and may be invoked at the top of any method which may reference a -// thread-local variable. -// -// Due to all of these requirements, you should probably declare TLS members -// as private. -// -// Example usage: -// -// // foo.h -// #include "kudu/utils/file.h" -// class Foo { -// public: -// void DoSomething(std::string s); -// private: -// DECLARE_STATIC_THREAD_LOCAL(utils::File, file_); -// }; -// -// // foo.cc -// #include "kudu/foo.h" -// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_); -// void Foo::WriteToFile(std::string s) { -// // Call constructor if necessary. -// INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt"); -// file_->Write(s); -// } - -// Goes in the class declaration (usually in a header file). -// dtor must be destructed _after_ t, so it gets defined first. -// Uses a mangled variable name for dtor since it must also be a member of the -// class. -#define DECLARE_STATIC_THREAD_LOCAL(T, t) static __thread T* t - -// You must also define the instance in the .cc file. -#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t) __thread T* Class::t - -// Must be invoked at least once by each thread that will access t. -#define INIT_STATIC_THREAD_LOCAL(T, t, ...) \ - do { \ - if (PREDICT_FALSE(t == NULL)) { \ - t = new T(__VA_ARGS__); \ - add_destructor(destroy, t); \ - } \ - } while (false) - -// Internal implementation below. - -namespace doris { - -// Add a destructor to the list. -void add_destructor(void (*destructor)(void*), void* arg); - -// Destroy the passed object of type T. -template -void destroy(void* t) { - // With tcmalloc, this should be pretty cheap (same thread as new). - delete reinterpret_cast(t); -} - -} // namespace doris diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 33fe22b12e77469..a7c14480c24ca7b 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -480,6 +480,8 @@ int main(int argc, char** argv) { exit(-1); } + doris::ThreadLocalHandle::handle_thread_local(); + // init exec env auto exec_env(doris::ExecEnv::GetInstance()); status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths, broken_paths); @@ -583,6 +585,7 @@ int main(int argc, char** argv) { brpc_service.reset(nullptr); LOG(INFO) << "Brpc service stopped"; exec_env->destroy(); + doris::ThreadLocalHandle::release_thread_local(); LOG(INFO) << "Doris main exited."; return 0; } diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index d2fbd2fd14e863c..5145f20ee813710 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -39,7 +39,6 @@ class RefCountClosure : public google::protobuf::Closure { bool unref() { return _refs.fetch_sub(1) == 1; } void Run() override { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); if (unref()) { delete this; } diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp index cf7bd63a878b612..f295c7904a022f5 100644 --- a/be/src/util/thread.cpp +++ b/be/src/util/thread.cpp @@ -59,6 +59,7 @@ #include "gutil/stringprintf.h" #include "gutil/strings/substitute.h" #include "http/web_page_handler.h" +#include "runtime/thread_context.h" #include "util/debug/sanitizer_scopes.h" #include "util/easy_json.h" #include "util/os_util.h" @@ -479,6 +480,9 @@ void* Thread::supervise_thread(void* arg) { // already incremented the reference count in StartThread. Thread::_tls = t; + // Create thread context, there is no need to create it when func is executed. + ThreadLocalHandle::handle_thread_local(); + // Publish our tid to '_tid', which unblocks any callers waiting in // WaitForTid(). Release_Store(&t->_tid, system_tid); @@ -514,6 +518,8 @@ void Thread::finish_thread(void* arg) { // NOTE: the above 'Release' call could be the last reference to 'this', // so 'this' could be destructed at this point. Do not add any code // following here! + + ThreadLocalHandle::release_thread_local(); } void Thread::init_threadmgr() { diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index b4025807be6ed36..2040455be0b4d74 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -38,7 +38,9 @@ template void Allocator::sys_memory_check(size_t size) const { - if (doris::thread_context()->skip_memory_check) return; + if (doris::is_thread_context_init() && doris::thread_context()->skip_memory_check) { + return; + } if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { // Only thread attach query, and has not completely waited for thread_wait_gc_max_milliseconds, // will wait for gc, asynchronous cancel or throw bad::alloc. @@ -46,10 +48,19 @@ void Allocator::sys_memory_check(size_t auto err_msg = fmt::format( "Allocator sys memory check failed: Cannot alloc:{}, consuming " "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.", - size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker()->peak_consumption(), - doris::thread_context()->thread_mem_tracker()->consumption(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + size, + doris::is_thread_context_init() + ? doris::thread_context()->thread_mem_tracker()->label() + : "Orphan", + doris::is_thread_context_init() + ? doris::thread_context()->thread_mem_tracker()->peak_consumption() + : "", + doris::is_thread_context_init() + ? doris::thread_context()->thread_mem_tracker()->consumption() + : "", + doris::is_thread_context_init() + ? doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker() + : "", doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc && !doris::config::disable_memory_gc) { // 1G @@ -57,14 +68,15 @@ void Allocator::sys_memory_check(size_t } // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr. - if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( + if (doris::is_thread_context_init() && + doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled( doris::thread_context()->task_id())) { if (doris::enable_thread_catch_bad_alloc) { throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } return; } - if (!doris::config::disable_memory_gc && + if (doris::is_thread_context_init() && !doris::config::disable_memory_gc && doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { int64_t wait_milliseconds = 0; @@ -120,7 +132,9 @@ void Allocator::sys_memory_check(size_t template void Allocator::memory_tracker_check(size_t size) const { - if (doris::thread_context()->skip_memory_check) return; + if (doris::is_thread_context_init() && doris::thread_context()->skip_memory_check) { + return; + } auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); if (!st) { auto err_msg = fmt::format("Allocator mem tracker check failed, {}", st.to_string()); @@ -155,12 +169,12 @@ void Allocator::memory_check(size_t size template void Allocator::consume_memory(size_t size) const { - CONSUME_THREAD_MEM_TRACKER(size); + CONSUME_MEM_TRACKER(size); } template void Allocator::release_memory(size_t size) const { - RELEASE_THREAD_MEM_TRACKER(size); + RELEASE_MEM_TRACKER(size); } template diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index ba1d6c8c330d5c9..2b33c116f9f5a22 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -78,7 +78,6 @@ #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" -#include "runtime/thread_context.h" #include "service/backend_options.h" #include "service/brpc.h" #include "util/binary_cast.hpp" diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 6dd65bf2ece81f5..321e54aafb7d921 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -39,6 +39,7 @@ #include "util/mem_info.h" int main(int argc, char** argv) { + doris::ThreadLocalHandle::handle_thread_local(); doris::ExecEnv::GetInstance()->init_mem_tracker(); doris::thread_context()->thread_mem_tracker_mgr->init(); doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());