From 986350b9b1df6bff60f84de28140235d2457d96a Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 16 Sep 2024 23:09:07 +0800 Subject: [PATCH] [fix](memory) Refactor MemCounter (#40542) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Add a new class MemCounter. 2. MemTracker and MemTrackerLimiter no longer have inheritance and dependencies. 3. MemTrackerLimiter is used to count process memory, the BE web page `/mem_tracker` also only displays MemTrackerLimiter. 4. MemTracker is used to count the memory of operators and some data structures. It is not counted in the process memory and is used for logic control and profile. In addition, it seems that the crash is caused by memory abnormality, not sure whether this PR can fix the problem, but this will help locate the problem and more elegant. ``` ==6641==ERROR: AddressSanitizer: heap-use-after-free on address 0x603000980e70 at pc 0x559be21880be bp 0x7fd7cfc75070 sp 0x7fd7cfc75068 READ of size 8 at 0x603000980e70 thread T1453 (memory_maintena) #0 0x559be21880bd in std::__atomic_base::load(std::memory_order) const /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:481:9 #1 0x559be21880bd in doris::MemTracker::MemCounter::current_value() const /home/zcp/repo_center/doris_master/doris/be/src/runtime/memory/mem_tracker.h:139:63 #2 0x559be21880bd in doris::MemTracker::consumption() const /home/zcp/repo_center/doris_master/doris/be/src/runtime/memory/mem_tracker.h:165:56 #3 0x559be3985133 in doris::MemTrackerLimiter::refresh_global_counter() /home/zcp/repo_center/doris_master/doris/be/src/runtime/memory/mem_tracker_limiter.cpp:245:59 #4 0x559bdfcd52d5 in doris::Daemon::memory_maintenance_thread() /home/zcp/repo_center/doris_master/doris/be/src/common/daemon.cpp:239:13 #5 0x559be40e6c17 in doris::Thread::supervise_thread(void*) /home/zcp/repo_center/doris_master/doris/be/src/util/thread.cpp:498:5 #6 0x7fdf563e3ac2 in start_thread nptl/pthread_create.c:442:8 #7 0x7fdf5647584f misc/../sysdeps/unix/sysv/linux/x86_64/clone3.S:81 0x603000980e74 is located 0 bytes after 20-byte region [0x603000980e60,0x603000980e74) 04:19:48  freed by thread T1491 (Scan_normal [wo) here: 04:19:48  #0 0x559bdfb0dd9d in operator delete(void*) (/mnt/hdd01/ci/master-deploy/be/lib/doris_be+0x33546d9d) (BuildId: f170e92ad3c55512) 04:19:48  #1 0x559bdfb1d42b in __gnu_cxx::new_allocator::deallocate(char*, unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/new_allocator.h:139:2 04:19:48  #2 0x559bdfb1d42b in std::allocator::deallocate(char*, unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/allocator.h:187:27 04:19:48  #3 0x559bdfb1d42b in std::allocator_traits>::deallocate(std::allocator&, char*, unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:492:13 04:19:48  #4 0x559bdfb1d42b in std::__cxx11::basic_string, std::allocator>::_M_destroy(unsigned long) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:237:9 04:19:48  #5 0x559bdfb1d42b in std::__cxx11::basic_string, std::allocator>::_M_dispose() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:232:4 04:19:48  #6 0x559bdfb1d42b in std::__cxx11::basic_string, std::allocator>::~basic_string() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:658:9 04:19:48  #7 0x559bdfb22501 in void std::destroy_at, std::allocator>>(std::__cxx11::basic_string, std::allocator>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15 04:19:48  #8 0x559bdfb22501 in void std::_Destroy, std::allocator>>(std::__cxx11::basic_string, std::allocator>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7 04:19:48  #9 0x559bdfb22501 in void std::_Destroy_aux::__destroy, std::allocator>*>(std::__cxx11::basic_string, std::allocator>*, std::__cxx11::basic_string, std::allocator>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:152:6 04:19:48  #10 0x559bdfb22501 in void std::_Destroy, std::allocator>*>(std::__cxx11::basic_string, std::allocator>*, std::__cxx11::basic_string, std::allocator>*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7 04:19:48  #11 0x559bdfb22501 in void std::_Destroy, std::allocator>*, std::__cxx11::basic_string, std::allocator>>(std::__cxx11::basic_string, std::allocator>*, std::__cxx11::basic_string, std::allocator>*, std::allocator, std::allocator>>&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:746:7 04:19:48  #12 0x559bdfb22501 in std::vector, std::allocator>, std::allocator, std::allocator>>>::~vector() /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:680:2 04:19:48  #13 0x559be5515262 in doris::TCondition::~TCondition() /home/zcp/repo_center/doris_master/doris/gensrc/build/gen_cpp/PaloInternalService_types.cpp:9867:1 04:19:48  #14 0x559be5515262 in doris::TCondition::~TCondition() /home/zcp/repo_center/doris_master/doris/gensrc/build/gen_cpp/PaloInternalService_types.cpp:9866:36 04:19:48  #15 0x559be0047ee6 in doris::Status doris::DeleteHandler::_parse_column_pred(std::shared_ptr, std::shared_ptr, google::protobuf::RepeatedPtrField const&, doris::DeleteConditions*) /home/zcp/repo_center/doris_master/doris/be/src/olap/delete_handler.cpp:372:5 04:19:48  #16 0x559be003ba59 in doris::DeleteHandler::init(std::shared_ptr, std::vector, std::allocator>> const&, long, bool) /home/zcp/repo_center/doris_master/doris/be/src/olap/delete_handler.cpp:404:13 04:19:48  #17 0x559be30ed6e3 in doris::TabletReader::_init_delete_condition(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/olap/tablet_reader.cpp:636:28 04:19:48  #18 0x559be30e23d1 in doris::TabletReader::_init_params(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/olap/tablet_reader.cpp:290:18 04:19:48  #19 0x559be30e1592 in doris::TabletReader::init(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/olap/tablet_reader.cpp:124:18 04:19:48  #20 0x559c129840c8 in doris::vectorized::BlockReader::init(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/vec/olap/block_reader.cpp:210:5 04:19:48  #21 0x559c15a4ea09 in doris::vectorized::NewOlapScanner::open(doris::RuntimeState*) /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/new_olap_scanner.cpp:232:32 04:19:48  #22 0x559bfb4acd94 in doris::vectorized::ScannerScheduler::_scanner_scan(std::shared_ptr, std::shared_ptr) /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:236:5 04:19:48  #23 0x559bfb4b10be in doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()::operator()() const::'lambda'()::operator()() const /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:176:21 04:19:48  #24 0x559bfb4b10be in doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()::operator()() const /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:175:31 04:19:48  #25 0x559bfb4b10be in void std::__invoke_impl, std::shared_ptr)::$_1::operator()() const::'lambda'()&>(std::__invoke_other, doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14 04:19:48  #26 0x559bfb4b10be in std::enable_if, std::shared_ptr)::$_1::operator()() const::'lambda'()&>, void>::type std::__invoke_r, std::shared_ptr)::$_1::operator()() const::'lambda'()&>(doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:111:2 04:19:48  #27 0x559bfb4b10be in std::_Function_handler, std::shared_ptr)::$_1::operator()() const::'lambda'()>::_M_invoke(std::_Any_data const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291:9 04:19:48  #28 0x559be410ed1b in doris::ThreadPool::dispatch_thread() /home/zcp/repo_center/doris_master/doris/be/src/util/threadpool.cpp:543:24 04:19:48  #29 0x559be40e6c17 in doris::Thread::supervise_thread(void*) /home/zcp/repo_center/doris_master/doris/be/src/util/thread.cpp:498:5 04:19:48  #30 0x7fdf563e3ac2 in start_thread nptl/pthread_create.c:442:8 04:19:48  04:19:48  previously allocated by thread T1491 (Scan_normal [wo) here: 04:19:48  #0 0x559bdfb0d53d in operator new(unsigned long) (/mnt/hdd01/ci/master-deploy/be/lib/doris_be+0x3354653d) (BuildId: f170e92ad3c55512) 04:19:48  #1 0x559bdfb33f74 in void std::__cxx11::basic_string, std::allocator>::_M_construct(char*, char*, std::forward_iterator_tag) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.tcc:219:14 04:19:48  #2 0x559bdfb32705 in void std::__cxx11::basic_string, std::allocator>::_M_construct_aux(char*, char*, std::__false_type) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:247:11 04:19:48  #3 0x559bdfb32705 in void std::__cxx11::basic_string, std::allocator>::_M_construct(char*, char*) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:266:4 04:19:48  #4 0x559bdfb32705 in std::__cxx11::basic_string, std::allocator>::basic_string(std::__cxx11::basic_string, std::allocator> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:451:9 04:19:48  #5 0x559bdfc326e6 in decltype(::new((void*)(0)) std::__cxx11::basic_string, std::allocator>(std::declval, std::allocator> const&>())) std::construct_at, std::allocator>, std::__cxx11::basic_string, std::allocator> const&>(std::__cxx11::basic_string, std::allocator>*, std::__cxx11::basic_string, std::allocator> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:97:39 04:19:48  #6 0x559bdfc326e6 in void std::allocator_traits, std::allocator>>>::construct, std::allocator>, std::__cxx11::basic_string, std::allocator> const&>(std::allocator, std::allocator>>&, std::__cxx11::basic_string, std::allocator>*, std::__cxx11::basic_string, std::allocator> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:514:4 04:19:48  #7 0x559bdfc326e6 in void std::vector, std::allocator>, std::allocator, std::allocator>>>::_M_realloc_insert, std::allocator> const&>(__gnu_cxx::__normal_iterator, std::allocator>*, std::vector, std::allocator>, std::allocator, std::allocator>>>>, std::__cxx11::basic_string, std::allocator> const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/vector.tcc:449:4 04:19:48  #8 0x559be0032206 in doris::DeleteHandler::parse_condition(doris::DeleteSubPredicatePB const&, doris::TCondition*) /home/zcp/repo_center/doris_master/doris/be/src/olap/delete_handler.cpp:300:33 04:19:48  #9 0x559be0047aa3 in doris::Status doris::DeleteHandler::_parse_column_pred(std::shared_ptr, std::shared_ptr, google::protobuf::RepeatedPtrField const&, doris::DeleteConditions*) /home/zcp/repo_center/doris_master/doris/be/src/olap/delete_handler.cpp:355:9 04:19:48  #10 0x559be003ba59 in doris::DeleteHandler::init(std::shared_ptr, std::vector, std::allocator>> const&, long, bool) /home/zcp/repo_center/doris_master/doris/be/src/olap/delete_handler.cpp:404:13 04:19:48  #11 0x559be30ed6e3 in doris::TabletReader::_init_delete_condition(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/olap/tablet_reader.cpp:636:28 04:19:48  #12 0x559be30e23d1 in doris::TabletReader::_init_params(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/olap/tablet_reader.cpp:290:18 04:19:48  #13 0x559be30e1592 in doris::TabletReader::init(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/olap/tablet_reader.cpp:124:18 04:19:48  #14 0x559c129840c8 in doris::vectorized::BlockReader::init(doris::TabletReader::ReaderParams const&) /home/zcp/repo_center/doris_master/doris/be/src/vec/olap/block_reader.cpp:210:5 04:19:48  #15 0x559c15a4ea09 in doris::vectorized::NewOlapScanner::open(doris::RuntimeState*) /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/new_olap_scanner.cpp:232:32 04:19:48  #16 0x559bfb4acd94 in doris::vectorized::ScannerScheduler::_scanner_scan(std::shared_ptr, std::shared_ptr) /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:236:5 04:19:48  #17 0x559bfb4b10be in doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()::operator()() const::'lambda'()::operator()() const /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:176:21 04:19:48  #18 0x559bfb4b10be in doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()::operator()() const /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:175:31 04:19:48  #19 0x559bfb4b10be in void std::__invoke_impl, std::shared_ptr)::$_1::operator()() const::'lambda'()&>(std::__invoke_other, doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:61:14 04:19:48  #20 0x559bfb4b10be in std::enable_if, std::shared_ptr)::$_1::operator()() const::'lambda'()&>, void>::type std::__invoke_r, std::shared_ptr)::$_1::operator()() const::'lambda'()&>(doris::vectorized::ScannerScheduler::submit(std::shared_ptr, std::shared_ptr)::$_1::operator()() const::'lambda'()&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/invoke.h:111:2 04:19:48  #21 0x559bfb4b10be in std::_Function_handler, std::shared_ptr)::$_1::operator()() const::'lambda'()>::_M_invoke(std::_Any_data const&) /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291:9 04:19:48  #22 0x559be410ed1b in doris::ThreadPool::dispatch_thread() /home/zcp/repo_center/doris_master/doris/be/src/util/threadpool.cpp:543:24 04:19:48  #23 0x559be40e6c17 in doris::Thread::supervise_thread(void*) /home/zcp/repo_center/doris_master/doris/be/src/util/thread.cpp:498:5 04:19:48  #24 0x7fdf563e3ac2 in start_thread nptl/pthread_create.c:442:8 ``` --- be/src/cloud/cloud_tablet_mgr.cpp | 2 +- .../cloud/cloud_txn_delete_bitmap_cache.cpp | 4 +- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 2 +- be/src/http/default_path_handlers.cpp | 9 +- be/src/olap/memtable.cpp | 5 +- be/src/olap/memtable_memory_limiter.cpp | 5 +- be/src/olap/memtable_memory_limiter.h | 5 +- be/src/olap/memtable_writer.cpp | 13 - be/src/olap/page_cache.h | 26 +- .../segment_v2/inverted_index_cache.cpp | 6 +- .../rowset/segment_v2/inverted_index_cache.h | 31 +- be/src/olap/schema_cache.h | 7 +- be/src/olap/segment_loader.cpp | 6 +- be/src/olap/segment_loader.h | 11 +- be/src/olap/storage_engine.h | 8 +- be/src/olap/tablet_manager.cpp | 3 +- be/src/olap/tablet_meta.h | 9 +- be/src/olap/tablet_schema_cache.cpp | 4 +- be/src/olap/tablet_schema_cache.h | 9 +- be/src/olap/txn_manager.h | 9 +- be/src/runtime/exec_env.h | 9 +- be/src/runtime/exec_env_init.cpp | 7 +- be/src/runtime/load_channel_mgr.h | 9 +- .../memory/global_memory_arbitrator.cpp | 20 -- .../runtime/memory/global_memory_arbitrator.h | 19 +- be/src/runtime/memory/lru_cache_policy.h | 153 +++------- be/src/runtime/memory/lru_cache_value_base.h | 7 +- be/src/runtime/memory/mem_counter.h | 95 ++++++ be/src/runtime/memory/mem_tracker.cpp | 91 +----- be/src/runtime/memory/mem_tracker.h | 233 +++----------- be/src/runtime/memory/mem_tracker_limiter.cpp | 157 +++++----- be/src/runtime/memory/mem_tracker_limiter.h | 287 ++++++++++++------ .../runtime/memory/thread_mem_tracker_mgr.cpp | 10 +- .../runtime/memory/thread_mem_tracker_mgr.h | 21 +- be/src/runtime/query_context.cpp | 6 +- .../routine_load_task_executor.cpp | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 6 +- .../runtime/workload_group/workload_group.cpp | 16 +- .../workload_group/workload_group_manager.cpp | 4 +- be/src/service/point_query_executor.cpp | 10 +- be/src/service/point_query_executor.h | 12 +- be/src/util/obj_lru_cache.cpp | 6 +- be/src/util/obj_lru_cache.h | 8 +- be/src/vec/common/allocator.cpp | 2 - be/src/vec/common/allocator.h | 19 +- be/test/olap/lru_cache_test.cpp | 12 +- .../memory/thread_mem_tracker_mgr_test.cpp | 4 +- 47 files changed, 616 insertions(+), 783 deletions(-) create mode 100644 be/src/runtime/memory/mem_counter.h diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 0fe050d02dbd3d..e5c31785c1eb1c 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -136,7 +136,7 @@ class CloudTabletMgr::TabletMap { CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) : _engine(engine), _tablet_map(std::make_unique()), - _cache(std::make_unique( + _cache(std::make_unique( CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 84ea82b1bce016..55f76316fb7f26 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -33,8 +33,8 @@ namespace doris { CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, - size_in_bytes, LRUCacheType::SIZE, 86400, 4), + : LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, size_in_bytes, + LRUCacheType::SIZE, 86400, 4), _stop_latch(1) {} CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() { diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index db5f8867263168..91a0531c60ae04 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -30,7 +30,7 @@ namespace doris { // Record transaction related delete bitmaps using a lru cache. -class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { +class CloudTxnDeleteBitmapCache : public LRUCachePolicy { public: CloudTxnDeleteBitmapCache(size_t size_in_bytes); diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index ded72d9f28f7df..2ece1e3fdcd20a 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -142,7 +142,7 @@ void display_tablets_callback(const WebPageHandler::ArgumentMap& args, EasyJson* // Registered to handle "/mem_tracker", and prints out memory tracker information. void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) { (*output) << "

Memory usage by subsystem

\n"; - std::vector snapshots; + std::vector snapshots; auto iter = args.find("type"); if (iter != args.end()) { if (iter->second == "global") { @@ -159,7 +159,7 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr } else if (iter->second == "other") { MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER); } else if (iter->second == "reserved_memory") { - GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots); + MemTrackerLimiter::make_all_reserved_trackers_snapshots(&snapshots); } else if (iter->second == "all") { MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots); } @@ -191,7 +191,6 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr (*output) << "" "Type" "Label" - "Parent Label" "Limit" "Current Consumption(Bytes)" @@ -207,8 +206,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr string peak_consumption_normalize = AccurateItoaKMGT(item.peak_consumption); (*output) << strings::Substitute( "$0$1$2$3$4$5$6$7\n", - item.type, item.label, item.parent_label, limit_str, item.cur_consumption, + "td>\n", + item.type, item.label, limit_str, item.cur_consumption, current_consumption_normalize, item.peak_consumption, peak_consumption_normalize); } (*output) << "\n"; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index a7a90851192944..0040e00ffc9819 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -161,9 +161,8 @@ MemTable::~MemTable() { std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete()); _insert_mem_tracker->release(_mem_usage); _flush_mem_tracker->set_consumption(0); - DCHECK_EQ(_insert_mem_tracker->consumption(), 0) - << std::endl - << MemTracker::log_usage(_insert_mem_tracker->make_snapshot()); + DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl + << _insert_mem_tracker->log_usage(); DCHECK_EQ(_flush_mem_tracker->consumption(), 0); _arena.reset(); _agg_buffer_pool.clear(); diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 23b760284b8985..ea045b1e53e30a 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -62,10 +62,7 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) { _load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100; g_load_hard_mem_limit.set_value(_load_hard_mem_limit); g_load_soft_mem_limit.set_value(_load_soft_mem_limit); - _memtable_tracker_set = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "MemTableTrackerSet"); - _mem_tracker = std::make_unique("AllMemTableMemory", - ExecEnv::GetInstance()->details_mem_tracker_set()); + _mem_tracker = std::make_unique("AllMemTableMemory"); REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _log_timer.start(); diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index 2e8271bab35c15..66f5fb2a8d0c20 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -20,7 +20,7 @@ #include #include "common/status.h" -#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/memory/mem_tracker.h" #include "util/countdown_latch.h" #include "util/stopwatch.hpp" @@ -45,7 +45,6 @@ class MemTableMemoryLimiter { void refresh_mem_tracker(); - MemTrackerLimiter* memtable_tracker_set() { return _memtable_tracker_set.get(); } MemTracker* mem_tracker() { return _mem_tracker.get(); } int64_t mem_usage() const { return _mem_usage; } @@ -68,8 +67,6 @@ class MemTableMemoryLimiter { int64_t _write_mem_usage = 0; int64_t _active_mem_usage = 0; - // mem tracker collection of all mem tables. - std::shared_ptr _memtable_tracker_set; // sum of all mem table memory. std::unique_ptr _mem_tracker; int64_t _load_hard_mem_limit = -1; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 114a7841b92204..59916d5f1cc57b 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -187,25 +187,12 @@ Status MemTableWriter::wait_flush() { } void MemTableWriter::_reset_mem_table() { -#ifndef BE_TEST - auto mem_table_insert_tracker = std::make_shared( - fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num, - UniqueId(_req.load_id).to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set()); - auto mem_table_flush_tracker = std::make_shared( - fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num++, - UniqueId(_req.load_id).to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set()); -#else auto mem_table_insert_tracker = std::make_shared(fmt::format( "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string())); auto mem_table_flush_tracker = std::make_shared(fmt::format( "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num++, UniqueId(_req.load_id).to_string())); -#endif { std::lock_guard l(_mem_table_tracker_lock); _mem_table_insert_trackers.push_back(mem_table_insert_tracker); diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 09fc689959ce4c..32b6683e7823b0 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -92,28 +92,28 @@ class StoragePageCache { } }; - class DataPageCache : public LRUCachePolicyTrackingAllocator { + class DataPageCache : public LRUCachePolicy { public: DataPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, LRUCacheType::SIZE, - config::data_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, + LRUCacheType::SIZE, config::data_page_cache_stale_sweep_time_sec, + num_shards) {} }; - class IndexPageCache : public LRUCachePolicyTrackingAllocator { + class IndexPageCache : public LRUCachePolicy { public: IndexPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, LRUCacheType::SIZE, - config::index_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, + LRUCacheType::SIZE, config::index_page_cache_stale_sweep_time_sec, + num_shards) {} }; - class PKIndexPageCache : public LRUCachePolicyTrackingAllocator { + class PKIndexPageCache : public LRUCachePolicy { public: PKIndexPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, LRUCacheType::SIZE, - config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, + LRUCacheType::SIZE, + config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {} }; static constexpr uint32_t kDefaultNumShards = 16; @@ -164,7 +164,7 @@ class StoragePageCache { // delete bitmap in unique key with mow std::unique_ptr _pk_index_page_cache; - LRUCachePolicyTrackingAllocator* _get_page_cache(segment_v2::PageTypePB page_type) { + LRUCachePolicy* _get_page_cache(segment_v2::PageTypePB page_type) { switch (page_type) { case segment_v2::DATA_PAGE: { return _data_page_cache.get(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index b2930d2867b05f..e42c02860f5d00 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -135,9 +135,9 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptrgetSizeInBytes(), - bitmap->getSizeInBytes(), CachePriority::NORMAL); + auto* lru_handle = LRUCachePolicy::insert(key.encode(), (void*)cache_value_ptr.release(), + bitmap->getSizeInBytes(), bitmap->getSizeInBytes(), + CachePriority::NORMAL); *handle = InvertedIndexQueryCacheHandle(this, lru_handle); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index 5423ea044a2e58..b80f2c01027b6e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -99,23 +99,23 @@ class InvertedIndexSearcherCache { private: InvertedIndexSearcherCache() = default; - class InvertedIndexSearcherCachePolicy : public LRUCachePolicyTrackingManual { + class InvertedIndexSearcherCachePolicy : public LRUCachePolicy { public: InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, uint32_t element_count_capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, - capacity, LRUCacheType::SIZE, - config::inverted_index_cache_stale_sweep_time_sec, - num_shards, element_count_capacity, true) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, true) {} InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, uint32_t element_count_capacity, CacheValueTimeExtractor cache_value_time_extractor, bool cache_value_check_timestamp) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, - LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec, - num_shards, element_count_capacity, cache_value_time_extractor, - cache_value_check_timestamp, true) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, cache_value_time_extractor, + cache_value_check_timestamp, true) {} }; // Insert a cache entry by key. // And the cache entry will be returned in handle. @@ -179,9 +179,9 @@ class InvertedIndexCacheHandle { class InvertedIndexQueryCacheHandle; -class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual { +class InvertedIndexQueryCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // cache key struct CacheKey { @@ -227,10 +227,9 @@ class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual { InvertedIndexQueryCache() = delete; InvertedIndexQueryCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, - capacity, LRUCacheType::SIZE, - config::inverted_index_cache_stale_sweep_time_sec, - num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity, + LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec, + num_shards) {} bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle); diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index 7bb18a59c349a0..68cd809ed226f4 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -44,7 +44,7 @@ using SegmentIteratorUPtr = std::unique_ptr; // eliminating the need for frequent allocation and deallocation during usage. // This caching mechanism proves immensely advantageous, particularly in scenarios // with high concurrency, where queries are executed simultaneously. -class SchemaCache : public LRUCachePolicyTrackingManual { +class SchemaCache : public LRUCachePolicy { public: static SchemaCache* instance(); @@ -86,9 +86,8 @@ class SchemaCache : public LRUCachePolicyTrackingManual { }; SchemaCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::SCHEMA_CACHE, capacity, - LRUCacheType::NUMBER, - config::schema_cache_sweep_time_sec) {} + : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, LRUCacheType::NUMBER, + config::schema_cache_sweep_time_sec) {} private: static constexpr char SCHEMA_DELIMITER = '-'; diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 12ab89af0be283..fd7e3f476ad082 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -40,9 +40,9 @@ bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value, SegmentCacheHandle* handle) { - auto* lru_handle = LRUCachePolicyTrackingManual::insert( - key.encode(), &value, value.segment->meta_mem_usage(), value.segment->meta_mem_usage(), - CachePriority::NORMAL); + auto* lru_handle = + LRUCachePolicy::insert(key.encode(), &value, value.segment->meta_mem_usage(), + value.segment->meta_mem_usage(), CachePriority::NORMAL); handle->push_segment(this, lru_handle); } diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index 5bb8fae3c41877..d177024242db33 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -55,9 +55,9 @@ class BetaRowset; // Make sure that cache_handle is valid during the segment usage period. using BetaRowsetSharedPtr = std::shared_ptr; -class SegmentCache : public LRUCachePolicyTrackingManual { +class SegmentCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // The cache key or segment lru cache struct CacheKey { CacheKey(RowsetId rowset_id_, int64_t segment_id_) @@ -81,10 +81,9 @@ class SegmentCache : public LRUCachePolicyTrackingManual { }; SegmentCache(size_t memory_bytes_limit, size_t segment_num_limit) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::SEGMENT_CACHE, - memory_bytes_limit, LRUCacheType::SIZE, - config::tablet_rowset_stale_sweep_time_sec, - DEFAULT_LRU_CACHE_NUM_SHARDS * 2, segment_num_limit) {} + : LRUCachePolicy(CachePolicy::CacheType::SEGMENT_CACHE, memory_bytes_limit, + LRUCacheType::SIZE, config::tablet_rowset_stale_sweep_time_sec, + DEFAULT_LRU_CACHE_NUM_SHARDS * 2, segment_num_limit) {} // Lookup the given segment in the cache. // If the segment is found, the cache entry will be written into handle. diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index d7ccd4597d6ef3..b2a313adcdbb7e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -540,7 +540,7 @@ class StorageEngine final : public BaseStorageEngine { // lru cache for create tabelt round robin in disks // key: partitionId_medium // value: index -class CreateTabletIdxCache : public LRUCachePolicyTrackingManual { +class CreateTabletIdxCache : public LRUCachePolicy { public: // get key, delimiter with DELIMITER '-' static std::string get_key(int64_t partition_id, TStorageMedium::type medium) { @@ -558,9 +558,9 @@ class CreateTabletIdxCache : public LRUCachePolicyTrackingManual { }; CreateTabletIdxCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, - capacity, LRUCacheType::NUMBER, - /*stale_sweep_time_s*/ 30 * 60) {} + : LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity, + LRUCacheType::NUMBER, + /*stale_sweep_time_s*/ 30 * 60) {} }; struct DirInfo { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 0e3fae2c379524..468a6b2fb126f0 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -90,8 +90,7 @@ bvar::Adder g_tablet_meta_schema_columns_count("tablet_meta_schema_colu TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size) : _engine(engine), - _tablet_meta_mem_tracker(std::make_shared( - "TabletMeta(experimental)", ExecEnv::GetInstance()->details_mem_tracker_set())), + _tablet_meta_mem_tracker(std::make_shared("TabletMeta(experimental)")), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 4afc55aa7e43ff..a453baf745d602 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -534,13 +534,12 @@ class DeleteBitmap { uint64_t get_delete_bitmap_count(); - class AggCachePolicy : public LRUCachePolicyTrackingManual { + class AggCachePolicy : public LRUCachePolicy { public: AggCachePolicy(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, - capacity, LRUCacheType::SIZE, - config::delete_bitmap_agg_cache_stale_sweep_time_sec, - 256) {} + : LRUCachePolicy(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, capacity, + LRUCacheType::SIZE, + config::delete_bitmap_agg_cache_stale_sweep_time_sec, 256) {} }; class AggCache { diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index 51618f590a7dd2..e339c947bb97a4 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -40,8 +40,8 @@ std::pair TabletSchemaCache::insert(const std: pb.ParseFromString(key); tablet_schema_ptr->init_from_pb(pb); value->tablet_schema = tablet_schema_ptr; - lru_handle = LRUCachePolicyTrackingManual::insert( - key, value, tablet_schema_ptr->num_columns(), 0, CachePriority::NORMAL); + lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(), 0, + CachePriority::NORMAL); g_tablet_schema_cache_count << 1; g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns(); } diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h index 10462804ed2012..e18892a3ca5f06 100644 --- a/be/src/olap/tablet_schema_cache.h +++ b/be/src/olap/tablet_schema_cache.h @@ -23,14 +23,13 @@ namespace doris { -class TabletSchemaCache : public LRUCachePolicyTrackingManual { +class TabletSchemaCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; TabletSchemaCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity, - LRUCacheType::NUMBER, - config::tablet_schema_cache_recycle_interval) {} + : LRUCachePolicy(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity, + LRUCacheType::NUMBER, config::tablet_schema_cache_recycle_interval) {} static TabletSchemaCache* create_global_schema_cache(size_t capacity) { auto* res = new TabletSchemaCache(capacity); diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 5944bbf0fc3136..88ee97c5f6a3b9 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -282,13 +282,12 @@ class TxnManager { void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); - class TabletVersionCache : public LRUCachePolicyTrackingManual { + class TabletVersionCache : public LRUCachePolicy { public: TabletVersionCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_VERSION_CACHE, - capacity, LRUCacheType::NUMBER, -1, - DEFAULT_LRU_CACHE_NUM_SHARDS, - DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity, + LRUCacheType::NUMBER, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} }; private: diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 98d82f274e746d..e77a1c7ae41980 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -77,6 +77,7 @@ class LoadPathMgr; class NewLoadStreamMgr; class MemTrackerLimiter; class MemTracker; +struct TrackerLimiterGroup; class BaseStorageEngine; class ResultBufferMgr; class ResultQueueMgr; @@ -173,9 +174,10 @@ class ExecEnv { std::vector mem_tracker_limiter_pool; void init_mem_tracker(); std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } - MemTrackerLimiter* details_mem_tracker_set() { return _details_mem_tracker_set.get(); } std::shared_ptr page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; } - MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); } + std::shared_ptr brpc_iobuf_block_memory_tracker() { + return _brpc_iobuf_block_memory_tracker; + } std::shared_ptr segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } @@ -354,10 +356,9 @@ class ExecEnv { // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; - std::shared_ptr _details_mem_tracker_set; // page size not in cache, data page/index page/etc. std::shared_ptr _page_no_cache_mem_tracker; - std::shared_ptr _brpc_iobuf_block_memory_tracker; + std::shared_ptr _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; std::shared_ptr _stream_load_pipe_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ac2526b5e61b39..eb9fa12ea4bbdb 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -593,12 +593,9 @@ void ExecEnv::init_mem_tracker() { _s_tracking_memory = true; _orphan_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); - _details_mem_tracker_set = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "DetailsTrackerSet"); - _page_no_cache_mem_tracker = - std::make_shared("PageNoCache", _details_mem_tracker_set.get()); + _page_no_cache_mem_tracker = std::make_shared("PageNoCache"); _brpc_iobuf_block_memory_tracker = - std::make_shared("IOBufBlockMemory", _details_mem_tracker_set.get()); + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "IOBufBlockMemory"); _segcompaction_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); _point_query_executor_mem_tracker = diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index c9c8f4c2a0f3cc..94bd210f262557 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -72,13 +72,12 @@ class LoadChannelMgr { Status _start_bg_worker(); - class LastSuccessChannelCache : public LRUCachePolicyTrackingManual { + class LastSuccessChannelCache : public LRUCachePolicy { public: LastSuccessChannelCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, - capacity, LRUCacheType::SIZE, -1, - DEFAULT_LRU_CACHE_NUM_SHARDS, - DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + : LRUCachePolicy(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, capacity, + LRUCacheType::SIZE, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} }; protected: diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index 82b69ca02ef9f3..45d7781786f2d7 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -23,9 +23,6 @@ namespace doris { -std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock; -std::unordered_map GlobalMemoryArbitrator::_reserved_trackers; - bvar::PassiveStatus g_vm_rss_sub_allocator_cache( "meminfo_vm_rss_sub_allocator_cache", [](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr); @@ -62,28 +59,11 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { } } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem, std::memory_order_relaxed)); - { - std::lock_guard l(_reserved_trackers_lock); - _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes); - } return true; } void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) { _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed); - { - std::lock_guard l(_reserved_trackers_lock); - auto label = doris::thread_context()->thread_mem_tracker()->label(); - auto it = _reserved_trackers.find(label); - if (it == _reserved_trackers.end()) { - DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes; - return; - } - _reserved_trackers[label].sub(bytes); - if (_reserved_trackers[label].current_value() == 0) { - _reserved_trackers.erase(it); - } - } } int64_t GlobalMemoryArbitrator::sub_thread_reserve_memory(int64_t bytes) { diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index f804452956786d..1859f45391fca3 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -17,7 +17,7 @@ #pragma once -#include "runtime/memory/mem_tracker.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" namespace doris { @@ -106,20 +106,6 @@ class GlobalMemoryArbitrator { static bool try_reserve_process_memory(int64_t bytes); static void release_process_reserved_memory(int64_t bytes); - static inline void make_reserved_memory_snapshots( - std::vector* snapshots) { - std::lock_guard l(_reserved_trackers_lock); - for (const auto& pair : _reserved_trackers) { - MemTracker::Snapshot snapshot; - snapshot.type = "reserved_memory"; - snapshot.label = pair.first; - snapshot.limit = -1; - snapshot.cur_consumption = pair.second.current_value(); - snapshot.peak_consumption = pair.second.peak_value(); - (*snapshots).emplace_back(snapshot); - } - } - static inline int64_t process_reserved_memory() { return _s_process_reserved_memory.load(std::memory_order_relaxed); } @@ -207,9 +193,6 @@ class GlobalMemoryArbitrator { private: static std::atomic _s_process_reserved_memory; - - static std::mutex _reserved_trackers_lock; - static std::unordered_map _reserved_trackers; }; } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index 419825c85c4538..7b5a8ab9fec6d9 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -47,6 +47,7 @@ class LRUCachePolicy : public CachePolicy { CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); } + _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type, @@ -65,6 +66,7 @@ class LRUCachePolicy : public CachePolicy { CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); } + _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } void reset_cache() { _cache.reset(); } @@ -92,11 +94,33 @@ class LRUCachePolicy : public CachePolicy { } } - virtual int64_t mem_consumption() = 0; + std::shared_ptr mem_tracker() const { + DCHECK(_mem_tracker != nullptr); + return _mem_tracker; + } - virtual Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, - size_t tracking_bytes, - CachePriority priority = CachePriority::NORMAL) = 0; + int64_t mem_consumption() { + DCHECK(_mem_tracker != nullptr); + return _mem_tracker->consumption(); + } + + // Insert will consume tracking_bytes to _mem_tracker and cache value destroy will release tracking_bytes. + // If LRUCacheType::SIZE, tracking_bytes usually equal to charge. + // If LRUCacheType::NUMBER, tracking_bytes usually not equal to charge, at this time charge is an weight. + // If LRUCacheType::SIZE and tracking_bytes equals 0, memory must be tracked in Doris Allocator, + // cache value is allocated using Alloctor. + // If LRUCacheType::NUMBER and tracking_bytes equals 0, usually currently cannot accurately tracking memory size, + // only tracking handle_size(106). + Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, + CachePriority priority = CachePriority::NORMAL) { + size_t tracking_bytes_with_handle = sizeof(LRUHandle) - 1 + key.size() + tracking_bytes; + if (value != nullptr) { + mem_tracker()->consume(tracking_bytes_with_handle); + ((LRUCacheValueBase*)value) + ->set_tracking_bytes(tracking_bytes_with_handle, _mem_tracker); + } + return _cache->insert(key, value, charge, priority); + } Cache::Handle* lookup(const CacheKey& key) { return _cache->lookup(key); } @@ -238,128 +262,19 @@ class LRUCachePolicy : public CachePolicy { } protected: + void _init_mem_tracker(const std::string& type_name) { + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::GLOBAL, + fmt::format("{}[{}]", type_string(_type), type_name)); + } + // if check_capacity failed, will return dummy lru cache, // compatible with ShardedLRUCache usage, but will not actually cache. std::shared_ptr _cache; std::mutex _lock; LRUCacheType _lru_cache_type; -}; - -class LRUCachePolicyTrackingAllocator : public LRUCachePolicy { -public: - LRUCachePolicyTrackingAllocator( - CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS, - uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, - bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - LRUCachePolicyTrackingAllocator(CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards, - uint32_t element_count_capacity, - CacheValueTimeExtractor cache_value_time_extractor, - bool cache_value_check_timestamp, bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, cache_value_time_extractor, - cache_value_check_timestamp, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - ~LRUCachePolicyTrackingAllocator() override { reset_cache(); } - - std::shared_ptr mem_tracker() const { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker; - } - - int64_t mem_consumption() override { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker->consumption(); - } - - Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, - CachePriority priority = CachePriority::NORMAL) override { - return _cache->insert(key, value, charge, priority); - } - -protected: - void _init_mem_tracker(const std::string& type_name) { - _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::GLOBAL, - fmt::format("{}[{}](AllocByAllocator)", type_string(_type), type_name)); - } std::shared_ptr _mem_tracker; }; -class LRUCachePolicyTrackingManual : public LRUCachePolicy { -public: - LRUCachePolicyTrackingManual( - CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS, - uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, - bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - LRUCachePolicyTrackingManual(CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards, - uint32_t element_count_capacity, - CacheValueTimeExtractor cache_value_time_extractor, - bool cache_value_check_timestamp, bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, cache_value_time_extractor, - cache_value_check_timestamp, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - ~LRUCachePolicyTrackingManual() override { reset_cache(); } - - MemTracker* mem_tracker() { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker.get(); - } - - int64_t mem_consumption() override { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker->consumption(); - } - - // Insert and cache value destroy will be manually consume tracking_bytes to mem tracker. - // If lru cache is LRUCacheType::SIZE, tracking_bytes usually equal to charge. - Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, - CachePriority priority = CachePriority::NORMAL) override { - size_t bytes_with_handle = _get_bytes_with_handle(key, charge, tracking_bytes); - if (value != nullptr) { // if tracking_bytes = 0, only tracking handle size. - mem_tracker()->consume(bytes_with_handle); - ((LRUCacheValueBase*)value)->set_tracking_bytes(bytes_with_handle, mem_tracker()); - } - return _cache->insert(key, value, charge, priority); - } - -private: - void _init_mem_tracker(const std::string& type_name) { - _mem_tracker = - std::make_unique(fmt::format("{}[{}]", type_string(_type), type_name), - ExecEnv::GetInstance()->details_mem_tracker_set()); - } - - // LRUCacheType::SIZE equal to total_size. - size_t _get_bytes_with_handle(const CacheKey& key, size_t charge, size_t bytes) { - size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); - DCHECK(_lru_cache_type == LRUCacheType::SIZE || bytes != -1) - << " _type " << type_string(_type); - // if LRUCacheType::NUMBER and bytes equals 0, such as some caches cannot accurately track memory size. - // cache mem tracker value and _usage divided by handle_size(106) will get the number of cache entries. - return _lru_cache_type == LRUCacheType::SIZE ? handle_size + charge : handle_size + bytes; - } - - std::unique_ptr _mem_tracker; -}; - } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_value_base.h b/be/src/runtime/memory/lru_cache_value_base.h index 6d4b2991a023a6..f9e534e6600df8 100644 --- a/be/src/runtime/memory/lru_cache_value_base.h +++ b/be/src/runtime/memory/lru_cache_value_base.h @@ -27,18 +27,19 @@ class LRUCacheValueBase { public: virtual ~LRUCacheValueBase() { if (_tracking_bytes > 0) { - _mem_tracker->consume(-_tracking_bytes); + _mem_tracker->release(_tracking_bytes); } } - void set_tracking_bytes(size_t tracking_bytes, MemTracker* mem_tracker) { + void set_tracking_bytes(size_t tracking_bytes, + const std::shared_ptr& mem_tracker) { this->_tracking_bytes = tracking_bytes; this->_mem_tracker = mem_tracker; } protected: size_t _tracking_bytes = 0; - MemTracker* _mem_tracker = nullptr; + std::shared_ptr _mem_tracker; }; } // namespace doris diff --git a/be/src/runtime/memory/mem_counter.h b/be/src/runtime/memory/mem_counter.h new file mode 100644 index 00000000000000..8964a5dc63f732 --- /dev/null +++ b/be/src/runtime/memory/mem_counter.h @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +#pragma once + +#include +#include +#include + +#include "common/compiler_util.h" +#include "util/pretty_printer.h" + +namespace doris { + +/* + * A counter that keeps track of the current and peak memory usage seen. + * Relaxed ordering, not accurate in real time. + * + * This class is thread-safe. +*/ +class MemCounter { +public: + MemCounter() = default; + + void add(int64_t delta) { + if (UNLIKELY(delta == 0)) { + return; + } + int64_t value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta; + update_peak(value); + } + + void add_no_update_peak(int64_t delta) { // need extreme fast + _current_value.fetch_add(delta, std::memory_order_relaxed); + } + + bool try_add(int64_t delta, int64_t max) { + if (UNLIKELY(delta == 0)) { + return true; + } + int64_t cur_val = _current_value.load(std::memory_order_relaxed); + int64_t new_val = 0; + do { + new_val = cur_val + delta; + if (UNLIKELY(new_val > max)) { + return false; + } + } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val, new_val, + std::memory_order_relaxed))); + update_peak(new_val); + return true; + } + + void sub(int64_t delta) { _current_value.fetch_sub(delta, std::memory_order_relaxed); } + + void set(int64_t v) { + _current_value.store(v, std::memory_order_relaxed); + update_peak(v); + } + + void update_peak(int64_t value) { + int64_t pre_value = _peak_value.load(std::memory_order_relaxed); + while (value > pre_value && + !_peak_value.compare_exchange_weak(pre_value, value, std::memory_order_relaxed)) { + } + } + + int64_t current_value() const { return _current_value.load(std::memory_order_relaxed); } + int64_t peak_value() const { return _peak_value.load(std::memory_order_relaxed); } + + static std::string print_bytes(int64_t bytes) { + return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) + : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); + } + +private: + std::atomic _current_value {0}; + std::atomic _peak_value {0}; +}; + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index f5a3853f79f84d..796e6c166e04fe 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -15,100 +15,39 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.cpp -// and modified by Doris #include "runtime/memory/mem_tracker.h" -#include - -#include - -#include "bvar/bvar.h" -#include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/thread_context.h" +#include namespace doris { +constexpr size_t MEM_TRACKERS_GROUP_NUM = 1000; +std::atomic mem_tracker_group_counter(0); bvar::Adder g_memtracker_cnt("memtracker_cnt"); -// Save all MemTrackers in use to maintain the weak relationship between MemTracker and MemTrackerLimiter. -// When MemTrackerLimiter prints statistics, all MemTracker statistics with weak relationship will be printed together. -// Each group corresponds to several MemTrackerLimiters and has a lock. -// Multiple groups are used to reduce the impact of locks. -std::vector MemTracker::mem_tracker_pool(1000); +std::vector MemTracker::mem_tracker_pool(MEM_TRACKERS_GROUP_NUM); -MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) { - _consumption = std::make_shared(); - bind_parent(parent); -} - -void MemTracker::bind_parent(MemTrackerLimiter* parent) { - if (parent) { - _type = parent->type(); - _parent_label = parent->label(); - _parent_group_num = parent->group_num(); - } else { - _type = thread_context()->thread_mem_tracker()->type(); - _parent_label = thread_context()->thread_mem_tracker()->label(); - _parent_group_num = thread_context()->thread_mem_tracker()->group_num(); - } +MemTracker::MemTracker(const std::string& label) { + _label = label; + _group_num = mem_tracker_group_counter.fetch_add(1) % MEM_TRACKERS_GROUP_NUM; { - std::lock_guard l(mem_tracker_pool[_parent_group_num].group_lock); - _tracker_group_it = mem_tracker_pool[_parent_group_num].trackers.insert( - mem_tracker_pool[_parent_group_num].trackers.end(), this); + std::lock_guard l(mem_tracker_pool[_group_num].group_lock); + _trackers_group_it = mem_tracker_pool[_group_num].trackers.insert( + mem_tracker_pool[_group_num].trackers.end(), this); } g_memtracker_cnt << 1; } MemTracker::~MemTracker() { - if (_parent_group_num != -1) { - std::lock_guard l(mem_tracker_pool[_parent_group_num].group_lock); - if (_tracker_group_it != mem_tracker_pool[_parent_group_num].trackers.end()) { - mem_tracker_pool[_parent_group_num].trackers.erase(_tracker_group_it); - _tracker_group_it = mem_tracker_pool[_parent_group_num].trackers.end(); + if (_group_num != -1) { + std::lock_guard l(mem_tracker_pool[_group_num].group_lock); + if (_trackers_group_it != mem_tracker_pool[_group_num].trackers.end()) { + mem_tracker_pool[_group_num].trackers.erase(_trackers_group_it); + _trackers_group_it = mem_tracker_pool[_group_num].trackers.end(); } g_memtracker_cnt << -1; } } -MemTracker::Snapshot MemTracker::make_snapshot() const { - Snapshot snapshot; - snapshot.type = type_string(_type); - snapshot.label = _label; - snapshot.parent_label = _parent_label; - snapshot.limit = -1; - snapshot.cur_consumption = _consumption->current_value(); - snapshot.peak_consumption = _consumption->peak_value(); - return snapshot; -} - -void MemTracker::make_group_snapshot(std::vector* snapshots, - int64_t group_num, std::string parent_label) { - std::lock_guard l(mem_tracker_pool[group_num].group_lock); - for (auto* tracker : mem_tracker_pool[group_num].trackers) { - if (tracker->parent_label() == parent_label && tracker->peak_consumption() != 0) { - snapshots->push_back(tracker->make_snapshot()); - } - } -} - -void MemTracker::make_all_trackers_snapshots(std::vector* snapshots) { - for (auto& i : mem_tracker_pool) { - std::lock_guard l(i.group_lock); - for (auto* tracker : i.trackers) { - if (tracker->peak_consumption() != 0) { - snapshots->push_back(tracker->make_snapshot()); - } - } - } -} - -std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) { - return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", - snapshot.label, snapshot.parent_label, print_bytes(snapshot.cur_consumption), - snapshot.cur_consumption, print_bytes(snapshot.peak_consumption), - snapshot.peak_consumption); -} - } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 8a977e49388d52..9ea11fa86968ad 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -15,216 +15,59 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.h -// and modified by Doris #pragma once -#include -#include - -#include -// IWYU pragma: no_include -#include // IWYU pragma: keep -#include -#include -#include +#include #include -#include -#include "common/compiler_util.h" // IWYU pragma: keep -#include "runtime/query_statistics.h" -#include "util/pretty_printer.h" +#include "runtime/memory/mem_counter.h" namespace doris { -class MemTrackerLimiter; - -// Used to track memory usage. -// -// MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, -// which will automatically track all memory usage of the code segment where it is located. -// -// This class is thread-safe. -class MemTracker { +/* + * can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, + * which will automatically track all memory usage of the code segment where it is located. + * + * This class is thread-safe. +*/ +class MemTracker final { public: - struct Snapshot { - std::string type; - std::string label; - std::string parent_label; - int64_t limit = 0; - int64_t cur_consumption = 0; - int64_t peak_consumption = 0; - - bool operator<(const Snapshot& rhs) const { return cur_consumption < rhs.cur_consumption; } - }; - - struct TrackerGroup { - std::list trackers; - std::mutex group_lock; - }; - - enum class Type { - GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan - QUERY = 1, // Count the memory consumption of all Query tasks. - LOAD = 2, // Count the memory consumption of all Load tasks. - COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. - SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. - OTHER = 5 - }; - - static std::string type_string(Type type) { - switch (type) { - case Type::GLOBAL: - return "global"; - case Type::QUERY: - return "query"; - case Type::LOAD: - return "load"; - case Type::COMPACTION: - return "compaction"; - case Type::SCHEMA_CHANGE: - return "schema_change"; - case Type::OTHER: - return "other"; - default: - LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast(type); - } - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - } - - // A counter that keeps track of the current and peak value seen. - // Relaxed ordering, not accurate in real time. - class MemCounter { - public: - MemCounter() : _current_value(0), _peak_value(0) {} - - void add(int64_t delta) { - int64_t value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta; - update_peak(value); - } - - void add_no_update_peak(int64_t delta) { - _current_value.fetch_add(delta, std::memory_order_relaxed); - } - - bool try_add(int64_t delta, int64_t max) { - int64_t cur_val = _current_value.load(std::memory_order_relaxed); - int64_t new_val = 0; - do { - new_val = cur_val + delta; - if (UNLIKELY(new_val > max)) { - return false; - } - } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val, new_val, - std::memory_order_relaxed))); - update_peak(new_val); - return true; - } - - void sub(int64_t delta) { _current_value.fetch_sub(delta, std::memory_order_relaxed); } - - void set(int64_t v) { - _current_value.store(v, std::memory_order_relaxed); - update_peak(v); - } - - void update_peak(int64_t value) { - int64_t pre_value = _peak_value.load(std::memory_order_relaxed); - while (value > pre_value && !_peak_value.compare_exchange_weak( - pre_value, value, std::memory_order_relaxed)) { - } - } - - int64_t current_value() const { return _current_value.load(std::memory_order_relaxed); } - int64_t peak_value() const { return _peak_value.load(std::memory_order_relaxed); } - - private: - std::atomic _current_value; - std::atomic _peak_value; - }; - - // Creates and adds the tracker to the mem_tracker_pool. - MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr); + MemTracker() = default; + MemTracker(const std::string& label); + ~MemTracker(); - virtual ~MemTracker(); + void consume(int64_t bytes) { _mem_counter.add(bytes); } + void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } + void release(int64_t bytes) { _mem_counter.sub(bytes); } + void set_consumption(int64_t bytes) { _mem_counter.set(bytes); } + int64_t consumption() const { return _mem_counter.current_value(); } + int64_t peak_consumption() const { return _mem_counter.peak_value(); } - static std::string print_bytes(int64_t bytes) { - return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) - : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); - } - -public: - Type type() const { return _type; } const std::string& label() const { return _label; } - const std::string& parent_label() const { return _parent_label; } - const std::string& set_parent_label() const { return _parent_label; } - // Returns the memory consumed in bytes. - int64_t consumption() const { return _consumption->current_value(); } - int64_t peak_consumption() const { return _consumption->peak_value(); } - - void consume(int64_t bytes) { - if (UNLIKELY(bytes == 0)) { - return; - } - _consumption->add(bytes); - if (_query_statistics) { - _query_statistics->set_max_peak_memory_bytes(_consumption->peak_value()); - _query_statistics->set_current_used_memory_bytes(_consumption->current_value()); - } - } - - void consume_no_update_peak(int64_t bytes) { // need extreme fast - _consumption->add_no_update_peak(bytes); - } - - void release(int64_t bytes) { _consumption->sub(bytes); } - - void set_consumption(int64_t bytes) { _consumption->set(bytes); } - - std::shared_ptr get_query_statistics() { return _query_statistics; } - -public: - virtual Snapshot make_snapshot() const; - // Specify group_num from mem_tracker_pool to generate snapshot. - static void make_group_snapshot(std::vector* snapshots, int64_t group_num, - std::string parent_label); - static void make_all_trackers_snapshots(std::vector* snapshots); - static std::string log_usage(MemTracker::Snapshot snapshot); - - virtual std::string debug_string() { - std::stringstream msg; - msg << "label: " << _label << "; " - << "consumption: " << consumption() << "; " - << "peak_consumption: " << peak_consumption() << "; "; - return msg.str(); + std::string log_usage() const { + return fmt::format("MemTracker Lame={}, Used={}({} B), Peak={}({} B)", + MemCounter::print_bytes(consumption()), consumption(), + MemCounter::print_bytes(peak_consumption()), peak_consumption()); } -protected: - // Only used by MemTrackerLimiter - MemTracker() { _parent_group_num = -1; } - - void bind_parent(MemTrackerLimiter* parent); - - Type _type; - - // label used in the make snapshot, not guaranteed unique. - std::string _label; - - std::shared_ptr _consumption = nullptr; - - // Tracker is located in group num in mem_tracker_pool - int64_t _parent_group_num = 0; - - // Use _parent_label to correlate with parent limiter tracker. - std::string _parent_label = "-"; - - static std::vector mem_tracker_pool; +private: + MemCounter _mem_counter; + std::string _label {"None"}; + /* + * Save all MemTrackers, used by dump memory info. + */ + struct TrackersGroup { + std::list trackers; + std::mutex group_lock; + }; + // Each group corresponds to several MemCountes and has a lock. + // Multiple groups are used to reduce the impact of locks. + static std::vector mem_tracker_pool; + // Group number in mem_tracker_pool, generated by the timestamp. + int64_t _group_num {-1}; // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. - std::list::iterator _tracker_group_it; - - std::shared_ptr _query_statistics = nullptr; + std::list::iterator _trackers_group_it; }; } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index a1eb2ed67d3426..78e66b6a579b79 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -19,16 +19,13 @@ #include #include -#include #include #include #include #include -#include "bvar/bvar.h" #include "common/config.h" -#include "olap/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/memory/global_memory_arbitrator.h" @@ -37,9 +34,7 @@ #include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" -#include "util/pretty_printer.h" #include "util/runtime_profile.h" -#include "util/stack_util.h" namespace doris { @@ -54,6 +49,7 @@ static bvar::Adder memory_schema_change_trackers_sum_bytes( "memory_schema_change_trackers_sum_bytes"); static bvar::Adder memory_other_trackers_sum_bytes("memory_other_trackers_sum_bytes"); +std::atomic mem_tracker_limiter_group_counter(0); constexpr auto GC_MAX_SEEK_TRACKER = 1000; std::atomic MemTrackerLimiter::_enable_print_log_process_usage {true}; @@ -76,14 +72,14 @@ static RuntimeProfile::Counter* previously_canceling_tasks_counter = MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) { DCHECK_GE(byte_limit, -1); - _consumption = std::make_shared(); _type = type; _label = label; _limit = byte_limit; if (_type == Type::GLOBAL) { _group_num = 0; } else { - _group_num = random() % 999 + 1; + _group_num = + mem_tracker_limiter_group_counter.fetch_add(1) % (MEM_TRACKER_GROUP_NUM - 1) + 1; } // currently only select/load need runtime query statistics @@ -132,25 +128,23 @@ MemTrackerLimiter::~MemTrackerLimiter() { "tracker web or log, this indicates that there may be a memory leak. " "4. If you need to " "transfer memory tracking value between two trackers, can use transfer_to."; - if (_consumption->current_value() != 0) { + if (consumption() != 0) { if (open_memory_tracker_inaccurate_detect()) { - std::string err_msg = - fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", - label(), _consumption->current_value(), _consumption->peak_value(), - mem_tracker_inaccurate_msg); + std::string err_msg = fmt::format( + "mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), + consumption(), peak_consumption(), mem_tracker_inaccurate_msg); LOG(FATAL) << err_msg << print_address_sanitizers(); } if (ExecEnv::tracking_memory()) { - ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value()); + ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption()); } - _consumption->set(0); - } else if (doris::config::crash_in_memory_tracker_inaccurate && !_address_sanitizers.empty() && - !is_group_commit_load) { + _mem_counter.set(0); + } else if (open_memory_tracker_inaccurate_detect() && !_address_sanitizers.empty()) { LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " << ", mem tracker label: " << _label - << ", peak consumption: " << _consumption->peak_value() - << print_address_sanitizers(); + << ", peak consumption: " << peak_consumption() << print_address_sanitizers(); } + DCHECK(reserved_consumption() == 0); memory_memtrackerlimiter_cnt << -1; } @@ -163,9 +157,9 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", - _label, _consumption->current_value(), _consumption->peak_value(), - buf, size, it->first, it->second.size, - get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); + _label, consumption(), peak_consumption(), buf, size, it->first, + it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + it->second.stack_trace)); } // if alignment not equal to 0, maybe usable_size > size. @@ -186,8 +180,8 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", - _label, _consumption->current_value(), _consumption->peak_value(), buf, - size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + _label, consumption(), peak_consumption(), buf, size, it->first, + it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); } _address_sanitizers.erase(buf); @@ -195,7 +189,7 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { _error_address_sanitizers.emplace_back(fmt::format( "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: " "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.", - _label, _consumption->current_value(), _consumption->peak_value(), buf, size, + _label, consumption(), peak_consumption(), buf, size, get_stack_trace(1, "FULL_WITH_INLINE"))); } } @@ -209,8 +203,8 @@ std::string MemTrackerLimiter::print_address_sanitizers() { auto msg = fmt::format( "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", - _label, _consumption->current_value(), _consumption->peak_value(), it.first, - it.second.size, it.second.stack_trace); + _label, consumption(), peak_consumption(), it.first, it.second.size, + it.second.stack_trace); LOG(INFO) << msg; detail += msg; } @@ -222,16 +216,38 @@ std::string MemTrackerLimiter::print_address_sanitizers() { return detail; } -MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { +MemTrackerLimiter::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; snapshot.type = type_string(_type); snapshot.label = _label; snapshot.limit = _limit; - snapshot.cur_consumption = _consumption->current_value(); - snapshot.peak_consumption = _consumption->peak_value(); + snapshot.cur_consumption = consumption(); + snapshot.peak_consumption = peak_consumption(); return snapshot; } +MemTrackerLimiter::Snapshot MemTrackerLimiter::make_reserved_trackers_snapshot() const { + Snapshot snapshot; + snapshot.type = "reserved_memory"; + snapshot.label = _label; + snapshot.limit = -1; + snapshot.cur_consumption = reserved_consumption(); + snapshot.peak_consumption = reserved_peak_consumption(); + return snapshot; +} + +void MemTrackerLimiter::make_all_reserved_trackers_snapshots(std::vector* snapshots) { + for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { + std::lock_guard l(i.group_lock); + for (auto trackerWptr : i.trackers) { + auto tracker = trackerWptr.lock(); + if (tracker != nullptr && tracker->reserved_consumption() != 0) { + (*snapshots).emplace_back(tracker->make_reserved_trackers_snapshot()); + } + } + } +} + void MemTrackerLimiter::refresh_global_counter() { std::unordered_map type_mem_sum = { {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, @@ -248,7 +264,8 @@ void MemTrackerLimiter::refresh_global_counter() { } int64_t all_trackers_mem_sum = 0; for (auto it : type_mem_sum) { - MemTrackerLimiter::TypeMemSum[it.first]->set(it.second); + MemTrackerLimiter::TypeMemSum[it.first].set(it.second); + all_trackers_mem_sum += it.second; switch (it.first) { case Type::GLOBAL: @@ -300,18 +317,18 @@ void MemTrackerLimiter::clean_tracker_limiter_group() { #endif } -void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { MemTrackerLimiter::refresh_global_counter(); int64_t all_trackers_mem_sum = 0; Snapshot snapshot; - for (auto it : MemTrackerLimiter::TypeMemSum) { + for (const auto& it : MemTrackerLimiter::TypeMemSum) { snapshot.type = "overview"; snapshot.label = type_string(it.first); snapshot.limit = -1; - snapshot.cur_consumption = it.second->current_value(); - snapshot.peak_consumption = it.second->peak_value(); + snapshot.cur_consumption = it.second.current_value(); + snapshot.peak_consumption = it.second.peak_value(); (*snapshots).emplace_back(snapshot); - all_trackers_mem_sum += it.second->current_value(); + all_trackers_mem_sum += it.second.current_value(); } snapshot.type = "overview"; @@ -363,7 +380,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector (*snapshots).emplace_back(snapshot); } -void MemTrackerLimiter::make_type_snapshots(std::vector* snapshots, +void MemTrackerLimiter::make_type_snapshots(std::vector* snapshots, MemTrackerLimiter::Type type) { if (type == Type::GLOBAL) { std::lock_guard l( @@ -372,7 +389,6 @@ void MemTrackerLimiter::make_type_snapshots(std::vector* s auto tracker = trackerWptr.lock(); if (tracker != nullptr) { (*snapshots).emplace_back(tracker->make_snapshot()); - MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label()); } } } else { @@ -383,17 +399,15 @@ void MemTrackerLimiter::make_type_snapshots(std::vector* s auto tracker = trackerWptr.lock(); if (tracker != nullptr && tracker->type() == type) { (*snapshots).emplace_back(tracker->make_snapshot()); - MemTracker::make_group_snapshot(snapshots, tracker->group_num(), - tracker->label()); } } } } } -void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots, +void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots, int top_num) { - std::priority_queue max_pq; + std::priority_queue max_pq; // not include global type. for (unsigned i = 1; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { std::lock_guard l( @@ -413,7 +427,7 @@ void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_all_trackers_snapshots(std::vector* snapshots) { for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { std::lock_guard l(i.group_lock); for (auto trackerWptr : i.trackers) { @@ -425,25 +439,25 @@ void MemTrackerLimiter::make_all_trackers_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_all_memory_state_snapshots(std::vector* snapshots) { make_process_snapshots(snapshots); make_all_trackers_snapshots(snapshots); - MemTracker::make_all_trackers_snapshots(snapshots); + make_all_reserved_trackers_snapshots(snapshots); } -std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) { - return fmt::format( - "MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", - snapshot.label, snapshot.type, print_bytes(snapshot.limit), snapshot.limit, - print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, - print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); +std::string MemTrackerLimiter::log_usage(Snapshot snapshot) { + return fmt::format("MemTracker Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", + snapshot.label, snapshot.type, MemCounter::print_bytes(snapshot.limit), + snapshot.limit, MemCounter::print_bytes(snapshot.cur_consumption), + snapshot.cur_consumption, MemCounter::print_bytes(snapshot.peak_consumption), + snapshot.peak_consumption); } -std::string MemTrackerLimiter::type_log_usage(MemTracker::Snapshot snapshot) { +std::string MemTrackerLimiter::type_log_usage(Snapshot snapshot) { return fmt::format("Type={}, Used={}({} B), Peak={}({} B)", snapshot.type, - print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, - print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); + MemCounter::print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, + MemCounter::print_bytes(snapshot.peak_consumption), + snapshot.peak_consumption); } std::string MemTrackerLimiter::type_detail_usage(const std::string& msg, Type type) { @@ -467,16 +481,6 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { std::string detail = msg; detail += "\nProcess Memory Summary:\n " + GlobalMemoryArbitrator::process_mem_log_str(); detail += "\nMemory Tracker Summary: " + log_usage(); - std::string child_trackers_usage; - std::vector snapshots; - MemTracker::make_group_snapshot(&snapshots, _group_num, _label); - for (const auto& snapshot : snapshots) { - child_trackers_usage += "\n " + MemTracker::log_usage(snapshot); - } - if (!child_trackers_usage.empty()) { - detail += child_trackers_usage; - } - LOG(WARNING) << detail; } } @@ -484,25 +488,24 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { std::string MemTrackerLimiter::log_process_usage_str() { std::string detail; detail += "\nProcess Memory Summary:\n " + GlobalMemoryArbitrator::process_mem_log_str(); - std::vector snapshots; + std::vector snapshots; MemTrackerLimiter::make_process_snapshots(&snapshots); MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); MemTrackerLimiter::make_top_consumption_snapshots(&snapshots, 15); - - // Add additional tracker printed when memory exceeds limit. - snapshots.emplace_back( - ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->make_snapshot()); + MemTrackerLimiter::make_all_reserved_trackers_snapshots(&snapshots); detail += "\nMemory Tracker Summary:"; for (const auto& snapshot : snapshots) { - if (snapshot.label.empty() && snapshot.parent_label.empty()) { + if (snapshot.label.empty()) { detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); - } else if (snapshot.parent_label.empty()) { - detail += "\n " + MemTrackerLimiter::log_usage(snapshot); } else { - detail += "\n " + MemTracker::log_usage(snapshot); + detail += "\n " + MemTrackerLimiter::log_usage(snapshot); } } + + // Add additional tracker printed when memory exceeds limit. + detail += "\n " + + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->log_usage(); return detail; } @@ -518,8 +521,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { std::string err_msg = fmt::format( "memory tracker limit exceeded, tracker label:{}, type:{}, limit " "{}, peak used {}, current used {}. backend {}, {}.", - label(), type_string(_type), print_bytes(limit()), - print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()), + label(), type_string(_type), MemCounter::print_bytes(limit()), + MemCounter::print_bytes(peak_consumption()), MemCounter::print_bytes(consumption()), BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str()); if (_type == Type::QUERY || _type == Type::LOAD) { err_msg += fmt::format( @@ -544,7 +547,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, "Process memory not enough, cancel top memory used {}: " "<{}> consumption {}, backend {}, {}. Execute again " "after enough memory, details see be.INFO.", - type_string(type), label, print_bytes(mem_consumption), + type_string(type), label, MemCounter::print_bytes(mem_consumption), BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); @@ -665,7 +668,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, "Process memory not enough, cancel top memory overcommit {}: " "<{}> consumption {}, backend {}, {}. Execute again " "after enough memory, details see be.INFO.", - type_string(type), label, print_bytes(mem_consumption), + type_string(type), label, MemCounter::print_bytes(mem_consumption), BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index c8a8c845793087..faf354cca4cbf3 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -20,28 +20,29 @@ #include #include #include -#include #include +#include // IWYU pragma: no_include #include // IWYU pragma: keep #include #include #include -#include #include #include #include #include "common/config.h" #include "common/status.h" -#include "runtime/memory/mem_tracker.h" +#include "runtime/memory/mem_counter.h" +#include "runtime/query_statistics.h" #include "util/string_util.h" #include "util/uid_util.h" namespace doris { class RuntimeProfile; +class MemTrackerLimiter; constexpr size_t MEM_TRACKER_GROUP_NUM = 1000; @@ -58,78 +59,115 @@ struct TrackerLimiterGroup { std::mutex group_lock; }; -// Track and limit the memory usage of process and query. -// Contains an limit, arranged into a tree structure. -// -// Automatically track every once malloc/free of the system memory allocator (Currently, based on TCMlloc hook). -// Put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,all memory used by this thread -// will be recorded on this Query, otherwise it will be recorded in Orphan Tracker by default. -class MemTrackerLimiter final : public MemTracker { +/* + * Track and limit the memory usage of process and query. + * + * Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts, + * all memory used by this thread will be recorded on this Query. + * + * This class is thread-safe. +*/ +class MemTrackerLimiter final { public: + /* + * Part 1, Type definition + */ + // TODO There are more and more GC codes and there should be a separate manager class. enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 }; - inline static std::unordered_map> TypeMemSum = { - {Type::GLOBAL, std::make_shared()}, - {Type::QUERY, std::make_shared()}, - {Type::LOAD, std::make_shared()}, - {Type::COMPACTION, std::make_shared()}, - {Type::SCHEMA_CHANGE, std::make_shared()}, - {Type::OTHER, std::make_shared()}}; + enum class Type { + GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan + QUERY = 1, // Count the memory consumption of all Query tasks. + LOAD = 2, // Count the memory consumption of all Load tasks. + COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. + SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. + OTHER = 5, + }; + + struct Snapshot { + std::string type; + std::string label; + int64_t limit = 0; + int64_t cur_consumption = 0; + int64_t peak_consumption = 0; + + bool operator<(const Snapshot& rhs) const { return cur_consumption < rhs.cur_consumption; } + }; + + // Corresponding to MemTrackerLimiter::Type. + // MemCounter contains atomic variables, which are not allowed to be copied or moved. + inline static std::unordered_map TypeMemSum; + + /* + * Part 2, Constructors and property methods + */ -public: static std::shared_ptr create_shared( MemTrackerLimiter::Type type, const std::string& label = std::string(), int64_t byte_limit = -1); // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit); - ~MemTrackerLimiter() override; + ~MemTrackerLimiter(); - static std::string gc_type_string(GCType type) { - switch (type) { - case GCType::PROCESS: - return "process"; - case GCType::WORK_LOAD_GROUP: - return "work load group"; - default: - LOG(FATAL) << "not match gc type:" << static_cast(type); - } - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - } - - void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } + Type type() const { return _type; } + const std::string& label() const { return _label; } + std::shared_ptr get_query_statistics() { return _query_statistics; } int64_t group_num() const { return _group_num; } bool has_limit() const { return _limit >= 0; } int64_t limit() const { return _limit; } bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); } + Status check_limit(int64_t bytes = 0); + bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; } + bool is_query_cancelled() { return _is_query_cancelled; } + void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } + + // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. + std::list>::iterator wg_tracker_limiter_group_it; + + /* + * Part 3, Memory tracking method (use carefully!) + * + * Note: Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release(). + * Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or + * SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER. + */ + + int64_t consumption() const { return _mem_counter.current_value(); } + int64_t peak_consumption() const { return _mem_counter.peak_value(); } + + void consume(int64_t bytes) { + _mem_counter.add(bytes); + if (_query_statistics) { + _query_statistics->set_max_peak_memory_bytes(peak_consumption()); + _query_statistics->set_current_used_memory_bytes(consumption()); + } + } + + void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } + + void release(int64_t bytes) { _mem_counter.sub(bytes); } - bool try_consume(int64_t bytes) const { + bool try_consume(int64_t bytes) { if (UNLIKELY(bytes == 0)) { return true; } - bool st = true; + bool rt = true; if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) { - st = _consumption->try_add(bytes, _limit); + rt = _mem_counter.try_add(bytes, _limit); } else { - _consumption->add(bytes); + _mem_counter.add(bytes); } - if (st && _query_statistics) { - _query_statistics->set_max_peak_memory_bytes(_consumption->peak_value()); - _query_statistics->set_current_used_memory_bytes(_consumption->current_value()); + if (rt && _query_statistics) { + _query_statistics->set_max_peak_memory_bytes(peak_consumption()); + _query_statistics->set_current_used_memory_bytes(consumption()); } - return st; + return rt; } - Status check_limit(int64_t bytes = 0); - bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; } - - bool is_query_cancelled() { return _is_query_cancelled; } + void set_consumption(int64_t bytes) { _mem_counter.set(bytes); } - void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } - -public: // Transfer 'bytes' of consumption from this tracker to 'dst'. void transfer_to(int64_t size, MemTrackerLimiter* dst) { if (label() == dst->label()) { @@ -139,21 +177,50 @@ class MemTrackerLimiter final : public MemTracker { dst->cache_consume(size); } + // If need to consume the tracker frequently, use it + void cache_consume(int64_t bytes); + + /* + * Part 4, Reserved memory tracking method + */ + + int64_t reserved_consumption() const { return _reserved_counter.current_value(); } + int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); } + + bool try_reserve(int64_t bytes) { + bool rt = try_consume(bytes); + if (rt) { + _reserved_counter.add(bytes); + } + return rt; + } + + void release_reserved(int64_t bytes) { + _reserved_counter.sub(bytes); + DCHECK(reserved_consumption() >= 0); + } + + Snapshot make_reserved_trackers_snapshot() const; + static void make_all_reserved_trackers_snapshots(std::vector* snapshots); + + /* + * Part 4, Memory snapshot and log method + */ + static void refresh_global_counter(); static void clean_tracker_limiter_group(); - Snapshot make_snapshot() const override; + Snapshot make_snapshot() const; // Returns a list of all the valid tracker snapshots. - static void make_process_snapshots(std::vector* snapshots); - static void make_type_snapshots(std::vector* snapshots, Type type); - static void make_all_trackers_snapshots(std::vector* snapshots); - static void make_all_memory_state_snapshots(std::vector* snapshots); - static void make_top_consumption_snapshots(std::vector* snapshots, - int top_num); - - static std::string log_usage(MemTracker::Snapshot snapshot); + static void make_process_snapshots(std::vector* snapshots); + static void make_type_snapshots(std::vector* snapshots, Type type); + static void make_all_trackers_snapshots(std::vector* snapshots); + static void make_all_memory_state_snapshots(std::vector* snapshots); + static void make_top_consumption_snapshots(std::vector* snapshots, int top_num); + + static std::string log_usage(Snapshot snapshot); std::string log_usage() const { return log_usage(make_snapshot()); } - static std::string type_log_usage(MemTracker::Snapshot snapshot); + static std::string type_log_usage(Snapshot snapshot); static std::string type_detail_usage(const std::string& msg, Type type); void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } @@ -161,6 +228,12 @@ class MemTrackerLimiter final : public MemTracker { static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; } static std::string log_process_usage_str(); static void print_log_process_usage(); + // Log the memory usage when memory limit is exceeded. + std::string tracker_limit_exceeded_str(); + + /* + * Part 5, Memory GC method + */ // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. // cancel_reason recorded when gc is triggered, for log printing. @@ -191,6 +264,53 @@ class MemTrackerLimiter final : public MemTracker { return free_top_overcommit_query(min_free_mem, cancel_reason, profile, Type::LOAD); } + /* + * Part 6, Memory debug method + */ + + void add_address_sanitizers(void* buf, size_t size); + void remove_address_sanitizers(void* buf, size_t size); + bool is_group_commit_load {false}; + +private: + /* + * Part 7, Private method + */ + + static std::string type_string(Type type) { + switch (type) { + case Type::GLOBAL: + return "global"; + case Type::QUERY: + return "query"; + case Type::LOAD: + return "load"; + case Type::COMPACTION: + return "compaction"; + case Type::SCHEMA_CHANGE: + return "schema_change"; + case Type::OTHER: + return "other"; + default: + LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + + static std::string gc_type_string(GCType type) { + switch (type) { + case GCType::PROCESS: + return "process"; + case GCType::WORK_LOAD_GROUP: + return "work load group"; + default: + LOG(FATAL) << "not match gc type:" << static_cast(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { if (label.find("#Id=") == std::string::npos) { @@ -202,37 +322,23 @@ class MemTrackerLimiter final : public MemTracker { return querytid; } - // Log the memory usage when memory limit is exceeded. - std::string tracker_limit_exceeded_str(); - - void add_address_sanitizers(void* buf, size_t size); - void remove_address_sanitizers(void* buf, size_t size); - bool is_group_commit_load {false}; - - std::string debug_string() override { - std::stringstream msg; - msg << "limit: " << _limit << "; " - << "consumption: " << _consumption->current_value() << "; " - << "label: " << _label << "; " - << "type: " << type_string(_type) << "; "; - return msg.str(); - } - - // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. - std::list>::iterator wg_tracker_limiter_group_it; - -private: - friend class ThreadMemTrackerMgr; - - // If need to consume the tracker frequently, use it - void cache_consume(int64_t bytes); - // When the accumulated untracked memory value exceeds the upper limit, // the current value is returned and set to 0. // Thread safety. int64_t add_untracked_mem(int64_t bytes); -private: + /* + * Part 8, Property definition + */ + + Type _type; + + // label used in the make snapshot, not guaranteed unique. + std::string _label; + + MemCounter _mem_counter; + MemCounter _reserved_counter; + // Limit on memory consumption, in bytes. int64_t _limit; @@ -250,6 +356,8 @@ class MemTrackerLimiter final : public MemTracker { bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; + std::shared_ptr _query_statistics = nullptr; + struct AddressSanitizer { size_t size; std::string stack_trace; @@ -271,7 +379,9 @@ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { } inline void MemTrackerLimiter::cache_consume(int64_t bytes) { - if (bytes == 0) return; + if (bytes == 0) { + return; + } int64_t consume_bytes = add_untracked_mem(bytes); consume(consume_bytes); } @@ -280,9 +390,10 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memory_overcommit)) { return Status::OK(); } - if (_limit > 0 && _consumption->current_value() + bytes > _limit) { - return Status::MemoryLimitExceeded(fmt::format( - "failed alloc size {}, {}", print_bytes(bytes), tracker_limit_exceeded_str())); + if (_limit > 0 && consumption() + bytes > _limit) { + return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", + MemCounter::print_bytes(bytes), + tracker_limit_exceeded_str())); } return Status::OK(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 33dd0d41822ae1..d036564528534c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -46,7 +46,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( DCHECK(mem_tracker); CHECK(init()); flush_untracked_mem(); - _reserved_mem_stack.push_back(_reserved_mem); + _last_attach_snapshots_stack.push_back({_reserved_mem, _consumer_tracker_stack}); if (_reserved_mem != 0) { // _untracked_mem temporary store bytes that not synchronized to process reserved memory, // but bytes have been subtracted from thread _reserved_mem. @@ -54,6 +54,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( _reserved_mem = 0; _untracked_mem = 0; } + _consumer_tracker_stack.clear(); _limiter_tracker = mem_tracker; } @@ -62,9 +63,10 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( CHECK(init()); flush_untracked_mem(); release_reserved(); - DCHECK(!_reserved_mem_stack.empty()); - _reserved_mem = _reserved_mem_stack.back(); - _reserved_mem_stack.pop_back(); + DCHECK(!_last_attach_snapshots_stack.empty()); + _reserved_mem = _last_attach_snapshots_stack.back().reserved_mem; + _consumer_tracker_stack = _last_attach_snapshots_stack.back().consumer_tracker_stack; + _last_attach_snapshots_stack.pop_back(); _limiter_tracker = old_mem_tracker; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index bb0091f2e6d6fb..fd14750d8b8ebc 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -106,7 +106,7 @@ class ThreadMemTrackerMgr { std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; for (const auto& v : _consumer_tracker_stack) { - fmt::format_to(consumer_tracker_buf, "{}, ", MemTracker::log_usage(v->make_snapshot())); + fmt::format_to(consumer_tracker_buf, "{}, ", v->log_usage()); } return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, " @@ -119,6 +119,11 @@ class ThreadMemTrackerMgr { int64_t reserved_mem() const { return _reserved_mem; } private: + struct LastAttachSnapshot { + int64_t reserved_mem = 0; + std::vector consumer_tracker_stack; + }; + // is false: ExecEnv::ready() = false when thread local is initialized bool _init = false; // Cache untracked mem. @@ -126,9 +131,10 @@ class ThreadMemTrackerMgr { int64_t _old_untracked_mem = 0; int64_t _reserved_mem = 0; + // SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used, // so `attach_limiter_tracker` may be nested. - std::vector _reserved_mem_stack; + std::vector _last_attach_snapshots_stack; std::string _failed_consume_msg = std::string(); // If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit. @@ -194,6 +200,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che // if _untracked_mem less than -SYNC_PROC_RESERVED_INTERVAL_BYTES, increase process reserved memory. if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem); + _limiter_tracker->release_reserved(_untracked_mem); _untracked_mem = 0; } return; @@ -205,6 +212,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che size -= _reserved_mem; doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); + _limiter_tracker->release_reserved(_reserved_mem + _untracked_mem); _reserved_mem = 0; _untracked_mem = 0; } @@ -277,7 +285,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { // if _reserved_mem not equal to 0, repeat reserve, // _untracked_mem store bytes that not synchronized to process reserved memory. flush_untracked_mem(); - if (!_limiter_tracker->try_consume(size)) { + if (!_limiter_tracker->try_reserve(size)) { auto err_msg = fmt::format( "reserve memory failed, size: {}, because memory tracker consumption: {}, limit: " "{}", @@ -289,14 +297,16 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) { auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, wg_ptr->memory_debug_string()); - _limiter_tracker->release(size); // rollback + _limiter_tracker->release(size); // rollback + _limiter_tracker->release_reserved(size); // rollback return doris::Status::MemoryLimitExceeded(err_msg); } } if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, GlobalMemoryArbitrator::process_mem_log_str()); - _limiter_tracker->release(size); // rollback + _limiter_tracker->release(size); // rollback + _limiter_tracker->release_reserved(size); // rollback if (wg_ptr) { wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback } @@ -310,6 +320,7 @@ inline void ThreadMemTrackerMgr::release_reserved() { if (_reserved_mem != 0) { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); + _limiter_tracker->release_reserved(_reserved_mem + _untracked_mem); _limiter_tracker->release(_reserved_mem); auto wg_ptr = _wg_wptr.lock(); if (wg_ptr) { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2b333057a539e4..497041ac17b456 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -152,9 +152,9 @@ QueryContext::~QueryContext() { mem_tracker_msg = fmt::format( ", deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " "PeakUsed={}", - print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()), - MemTracker::print_bytes(query_mem_tracker->consumption()), - MemTracker::print_bytes(query_mem_tracker->peak_consumption())); + print_id(_query_id), MemCounter::print_bytes(query_mem_tracker->limit()), + MemCounter::print_bytes(query_mem_tracker->consumption()), + MemCounter::print_bytes(query_mem_tracker->peak_consumption())); } uint64_t group_id = 0; if (_workload_group) { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index b63495df837d1a..2c69b8a58704bf 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -315,7 +315,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { bool RoutineLoadTaskExecutor::_reach_memory_limit() { bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); auto current_load_mem_value = - MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value(); + MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD].current_value(); if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) { LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit << " current_load_mem_value: " << current_load_mem_value diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index d2b55d86bc6bd4..01fcf851321fc1 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -48,8 +48,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParams _state = state; _state->runtime_filter_mgr = this; _query_mem_tracker = query_mem_tracker; - _tracker = std::make_unique("RuntimeFilterMgr(experimental)", - _query_mem_tracker.get()); + _tracker = std::make_unique("RuntimeFilterMgr(experimental)"); } RuntimeFilterMgr::~RuntimeFilterMgr() { @@ -264,8 +263,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params, const TQueryOptions& query_options) { _query_id = query_id; - _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)", - ExecEnv::GetInstance()->details_mem_tracker_set()); + _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (runtime_filter_params.__isset.rid_to_runtime_filter) { for (const auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 85f79536b74ec3..6f3b51f09fd1f2 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -210,21 +210,21 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, cancel_str = fmt::format( "MinorGC kill overcommit query, wg id:{}, name:{}, used:{}, limit:{}, " "backend:{}.", - _id, _name, MemTracker::print_bytes(used_memory), - MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + _id, _name, MemCounter::print_bytes(used_memory), + MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost()); } else { if (_enable_memory_overcommit) { cancel_str = fmt::format( "FullGC release wg overcommit mem, wg id:{}, name:{}, " "used:{},limit:{},backend:{}.", - _id, _name, MemTracker::print_bytes(used_memory), - MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + _id, _name, MemCounter::print_bytes(used_memory), + MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost()); } else { cancel_str = fmt::format( "GC wg for hard limit, wg id:{}, name:{}, used:{}, limit:{}, " "backend:{}.", - _id, _name, MemTracker::print_bytes(used_memory), - MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + _id, _name, MemCounter::print_bytes(used_memory), + MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost()); } } auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, @@ -232,14 +232,14 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, return fmt::format( "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute " "again after enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption), + cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); }; auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { return fmt::format( "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again " "after enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption), + cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); }; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 32470fed5ab929..65a8e3685c80ed 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -232,9 +232,9 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { // check whether queries need to revoke memory for task group for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) { debug_msg += fmt::format( - "\n MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, " + "\n MemTracker Label={}, Used={}, SpillThreshold={}, " "Peak={}", - query_mem_tracker->label(), query_mem_tracker->parent_label(), + query_mem_tracker->label(), PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES), PrettyPrinter::print(query_spill_threshold, TUnit::BYTES), PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES)); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 0a27c415a48c0a..9719a672b8dff4 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -191,9 +191,9 @@ LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capa } RowCache::RowCache(int64_t capacity, int num_shards) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, LRUCacheType::SIZE, - config::point_query_row_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, + LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec, + num_shards) {} // Create global instance of this class RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { @@ -223,8 +223,8 @@ void RowCache::insert(const RowCacheKey& key, const Slice& value) { auto* row_cache_value = new RowCacheValue; row_cache_value->cache_value = cache_value; const std::string& encoded_key = key.encode(); - auto* handle = LRUCachePolicyTrackingManual::insert(encoded_key, row_cache_value, value.size, - value.size, CachePriority::NORMAL); + auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size, + CachePriority::NORMAL); // handle will released auto tmp = CacheHandle {this, handle}; } diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 6c6fb28f95a378..b22dc5bfd1d73f 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -126,9 +126,9 @@ class Reusable { }; // RowCache is a LRU cache for row store -class RowCache : public LRUCachePolicyTrackingManual { +class RowCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // The cache key for row lru cache struct RowCacheKey { @@ -220,7 +220,7 @@ class RowCache : public LRUCachePolicyTrackingManual { // A cache used for prepare stmt. // One connection per stmt perf uuid -class LookupConnectionCache : public LRUCachePolicyTrackingManual { +class LookupConnectionCache : public LRUCachePolicy { public: static LookupConnectionCache* instance() { return ExecEnv::GetInstance()->get_lookup_connection_cache(); @@ -231,9 +231,9 @@ class LookupConnectionCache : public LRUCachePolicyTrackingManual { private: friend class PointQueryExecutor; LookupConnectionCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, - capacity, LRUCacheType::NUMBER, - config::tablet_lookup_cache_stale_sweep_time_sec) {} + : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity, + LRUCacheType::NUMBER, + config::tablet_lookup_cache_stale_sweep_time_sec) {} static std::string encode_key(__int128_t cache_id) { fmt::memory_buffer buffer; diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp index 05b8b8824b5448..600ffdb647ce44 100644 --- a/be/src/util/obj_lru_cache.cpp +++ b/be/src/util/obj_lru_cache.cpp @@ -20,9 +20,9 @@ namespace doris { ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, LRUCacheType::NUMBER, - config::common_obj_lru_cache_stale_sweep_time_sec, num_shards) { + : LRUCachePolicy(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, + LRUCacheType::NUMBER, config::common_obj_lru_cache_stale_sweep_time_sec, + num_shards) { _enabled = (capacity > 0); } diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index c7f805fc3a1de2..680a32e79bc991 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -25,9 +25,9 @@ namespace doris { // A common object cache depends on an Sharded LRU Cache. // It has a certain capacity, which determin how many objects it can cache. // Caller must hold a CacheHandle instance when visiting the cached object. -class ObjLRUCache : public LRUCachePolicyTrackingManual { +class ObjLRUCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; struct ObjKey { ObjKey(const std::string& key_) : key(key_) {} @@ -94,8 +94,8 @@ class ObjLRUCache : public LRUCachePolicyTrackingManual { if (_enabled) { const std::string& encoded_key = key.key; auto* obj_value = new ObjValue(value); - auto* handle = LRUCachePolicyTrackingManual::insert(encoded_key, obj_value, 1, - sizeof(T), CachePriority::NORMAL); + auto* handle = LRUCachePolicy::insert(encoded_key, obj_value, 1, sizeof(T), + CachePriority::NORMAL); *cache_handle = CacheHandle {this, handle}; } else { cache_handle = nullptr; diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 47afe9f14d44b5..d7bbb761c88cc7 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -227,7 +227,6 @@ void Allocator::throw_b throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } -#ifndef NDEBUG template void Allocator::add_address_sanitizers( void* buf, size_t size) const { @@ -249,7 +248,6 @@ void Allocator::remove_ #endif doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size); } -#endif template void* Allocator::alloc(size_t size, diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 0427d0c968df7e..b05128bc6933cc 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -242,10 +242,8 @@ class Allocator { void consume_memory(size_t size) const; void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; -#ifndef NDEBUG void add_address_sanitizers(void* buf, size_t size) const; void remove_address_sanitizers(void* buf, size_t size) const; -#endif void* alloc(size_t size, size_t alignment = 0); void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); @@ -289,9 +287,7 @@ class Allocator { if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } -#ifndef NDEBUG add_address_sanitizers(buf, record_size); -#endif } else { buf = nullptr; int res = MemoryAllocator::posix_memalign(&buf, alignment, size); @@ -307,9 +303,7 @@ class Allocator { if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } -#ifndef NDEBUG add_address_sanitizers(buf, record_size); -#endif } } if constexpr (MemoryAllocator::need_record_actual_size()) { @@ -325,9 +319,7 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } } else { -#ifndef NDEBUG remove_address_sanitizers(buf, size); -#endif MemoryAllocator::free(buf); } release_memory(size); @@ -351,9 +343,7 @@ class Allocator { if (!use_mmap || (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)) { -#ifndef NDEBUG remove_address_sanitizers(buf, old_size); -#endif /// Resize malloc'd memory region with no special alignment requirement. void* new_buf = MemoryAllocator::realloc(buf, new_size); if (nullptr == new_buf) { @@ -361,11 +351,8 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); } -#ifndef NDEBUG - add_address_sanitizers( - new_buf, - new_size); // usually, buf addr = new_buf addr, asan maybe not equal. -#endif + // usually, buf addr = new_buf addr, asan maybe not equal. + add_address_sanitizers(new_buf, new_size); buf = new_buf; if constexpr (clear_memory) @@ -395,10 +382,8 @@ class Allocator { // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment); memcpy(new_buf, buf, std::min(old_size, new_size)); -#ifndef NDEBUG add_address_sanitizers(new_buf, new_size); remove_address_sanitizers(buf, old_size); -#endif free(buf, old_size); buf = new_buf; } diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index 9adb30b93054f4..1acc38f2b9e084 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -88,18 +88,18 @@ class CacheTest : public testing::Test { void* value; }; - class CacheTestSizePolicy : public LRUCachePolicyTrackingManual { + class CacheTestSizePolicy : public LRUCachePolicy { public: CacheTestSizePolicy(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::FOR_UT_CACHE_SIZE, capacity, - LRUCacheType::SIZE, -1) {} + : LRUCachePolicy(CachePolicy::CacheType::FOR_UT_CACHE_SIZE, capacity, + LRUCacheType::SIZE, -1) {} }; - class CacheTestNumberPolicy : public LRUCachePolicyTrackingManual { + class CacheTestNumberPolicy : public LRUCachePolicy { public: CacheTestNumberPolicy(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::FOR_UT_CACHE_NUMBER, - capacity, LRUCacheType::NUMBER, -1, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::FOR_UT_CACHE_NUMBER, capacity, + LRUCacheType::NUMBER, -1, num_shards) {} }; // there is 16 shards in ShardedLRUCache diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp index d4624273b0b854..fad2116fca7630 100644 --- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp +++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -167,8 +167,8 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) { std::unique_ptr thread_context = std::make_unique(); std::shared_ptr t1 = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-MultiMemTracker1"); - std::shared_ptr t2 = std::make_shared("UT-MultiMemTracker2", t1.get()); - std::shared_ptr t3 = std::make_shared("UT-MultiMemTracker3", t1.get()); + std::shared_ptr t2 = std::make_shared("UT-MultiMemTracker2"); + std::shared_ptr t3 = std::make_shared("UT-MultiMemTracker3"); int64_t size1 = 4 * 1024; int64_t size2 = 4 * 1024 * 1024;