From bbd2b6a85f86a16bd70df3af5920176a4dc68a9b Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 12 Sep 2024 21:43:55 +0800 Subject: [PATCH] 2 --- be/src/runtime/memory/mem_tracker.cpp | 14 ++++++-------- be/src/runtime/memory/mem_tracker.h | 4 ++-- be/src/runtime/memory/mem_tracker_limiter.cpp | 18 +++++++++--------- be/src/runtime/memory/mem_tracker_limiter.h | 4 +--- .../runtime/memory/thread_mem_tracker_mgr.cpp | 10 ++++++---- be/src/runtime/memory/thread_mem_tracker_mgr.h | 8 +++++++- .../routine_load_task_executor.cpp | 3 +-- 7 files changed, 32 insertions(+), 29 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 26378a0049f9468..721dd0faa43392b 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -28,33 +28,31 @@ namespace doris { constexpr size_t MEM_TRACKERS_GROUP_NUM = 1000; +std::atomic mem_tracker_group_counter(0); bvar::Adder g_memtracker_cnt("memtracker_cnt"); std::vector MemTracker::mem_tracker_pool(MEM_TRACKERS_GROUP_NUM); -MemTracker::MemTracker() { - g_memtracker_cnt << 1; -} - -MemTracker::MemTracker(const std::string& label) : MemTracker() { +MemTracker::MemTracker(const std::string& label) { _label = label; - _group_num = random() % MEM_TRACKERS_GROUP_NUM; + _group_num = mem_tracker_group_counter.fetch_add(1) % MEM_TRACKERS_GROUP_NUM; { std::lock_guard l(mem_tracker_pool[_group_num].group_lock); _trackers_group_it = mem_tracker_pool[_group_num].trackers.insert( mem_tracker_pool[_group_num].trackers.end(), this); } + g_memtracker_cnt << 1; } MemTracker::~MemTracker() { - { + if (_group_num != -1) { std::lock_guard l(mem_tracker_pool[_group_num].group_lock); if (_trackers_group_it != mem_tracker_pool[_group_num].trackers.end()) { mem_tracker_pool[_group_num].trackers.erase(_trackers_group_it); _trackers_group_it = mem_tracker_pool[_group_num].trackers.end(); } + g_memtracker_cnt << -1; } - g_memtracker_cnt << -1; } } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 005f5b50bb51622..7afd16410d74741 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -41,7 +41,7 @@ namespace doris { */ class MemTracker { public: - MemTracker(); + MemTracker() = default; MemTracker(const std::string& label); ~MemTracker(); @@ -120,7 +120,7 @@ class MemTracker { // Multiple groups are used to reduce the impact of locks. static std::vector mem_tracker_pool; // Group number in mem_tracker_pool, generated by the timestamp. - int64_t _group_num; + int64_t _group_num {-1}; // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. std::list::iterator _trackers_group_it; }; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index ba2bbd5e1a3779f..687f8fc40184191 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -50,6 +50,7 @@ static bvar::Adder memory_schema_change_trackers_sum_bytes( "memory_schema_change_trackers_sum_bytes"); static bvar::Adder memory_other_trackers_sum_bytes("memory_other_trackers_sum_bytes"); +std::atomic mem_tracker_limiter_group_counter(0); constexpr auto GC_MAX_SEEK_TRACKER = 1000; std::atomic MemTrackerLimiter::_enable_print_log_process_usage {true}; @@ -78,7 +79,8 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_ if (_type == Type::GLOBAL) { _group_num = 0; } else { - _group_num = random() % MEM_TRACKER_GROUP_NUM + 1; + _group_num = + mem_tracker_limiter_group_counter.fetch_add(1) % (MEM_TRACKER_GROUP_NUM - 1) + 1; } // currently only select/load need runtime query statistics @@ -239,7 +241,7 @@ void MemTrackerLimiter::refresh_global_counter() { } int64_t all_trackers_mem_sum = 0; for (auto it : type_mem_sum) { - TypeMemSum[static_cast(it.first)].set_consumption(it.second); + MemTrackerLimiter::TypeMemSum[it.first].set_consumption(it.second); all_trackers_mem_sum += it.second; switch (it.first) { @@ -266,8 +268,6 @@ void MemTrackerLimiter::refresh_global_counter() { case Type::OTHER: memory_other_trackers_sum_bytes << it.second - memory_other_trackers_sum_bytes.get_value(); - case Type::TYPE_COUNT: - break; } } all_trackers_mem_sum += MemInfo::allocator_cache_mem(); @@ -298,14 +298,14 @@ void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) MemTrackerLimiter::refresh_global_counter(); int64_t all_trackers_mem_sum = 0; Snapshot snapshot; - for (size_t i = 0; i < static_cast(Type::TYPE_COUNT); ++i) { + for (const auto& it : MemTrackerLimiter::TypeMemSum) { snapshot.type = "overview"; - snapshot.label = type_string(static_cast(i)); + snapshot.label = type_string(it.first); snapshot.limit = -1; - snapshot.cur_consumption = TypeMemSum[i].consumption(); - snapshot.peak_consumption = TypeMemSum[i].peak_consumption(); + snapshot.cur_consumption = it.second.consumption(); + snapshot.peak_consumption = it.second.peak_consumption(); (*snapshots).emplace_back(snapshot); - all_trackers_mem_sum += TypeMemSum[i].consumption(); + all_trackers_mem_sum += it.second.consumption(); } snapshot.type = "overview"; diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index ef5b63aeef51e5d..8178c7cbd1a5703 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -81,13 +81,11 @@ class MemTrackerLimiter final { COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. OTHER = 5, - TYPE_COUNT }; // Corresponding to MemTrackerLimiter::Type. // MemTracker contains atomic variables, which are not allowed to be copied or moved. - inline static MemTracker TypeMemSum[] = {MemTracker(), MemTracker(), MemTracker(), - MemTracker(), MemTracker(), MemTracker()}; + inline static std::unordered_map TypeMemSum; struct Snapshot { std::string type; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 33dd0d41822ae17..d036564528534ca 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -46,7 +46,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( DCHECK(mem_tracker); CHECK(init()); flush_untracked_mem(); - _reserved_mem_stack.push_back(_reserved_mem); + _last_attach_snapshots_stack.push_back({_reserved_mem, _consumer_tracker_stack}); if (_reserved_mem != 0) { // _untracked_mem temporary store bytes that not synchronized to process reserved memory, // but bytes have been subtracted from thread _reserved_mem. @@ -54,6 +54,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( _reserved_mem = 0; _untracked_mem = 0; } + _consumer_tracker_stack.clear(); _limiter_tracker = mem_tracker; } @@ -62,9 +63,10 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( CHECK(init()); flush_untracked_mem(); release_reserved(); - DCHECK(!_reserved_mem_stack.empty()); - _reserved_mem = _reserved_mem_stack.back(); - _reserved_mem_stack.pop_back(); + DCHECK(!_last_attach_snapshots_stack.empty()); + _reserved_mem = _last_attach_snapshots_stack.back().reserved_mem; + _consumer_tracker_stack = _last_attach_snapshots_stack.back().consumer_tracker_stack; + _last_attach_snapshots_stack.pop_back(); _limiter_tracker = old_mem_tracker; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 4f67e1ea176dfdf..7f6fc119a5ff847 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -119,6 +119,11 @@ class ThreadMemTrackerMgr { int64_t reserved_mem() const { return _reserved_mem; } private: + struct LastAttachSnapshot { + int64_t reserved_mem = 0; + std::vector consumer_tracker_stack; + }; + // is false: ExecEnv::ready() = false when thread local is initialized bool _init = false; // Cache untracked mem. @@ -126,9 +131,10 @@ class ThreadMemTrackerMgr { int64_t _old_untracked_mem = 0; int64_t _reserved_mem = 0; + // SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used, // so `attach_limiter_tracker` may be nested. - std::vector _reserved_mem_stack; + std::vector _last_attach_snapshots_stack; std::string _failed_consume_msg = std::string(); // If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit. diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 615c2343c5ba685..e64bc419f010338 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -315,8 +315,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { bool RoutineLoadTaskExecutor::_reach_memory_limit() { bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); auto current_load_mem_value = - MemTrackerLimiter::TypeMemSum[static_cast(MemTrackerLimiter::Type::LOAD)] - .consumption(); + MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD].consumption(); if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) { LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit << " current_load_mem_value: " << current_load_mem_value