From 75b36c24f28a964be1eaed608870c533ff9e63d6 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 12 Sep 2024 11:17:14 +0800 Subject: [PATCH] 3 --- be/src/cloud/cloud_tablet_mgr.cpp | 2 +- .../cloud/cloud_txn_delete_bitmap_cache.cpp | 4 +- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 2 +- be/src/http/default_path_handlers.cpp | 5 +- be/src/olap/memtable.cpp | 5 +- be/src/olap/memtable_memory_limiter.cpp | 5 +- be/src/olap/memtable_memory_limiter.h | 3 - be/src/olap/memtable_writer.cpp | 13 - be/src/olap/page_cache.h | 26 +- .../segment_v2/inverted_index_cache.cpp | 6 +- .../rowset/segment_v2/inverted_index_cache.h | 31 ++- be/src/olap/schema_cache.h | 7 +- be/src/olap/segment_loader.cpp | 6 +- be/src/olap/segment_loader.h | 11 +- be/src/olap/storage_engine.h | 8 +- be/src/olap/tablet_manager.cpp | 3 +- be/src/olap/tablet_meta.h | 9 +- be/src/olap/tablet_schema_cache.cpp | 4 +- be/src/olap/tablet_schema_cache.h | 9 +- be/src/olap/txn_manager.h | 9 +- be/src/runtime/exec_env.h | 9 +- be/src/runtime/exec_env_init.cpp | 13 +- be/src/runtime/load_channel_mgr.h | 9 +- .../memory/global_memory_arbitrator.cpp | 8 +- .../runtime/memory/global_memory_arbitrator.h | 12 +- be/src/runtime/memory/lru_cache_policy.h | 4 +- be/src/runtime/memory/mem_tracker.cpp | 87 ++---- be/src/runtime/memory/mem_tracker.h | 234 +++++----------- be/src/runtime/memory/mem_tracker_limiter.cpp | 114 ++++---- be/src/runtime/memory/mem_tracker_limiter.h | 253 ++++++++++++------ .../runtime/memory/thread_mem_tracker_mgr.h | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 3 +- .../workload_group/workload_group_manager.cpp | 4 +- be/src/service/point_query_executor.cpp | 10 +- be/src/service/point_query_executor.h | 12 +- be/src/util/obj_lru_cache.cpp | 6 +- be/src/util/obj_lru_cache.h | 8 +- be/test/olap/lru_cache_test.cpp | 6 +- 38 files changed, 421 insertions(+), 541 deletions(-) diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 0fe050d02dbd3da..e5c31785c1eb1c0 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -136,7 +136,7 @@ class CloudTabletMgr::TabletMap { CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) : _engine(engine), _tablet_map(std::make_unique()), - _cache(std::make_unique( + _cache(std::make_unique( CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index c6a3b54edc3f67f..cc27df13013c7cd 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -32,8 +32,8 @@ namespace doris { CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, - size_in_bytes, LRUCacheType::SIZE, 86400, 4), + : LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, size_in_bytes, + LRUCacheType::SIZE, 86400, 4), _stop_latch(1) {} CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() { diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 75577ae2e3fee0a..206975c2f81b65c 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -30,7 +30,7 @@ namespace doris { // Record transaction related delete bitmaps using a lru cache. -class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { +class CloudTxnDeleteBitmapCache : public LRUCachePolicy { public: CloudTxnDeleteBitmapCache(size_t size_in_bytes); diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index ded72d9f28f7df9..02f8b9f92c406de 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -142,7 +142,7 @@ void display_tablets_callback(const WebPageHandler::ArgumentMap& args, EasyJson* // Registered to handle "/mem_tracker", and prints out memory tracker information. void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) { (*output) << "

Memory usage by subsystem

\n"; - std::vector snapshots; + std::vector snapshots; auto iter = args.find("type"); if (iter != args.end()) { if (iter->second == "global") { @@ -191,7 +191,6 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr (*output) << "" "Type" "Label" - "Parent Label" "Limit" "Current Consumption(Bytes)" @@ -208,7 +207,7 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr (*output) << strings::Substitute( "$0$1$2$3$4$5$6$7\n", - item.type, item.label, item.parent_label, limit_str, item.cur_consumption, + item.type, item.label, limit_str, item.cur_consumption, current_consumption_normalize, item.peak_consumption, peak_consumption_normalize); } (*output) << "\n"; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 4f66a361650875e..1c4718258748575 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -161,9 +161,8 @@ MemTable::~MemTable() { std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete()); _insert_mem_tracker->release(_mem_usage); _flush_mem_tracker->set_consumption(0); - DCHECK_EQ(_insert_mem_tracker->consumption(), 0) - << std::endl - << MemTracker::log_usage(_insert_mem_tracker->make_snapshot()); + DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl + << _insert_mem_tracker.log_usage(); DCHECK_EQ(_flush_mem_tracker->consumption(), 0); _arena.reset(); _agg_buffer_pool.clear(); diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 23b760284b8985e..ea045b1e53e30a6 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -62,10 +62,7 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) { _load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100; g_load_hard_mem_limit.set_value(_load_hard_mem_limit); g_load_soft_mem_limit.set_value(_load_soft_mem_limit); - _memtable_tracker_set = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "MemTableTrackerSet"); - _mem_tracker = std::make_unique("AllMemTableMemory", - ExecEnv::GetInstance()->details_mem_tracker_set()); + _mem_tracker = std::make_unique("AllMemTableMemory"); REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _log_timer.start(); diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index 2e8271bab35c158..0aed433d5ad629d 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -45,7 +45,6 @@ class MemTableMemoryLimiter { void refresh_mem_tracker(); - MemTrackerLimiter* memtable_tracker_set() { return _memtable_tracker_set.get(); } MemTracker* mem_tracker() { return _mem_tracker.get(); } int64_t mem_usage() const { return _mem_usage; } @@ -68,8 +67,6 @@ class MemTableMemoryLimiter { int64_t _write_mem_usage = 0; int64_t _active_mem_usage = 0; - // mem tracker collection of all mem tables. - std::shared_ptr _memtable_tracker_set; // sum of all mem table memory. std::unique_ptr _mem_tracker; int64_t _load_hard_mem_limit = -1; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 114a7841b922044..59916d5f1cc57b1 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -187,25 +187,12 @@ Status MemTableWriter::wait_flush() { } void MemTableWriter::_reset_mem_table() { -#ifndef BE_TEST - auto mem_table_insert_tracker = std::make_shared( - fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num, - UniqueId(_req.load_id).to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set()); - auto mem_table_flush_tracker = std::make_shared( - fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num++, - UniqueId(_req.load_id).to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set()); -#else auto mem_table_insert_tracker = std::make_shared(fmt::format( "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string())); auto mem_table_flush_tracker = std::make_shared(fmt::format( "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num++, UniqueId(_req.load_id).to_string())); -#endif { std::lock_guard l(_mem_table_tracker_lock); _mem_table_insert_trackers.push_back(mem_table_insert_tracker); diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 09fc689959ce4cc..32b6683e7823b04 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -92,28 +92,28 @@ class StoragePageCache { } }; - class DataPageCache : public LRUCachePolicyTrackingAllocator { + class DataPageCache : public LRUCachePolicy { public: DataPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, LRUCacheType::SIZE, - config::data_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, + LRUCacheType::SIZE, config::data_page_cache_stale_sweep_time_sec, + num_shards) {} }; - class IndexPageCache : public LRUCachePolicyTrackingAllocator { + class IndexPageCache : public LRUCachePolicy { public: IndexPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, LRUCacheType::SIZE, - config::index_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, + LRUCacheType::SIZE, config::index_page_cache_stale_sweep_time_sec, + num_shards) {} }; - class PKIndexPageCache : public LRUCachePolicyTrackingAllocator { + class PKIndexPageCache : public LRUCachePolicy { public: PKIndexPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, LRUCacheType::SIZE, - config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, + LRUCacheType::SIZE, + config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {} }; static constexpr uint32_t kDefaultNumShards = 16; @@ -164,7 +164,7 @@ class StoragePageCache { // delete bitmap in unique key with mow std::unique_ptr _pk_index_page_cache; - LRUCachePolicyTrackingAllocator* _get_page_cache(segment_v2::PageTypePB page_type) { + LRUCachePolicy* _get_page_cache(segment_v2::PageTypePB page_type) { switch (page_type) { case segment_v2::DATA_PAGE: { return _data_page_cache.get(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index b2930d2867b05fd..e42c02860f5d004 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -135,9 +135,9 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptrgetSizeInBytes(), - bitmap->getSizeInBytes(), CachePriority::NORMAL); + auto* lru_handle = LRUCachePolicy::insert(key.encode(), (void*)cache_value_ptr.release(), + bitmap->getSizeInBytes(), bitmap->getSizeInBytes(), + CachePriority::NORMAL); *handle = InvertedIndexQueryCacheHandle(this, lru_handle); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index 5423ea044a2e581..b80f2c01027b6ec 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -99,23 +99,23 @@ class InvertedIndexSearcherCache { private: InvertedIndexSearcherCache() = default; - class InvertedIndexSearcherCachePolicy : public LRUCachePolicyTrackingManual { + class InvertedIndexSearcherCachePolicy : public LRUCachePolicy { public: InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, uint32_t element_count_capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, - capacity, LRUCacheType::SIZE, - config::inverted_index_cache_stale_sweep_time_sec, - num_shards, element_count_capacity, true) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, true) {} InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, uint32_t element_count_capacity, CacheValueTimeExtractor cache_value_time_extractor, bool cache_value_check_timestamp) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, - LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec, - num_shards, element_count_capacity, cache_value_time_extractor, - cache_value_check_timestamp, true) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, cache_value_time_extractor, + cache_value_check_timestamp, true) {} }; // Insert a cache entry by key. // And the cache entry will be returned in handle. @@ -179,9 +179,9 @@ class InvertedIndexCacheHandle { class InvertedIndexQueryCacheHandle; -class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual { +class InvertedIndexQueryCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // cache key struct CacheKey { @@ -227,10 +227,9 @@ class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual { InvertedIndexQueryCache() = delete; InvertedIndexQueryCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, - capacity, LRUCacheType::SIZE, - config::inverted_index_cache_stale_sweep_time_sec, - num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity, + LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec, + num_shards) {} bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle); diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index 7bb18a59c349a04..68cd809ed226f40 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -44,7 +44,7 @@ using SegmentIteratorUPtr = std::unique_ptr; // eliminating the need for frequent allocation and deallocation during usage. // This caching mechanism proves immensely advantageous, particularly in scenarios // with high concurrency, where queries are executed simultaneously. -class SchemaCache : public LRUCachePolicyTrackingManual { +class SchemaCache : public LRUCachePolicy { public: static SchemaCache* instance(); @@ -86,9 +86,8 @@ class SchemaCache : public LRUCachePolicyTrackingManual { }; SchemaCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::SCHEMA_CACHE, capacity, - LRUCacheType::NUMBER, - config::schema_cache_sweep_time_sec) {} + : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, LRUCacheType::NUMBER, + config::schema_cache_sweep_time_sec) {} private: static constexpr char SCHEMA_DELIMITER = '-'; diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 12ab89af0be283a..fd7e3f476ad0829 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -40,9 +40,9 @@ bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value, SegmentCacheHandle* handle) { - auto* lru_handle = LRUCachePolicyTrackingManual::insert( - key.encode(), &value, value.segment->meta_mem_usage(), value.segment->meta_mem_usage(), - CachePriority::NORMAL); + auto* lru_handle = + LRUCachePolicy::insert(key.encode(), &value, value.segment->meta_mem_usage(), + value.segment->meta_mem_usage(), CachePriority::NORMAL); handle->push_segment(this, lru_handle); } diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index 5bb8fae3c418775..d177024242db33c 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -55,9 +55,9 @@ class BetaRowset; // Make sure that cache_handle is valid during the segment usage period. using BetaRowsetSharedPtr = std::shared_ptr; -class SegmentCache : public LRUCachePolicyTrackingManual { +class SegmentCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // The cache key or segment lru cache struct CacheKey { CacheKey(RowsetId rowset_id_, int64_t segment_id_) @@ -81,10 +81,9 @@ class SegmentCache : public LRUCachePolicyTrackingManual { }; SegmentCache(size_t memory_bytes_limit, size_t segment_num_limit) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::SEGMENT_CACHE, - memory_bytes_limit, LRUCacheType::SIZE, - config::tablet_rowset_stale_sweep_time_sec, - DEFAULT_LRU_CACHE_NUM_SHARDS * 2, segment_num_limit) {} + : LRUCachePolicy(CachePolicy::CacheType::SEGMENT_CACHE, memory_bytes_limit, + LRUCacheType::SIZE, config::tablet_rowset_stale_sweep_time_sec, + DEFAULT_LRU_CACHE_NUM_SHARDS * 2, segment_num_limit) {} // Lookup the given segment in the cache. // If the segment is found, the cache entry will be written into handle. diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index d7ccd4597d6ef31..b2a313adcdbb7ee 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -540,7 +540,7 @@ class StorageEngine final : public BaseStorageEngine { // lru cache for create tabelt round robin in disks // key: partitionId_medium // value: index -class CreateTabletIdxCache : public LRUCachePolicyTrackingManual { +class CreateTabletIdxCache : public LRUCachePolicy { public: // get key, delimiter with DELIMITER '-' static std::string get_key(int64_t partition_id, TStorageMedium::type medium) { @@ -558,9 +558,9 @@ class CreateTabletIdxCache : public LRUCachePolicyTrackingManual { }; CreateTabletIdxCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, - capacity, LRUCacheType::NUMBER, - /*stale_sweep_time_s*/ 30 * 60) {} + : LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity, + LRUCacheType::NUMBER, + /*stale_sweep_time_s*/ 30 * 60) {} }; struct DirInfo { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index bc883185465629f..5b304114017090b 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -90,8 +90,7 @@ bvar::Adder g_tablet_meta_schema_columns_count("tablet_meta_schema_colu TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size) : _engine(engine), - _tablet_meta_mem_tracker(std::make_shared( - "TabletMeta(experimental)", ExecEnv::GetInstance()->details_mem_tracker_set())), + _tablet_meta_mem_tracker(std::make_shared("TabletMeta(experimental)")), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 74ab71d0586fa01..f754f885abe639b 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -520,13 +520,12 @@ class DeleteBitmap { void remove_sentinel_marks(); - class AggCachePolicy : public LRUCachePolicyTrackingManual { + class AggCachePolicy : public LRUCachePolicy { public: AggCachePolicy(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, - capacity, LRUCacheType::SIZE, - config::delete_bitmap_agg_cache_stale_sweep_time_sec, - 256) {} + : LRUCachePolicy(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, capacity, + LRUCacheType::SIZE, + config::delete_bitmap_agg_cache_stale_sweep_time_sec, 256) {} }; class AggCache { diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index 51618f590a7dd2e..e339c947bb97a4e 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -40,8 +40,8 @@ std::pair TabletSchemaCache::insert(const std: pb.ParseFromString(key); tablet_schema_ptr->init_from_pb(pb); value->tablet_schema = tablet_schema_ptr; - lru_handle = LRUCachePolicyTrackingManual::insert( - key, value, tablet_schema_ptr->num_columns(), 0, CachePriority::NORMAL); + lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(), 0, + CachePriority::NORMAL); g_tablet_schema_cache_count << 1; g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns(); } diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h index 10462804ed20124..e18892a3ca5f062 100644 --- a/be/src/olap/tablet_schema_cache.h +++ b/be/src/olap/tablet_schema_cache.h @@ -23,14 +23,13 @@ namespace doris { -class TabletSchemaCache : public LRUCachePolicyTrackingManual { +class TabletSchemaCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; TabletSchemaCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity, - LRUCacheType::NUMBER, - config::tablet_schema_cache_recycle_interval) {} + : LRUCachePolicy(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity, + LRUCacheType::NUMBER, config::tablet_schema_cache_recycle_interval) {} static TabletSchemaCache* create_global_schema_cache(size_t capacity) { auto* res = new TabletSchemaCache(capacity); diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 5944bbf0fc31368..88ee97c5f6a3b90 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -282,13 +282,12 @@ class TxnManager { void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); - class TabletVersionCache : public LRUCachePolicyTrackingManual { + class TabletVersionCache : public LRUCachePolicy { public: TabletVersionCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_VERSION_CACHE, - capacity, LRUCacheType::NUMBER, -1, - DEFAULT_LRU_CACHE_NUM_SHARDS, - DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity, + LRUCacheType::NUMBER, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} }; private: diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 98d82f274e746de..8eba1b1d3b72e44 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -171,11 +171,13 @@ class ExecEnv { // Each group corresponds to several MemTrackerLimiters and has a lock. // Multiple groups are used to reduce the impact of locks. std::vector mem_tracker_limiter_pool; + std::unordered_map mem_type_sum; void init_mem_tracker(); std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } - MemTrackerLimiter* details_mem_tracker_set() { return _details_mem_tracker_set.get(); } std::shared_ptr page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; } - MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); } + std::shared_ptr brpc_iobuf_block_memory_tracker() { + return _brpc_iobuf_block_memory_tracker; + } std::shared_ptr segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } @@ -354,10 +356,9 @@ class ExecEnv { // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; - std::shared_ptr _details_mem_tracker_set; // page size not in cache, data page/index page/etc. std::shared_ptr _page_no_cache_mem_tracker; - std::shared_ptr _brpc_iobuf_block_memory_tracker; + std::shared_ptr _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; std::shared_ptr _stream_load_pipe_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 834e402ca1ebb4a..738f994f2c1d0ff 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -584,15 +584,18 @@ Status ExecEnv::_init_mem_env() { void ExecEnv::init_mem_tracker() { mem_tracker_limiter_pool.resize(MEM_TRACKER_GROUP_NUM, TrackerLimiterGroup()); // before all mem tracker init. + mem_type_sum.emplace(MemTrackerLimiter::Type::GLOBAL, MemTracker()); + mem_type_sum.emplace(MemTrackerLimiter::Type::QUERY, MemTracker()); + mem_type_sum.emplace(MemTrackerLimiter::Type::LOAD, MemTracker()); + mem_type_sum.emplace(MemTrackerLimiter::Type::COMPACTION, MemTracker()); + mem_type_sum.emplace(MemTrackerLimiter::Type::SCHEMA_CHANGE, MemTracker()); + mem_type_sum.emplace(MemTrackerLimiter::Type::OTHER, MemTracker()); _s_tracking_memory = true; _orphan_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); - _details_mem_tracker_set = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "DetailsTrackerSet"); - _page_no_cache_mem_tracker = - std::make_shared("PageNoCache", _details_mem_tracker_set.get()); + _page_no_cache_mem_tracker = std::make_shared("PageNoCache"); _brpc_iobuf_block_memory_tracker = - std::make_shared("IOBufBlockMemory", _details_mem_tracker_set.get()); + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "IOBufBlockMemory"); _segcompaction_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); _point_query_executor_mem_tracker = diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index c9c8f4c2a0f3cce..94bd210f262557f 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -72,13 +72,12 @@ class LoadChannelMgr { Status _start_bg_worker(); - class LastSuccessChannelCache : public LRUCachePolicyTrackingManual { + class LastSuccessChannelCache : public LRUCachePolicy { public: LastSuccessChannelCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, - capacity, LRUCacheType::SIZE, -1, - DEFAULT_LRU_CACHE_NUM_SHARDS, - DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + : LRUCachePolicy(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, capacity, + LRUCacheType::SIZE, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} }; protected: diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index 344bcbc59846d9c..be0aa54cc7d1229 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -24,7 +24,7 @@ namespace doris { std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock; -std::unordered_map GlobalMemoryArbitrator::_reserved_trackers; +std::unordered_map GlobalMemoryArbitrator::_reserved_trackers; bvar::PassiveStatus g_vm_rss_sub_allocator_cache( "meminfo_vm_rss_sub_allocator_cache", @@ -57,7 +57,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { std::memory_order_relaxed)); { std::lock_guard l(_reserved_trackers_lock); - _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes); + _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].consume(bytes); } return true; } @@ -72,8 +72,8 @@ void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) { DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes; return; } - _reserved_trackers[label].sub(bytes); - if (_reserved_trackers[label].current_value() == 0) { + _reserved_trackers[label].release(bytes); + if (_reserved_trackers[label].consumption() == 0) { _reserved_trackers.erase(it); } } diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index f8fda18d0e9a0c3..2a845370a1c6ebf 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -17,7 +17,7 @@ #pragma once -#include "runtime/memory/mem_tracker.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" namespace doris { @@ -107,15 +107,15 @@ class GlobalMemoryArbitrator { static void release_process_reserved_memory(int64_t bytes); static inline void make_reserved_memory_snapshots( - std::vector* snapshots) { + std::vector* snapshots) { std::lock_guard l(_reserved_trackers_lock); for (const auto& pair : _reserved_trackers) { - MemTracker::Snapshot snapshot; + MemTrackerLimiter::Snapshot snapshot; snapshot.type = "reserved_memory"; snapshot.label = pair.first; snapshot.limit = -1; - snapshot.cur_consumption = pair.second.current_value(); - snapshot.peak_consumption = pair.second.peak_value(); + snapshot.cur_consumption = pair.second.consumption(); + snapshot.peak_consumption = pair.second.peak_consumption(); (*snapshots).emplace_back(snapshot); } } @@ -177,7 +177,7 @@ class GlobalMemoryArbitrator { static std::atomic _s_process_reserved_memory; static std::mutex _reserved_trackers_lock; - static std::unordered_map _reserved_trackers; + static std::unordered_map _reserved_trackers; }; } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index 1b6c9ead6d00867..20aac2249b319ab 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -237,6 +237,7 @@ class LRUCachePolicyTrackingAllocator : public LRUCachePolicy { return _mem_tracker->consumption(); } + // No need to track memory manually, it is usually already tracked in the Allocator. Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, CachePriority priority = CachePriority::NORMAL) override { return _cache->insert(key, value, charge, priority); @@ -302,8 +303,7 @@ class LRUCachePolicyTrackingManual : public LRUCachePolicy { private: void _init_mem_tracker(const std::string& type_name) { _mem_tracker = - std::make_unique(fmt::format("{}[{}]", type_string(_type), type_name), - ExecEnv::GetInstance()->details_mem_tracker_set()); + std::make_unique(fmt::format("{}[{}]", type_string(_type), type_name)); } // LRUCacheType::SIZE equal to total_size. diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 5655d6ef02e65d0..e0cabaf2796c403 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -20,94 +20,43 @@ #include "runtime/memory/mem_tracker.h" +#include #include #include -#include "bvar/bvar.h" -#include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/thread_context.h" - namespace doris { +constexpr size_t MEM_TRACKERS_GROUP_NUM = 1000; bvar::Adder g_memtracker_cnt("memtracker_cnt"); -// Save all MemTrackers in use to maintain the weak relationship between MemTracker and MemTrackerLimiter. -// When MemTrackerLimiter prints statistics, all MemTracker statistics with weak relationship will be printed together. -// Each group corresponds to several MemTrackerLimiters and has a lock. +// Each group corresponds to several MemCountes and has a lock. // Multiple groups are used to reduce the impact of locks. -std::vector MemTracker::mem_tracker_pool(1000); +std::vector MemTracker::trackers_group_pool(MEM_TRACKERS_GROUP_NUM); -MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) { - bind_parent(parent); +MemTracker::MemTracker() { + g_memtracker_cnt << 1; } -void MemTracker::bind_parent(MemTrackerLimiter* parent) { - if (parent) { - _type = parent->type(); - _parent_label = parent->label(); - _parent_group_num = parent->group_num(); - } else { - _type = thread_context()->thread_mem_tracker()->type(); - _parent_label = thread_context()->thread_mem_tracker()->label(); - _parent_group_num = thread_context()->thread_mem_tracker()->group_num(); - } +MemTracker::MemTracker(const std::string& label) : MemTracker() { + _label = label; + _group_num = random() % MEM_TRACKERS_GROUP_NUM; { - std::lock_guard l(mem_tracker_pool[_parent_group_num].group_lock); - _tracker_group_it = mem_tracker_pool[_parent_group_num].trackers.insert( - mem_tracker_pool[_parent_group_num].trackers.end(), this); + std::lock_guard l(trackers_group_pool[_group_num].group_lock); + _trackers_group_it = trackers_group_pool[_group_num].trackers.insert( + trackers_group_pool[_group_num].trackers.end(), this); } - g_memtracker_cnt << 1; } MemTracker::~MemTracker() { - if (_parent_group_num != -1) { - std::lock_guard l(mem_tracker_pool[_parent_group_num].group_lock); - if (_tracker_group_it != mem_tracker_pool[_parent_group_num].trackers.end()) { - mem_tracker_pool[_parent_group_num].trackers.erase(_tracker_group_it); - _tracker_group_it = mem_tracker_pool[_parent_group_num].trackers.end(); - } - g_memtracker_cnt << -1; - } -} - -MemTracker::Snapshot MemTracker::make_snapshot() const { - Snapshot snapshot; - snapshot.type = type_string(_type); - snapshot.label = _label; - snapshot.parent_label = _parent_label; - snapshot.limit = -1; - snapshot.cur_consumption = consumption(); - snapshot.peak_consumption = peak_consumption(); - return snapshot; -} - -void MemTracker::make_group_snapshot(std::vector* snapshots, - int64_t group_num, std::string parent_label) { - std::lock_guard l(mem_tracker_pool[group_num].group_lock); - for (auto* tracker : mem_tracker_pool[group_num].trackers) { - if (tracker->parent_label() == parent_label && tracker->peak_consumption() != 0) { - snapshots->push_back(tracker->make_snapshot()); - } - } -} - -void MemTracker::make_all_trackers_snapshots(std::vector* snapshots) { - for (auto& i : mem_tracker_pool) { - std::lock_guard l(i.group_lock); - for (auto* tracker : i.trackers) { - if (tracker->peak_consumption() != 0) { - snapshots->push_back(tracker->make_snapshot()); - } + { + std::lock_guard l(trackers_group_pool[_group_num].group_lock); + if (_trackers_group_it != trackers_group_pool[_group_num].trackers.end()) { + trackers_group_pool[_group_num].trackers.erase(_trackers_group_it); + _trackers_group_it = trackers_group_pool[_group_num].trackers.end(); } } -} - -std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) { - return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", - snapshot.label, snapshot.parent_label, print_bytes(snapshot.cur_consumption), - snapshot.cur_consumption, print_bytes(snapshot.peak_consumption), - snapshot.peak_consumption); + g_memtracker_cnt << -1; } } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 3f0932a891e5211..b19fdb2be0f44bb 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -15,216 +15,110 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.h -// and modified by Doris #pragma once #include -#include #include +#include // IWYU pragma: no_include #include // IWYU pragma: keep -#include -#include -#include #include -#include #include "common/compiler_util.h" // IWYU pragma: keep -#include "runtime/query_statistics.h" #include "util/pretty_printer.h" namespace doris { -class MemTrackerLimiter; - -// Used to track memory usage. -// -// MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, -// which will automatically track all memory usage of the code segment where it is located. -// -// This class is thread-safe. +/* + * A tracker that keeps track of the current and peak memory usage seen. + * Relaxed ordering, not accurate in real time. + * + * can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, + * which will automatically track all memory usage of the code segment where it is located. + * + * This class is thread-safe. +*/ class MemTracker { public: - struct Snapshot { - std::string type; - std::string label; - std::string parent_label; - int64_t limit = 0; - int64_t cur_consumption = 0; - int64_t peak_consumption = 0; - - bool operator<(const Snapshot& rhs) const { return cur_consumption < rhs.cur_consumption; } - }; + MemTracker(); + MemTracker(const std::string& label); + ~MemTracker(); - struct TrackerGroup { - std::list trackers; - std::mutex group_lock; - }; - - enum class Type { - GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan - QUERY = 1, // Count the memory consumption of all Query tasks. - LOAD = 2, // Count the memory consumption of all Load tasks. - COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. - SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. - OTHER = 5 - }; - - static std::string type_string(Type type) { - switch (type) { - case Type::GLOBAL: - return "global"; - case Type::QUERY: - return "query"; - case Type::LOAD: - return "load"; - case Type::COMPACTION: - return "compaction"; - case Type::SCHEMA_CHANGE: - return "schema_change"; - case Type::OTHER: - return "other"; - default: - LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast(type); + void consume(int64_t delta) { + if (UNLIKELY(delta == 0)) { + return; } - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); + int64_t value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta; + update_peak(value); } - // A counter that keeps track of the current and peak value seen. - // Relaxed ordering, not accurate in real time. - class MemCounter { - public: - MemCounter() : _current_value(0), _peak_value(0) {} - - void add(int64_t delta) { - int64_t value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta; - update_peak(value); - } - - void add_no_update_peak(int64_t delta) { - _current_value.fetch_add(delta, std::memory_order_relaxed); - } + void consume_no_update_peak(int64_t delta) { // need extreme fast + _current_value.fetch_add(delta, std::memory_order_relaxed); + } - bool try_add(int64_t delta, int64_t max) { - int64_t cur_val = _current_value.load(std::memory_order_relaxed); - int64_t new_val = 0; - do { - new_val = cur_val + delta; - if (UNLIKELY(new_val > max)) { - return false; - } - } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val, new_val, - std::memory_order_relaxed))); - update_peak(new_val); + bool try_consume(int64_t delta, int64_t max) { + if (UNLIKELY(delta == 0)) { return true; } - - void sub(int64_t delta) { _current_value.fetch_sub(delta, std::memory_order_relaxed); } - - void set(int64_t v) { - _current_value.store(v, std::memory_order_relaxed); - update_peak(v); - } - - void update_peak(int64_t value) { - int64_t pre_value = _peak_value.load(std::memory_order_relaxed); - while (value > pre_value && !_peak_value.compare_exchange_weak( - pre_value, value, std::memory_order_relaxed)) { + int64_t cur_val = _current_value.load(std::memory_order_relaxed); + int64_t new_val = 0; + do { + new_val = cur_val + delta; + if (UNLIKELY(new_val > max)) { + return false; } - } - - int64_t current_value() const { return _current_value.load(std::memory_order_relaxed); } - int64_t peak_value() const { return _peak_value.load(std::memory_order_relaxed); } + } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val, new_val, + std::memory_order_relaxed))); + update_peak(new_val); + return true; + } - private: - std::atomic _current_value; - std::atomic _peak_value; - }; + void release(int64_t delta) { _current_value.fetch_sub(delta, std::memory_order_relaxed); } - // Creates and adds the tracker to the mem_tracker_pool. - MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr); + void set_consumption(int64_t v) { + _current_value.store(v, std::memory_order_relaxed); + update_peak(v); + } - virtual ~MemTracker(); + // Returns the memory consumed in bytes. + int64_t consumption() const { return _current_value.load(std::memory_order_relaxed); } + int64_t peak_consumption() const { return _peak_value.load(std::memory_order_relaxed); } static std::string print_bytes(int64_t bytes) { return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); } -public: - Type type() const { return _type; } const std::string& label() const { return _label; } - const std::string& parent_label() const { return _parent_label; } - const std::string& set_parent_label() const { return _parent_label; } - // Returns the memory consumed in bytes. - int64_t consumption() const { return _consumption.current_value(); } - int64_t peak_consumption() const { return _consumption.peak_value(); } - - void consume(int64_t bytes) { - if (UNLIKELY(bytes == 0)) { - return; - } - _consumption.add(bytes); - if (_query_statistics) { - _query_statistics->set_max_peak_memory_bytes(peak_consumption()); - _query_statistics->set_current_used_memory_bytes(consumption()); - } - } - - void consume_no_update_peak(int64_t bytes) { // need extreme fast - _consumption.add_no_update_peak(bytes); + std::string log_usage() const { + return fmt::format("MemTracker Lame={}, Used={}({} B), Peak={}({} B)", + print_bytes(consumption()), consumption(), print_bytes(peak_consumption()), + peak_consumption()); } - void release(int64_t bytes) { _consumption.sub(bytes); } - - void set_consumption(int64_t bytes) { _consumption.set(bytes); } - - std::shared_ptr get_query_statistics() { return _query_statistics; } - -public: - virtual Snapshot make_snapshot() const; - // Specify group_num from mem_tracker_pool to generate snapshot. - static void make_group_snapshot(std::vector* snapshots, int64_t group_num, - std::string parent_label); - static void make_all_trackers_snapshots(std::vector* snapshots); - static std::string log_usage(MemTracker::Snapshot snapshot); - - virtual std::string debug_string() { - std::stringstream msg; - msg << "label: " << _label << "; " - << "consumption: " << consumption() << "; " - << "peak_consumption: " << peak_consumption() << "; "; - return msg.str(); +private: + void update_peak(int64_t value) { + int64_t pre_value = _peak_value.load(std::memory_order_relaxed); + while (value > pre_value && + !_peak_value.compare_exchange_weak(pre_value, value, std::memory_order_relaxed)) { + } } -protected: - // Only used by MemTrackerLimiter - MemTracker() { _parent_group_num = -1; } - - void bind_parent(MemTrackerLimiter* parent); - - Type _type; - - // label used in the make snapshot, not guaranteed unique. - std::string _label; - - MemCounter _consumption; + std::string _label {"None"}; + std::atomic _current_value {0}; + std::atomic _peak_value {0}; - // Tracker is located in group num in mem_tracker_pool - int64_t _parent_group_num = 0; - - // Use _parent_label to correlate with parent limiter tracker. - std::string _parent_label = "-"; - - static std::vector mem_tracker_pool; - - // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. - std::list::iterator _tracker_group_it; + struct TrackersGroup { + std::list trackers; + std::mutex group_lock; + }; - std::shared_ptr _query_statistics = nullptr; + static std::vector trackers_group_pool; + // Group number in trackers_group_pool, generated by the timestamp. + int64_t _group_num; + // Iterator into trackers_group_pool for this object. Stored to have O(1) remove. + std::list::iterator _trackers_group_it; }; } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 121bda983e2bd5f..ea6d49069fe0f45 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -19,14 +19,12 @@ #include #include -#include #include #include #include #include -#include "bvar/bvar.h" #include "common/config.h" #include "olap/memtable_memory_limiter.h" #include "runtime/exec_env.h" @@ -37,9 +35,7 @@ #include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" -#include "util/pretty_printer.h" #include "util/runtime_profile.h" -#include "util/stack_util.h" namespace doris { @@ -82,7 +78,7 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_ if (_type == Type::GLOBAL) { _group_num = 0; } else { - _group_num = random() % 999 + 1; + _group_num = random() % MEM_TRACKER_GROUP_NUM + 1; } // currently only select/load need runtime query statistics @@ -129,23 +125,21 @@ MemTrackerLimiter::~MemTrackerLimiter() { // TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end. #ifndef NDEBUG if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { - std::string err_msg = - fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", - label(), consumption(), peak_consumption(), - mem_tracker_inaccurate_msg); + std::string err_msg = fmt::format( + "mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), + consumption(), peak_consumption(), mem_tracker_inaccurate_msg); LOG(FATAL) << err_msg << print_address_sanitizers(); } #endif if (ExecEnv::tracking_memory()) { ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption()); } - _consumption.set(0); + _counter.set_consumption(0); #ifndef NDEBUG } else if (!_address_sanitizers.empty() && !is_group_commit_load) { LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " << ", mem tracker label: " << _label - << ", peak consumption: " << peak_consumption() - << print_address_sanitizers(); + << ", peak consumption: " << peak_consumption() << print_address_sanitizers(); #endif } memory_memtrackerlimiter_cnt << -1; @@ -161,9 +155,9 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", - _label, consumption(), peak_consumption(), - buf, size, it->first, it->second.size, - get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); + _label, consumption(), peak_consumption(), buf, size, it->first, + it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + it->second.stack_trace)); } // if alignment not equal to 0, maybe usable_size > size. @@ -184,8 +178,8 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", - _label, consumption(), peak_consumption(), buf, - size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + _label, consumption(), peak_consumption(), buf, size, it->first, + it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); } _address_sanitizers.erase(buf); @@ -207,8 +201,8 @@ std::string MemTrackerLimiter::print_address_sanitizers() { auto msg = fmt::format( "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", - _label, consumption(), peak_consumption(), it.first, - it.second.size, it.second.stack_trace); + _label, consumption(), peak_consumption(), it.first, it.second.size, + it.second.stack_trace); LOG(INFO) << msg; detail += msg; } @@ -221,7 +215,7 @@ std::string MemTrackerLimiter::print_address_sanitizers() { } #endif -MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { +MemTrackerLimiter::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; snapshot.type = type_string(_type); snapshot.label = _label; @@ -247,7 +241,7 @@ void MemTrackerLimiter::refresh_global_counter() { } int64_t all_trackers_mem_sum = 0; for (auto it : type_mem_sum) { - MemTrackerLimiter::TypeMemSum[it.first]->set(it.second); + ExecEnv::GetInstance()->mem_type_sum[it.first].set_consumption(it.second); all_trackers_mem_sum += it.second; switch (it.first) { case Type::GLOBAL: @@ -299,18 +293,18 @@ void MemTrackerLimiter::clean_tracker_limiter_group() { #endif } -void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { MemTrackerLimiter::refresh_global_counter(); int64_t all_trackers_mem_sum = 0; Snapshot snapshot; - for (auto it : MemTrackerLimiter::TypeMemSum) { + for (const auto& it : ExecEnv::GetInstance()->mem_type_sum) { snapshot.type = "overview"; snapshot.label = type_string(it.first); snapshot.limit = -1; - snapshot.cur_consumption = it.second->current_value(); - snapshot.peak_consumption = it.second->peak_value(); + snapshot.cur_consumption = it.second.consumption(); + snapshot.peak_consumption = it.second.peak_consumption(); (*snapshots).emplace_back(snapshot); - all_trackers_mem_sum += it.second->current_value(); + all_trackers_mem_sum += it.second.consumption(); } snapshot.type = "overview"; @@ -362,7 +356,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector (*snapshots).emplace_back(snapshot); } -void MemTrackerLimiter::make_type_snapshots(std::vector* snapshots, +void MemTrackerLimiter::make_type_snapshots(std::vector* snapshots, MemTrackerLimiter::Type type) { if (type == Type::GLOBAL) { std::lock_guard l( @@ -371,7 +365,6 @@ void MemTrackerLimiter::make_type_snapshots(std::vector* s auto tracker = trackerWptr.lock(); if (tracker != nullptr) { (*snapshots).emplace_back(tracker->make_snapshot()); - MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label()); } } } else { @@ -382,17 +375,15 @@ void MemTrackerLimiter::make_type_snapshots(std::vector* s auto tracker = trackerWptr.lock(); if (tracker != nullptr && tracker->type() == type) { (*snapshots).emplace_back(tracker->make_snapshot()); - MemTracker::make_group_snapshot(snapshots, tracker->group_num(), - tracker->label()); } } } } } -void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots, +void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots, int top_num) { - std::priority_queue max_pq; + std::priority_queue max_pq; // not include global type. for (unsigned i = 1; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { std::lock_guard l( @@ -412,7 +403,7 @@ void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_all_trackers_snapshots(std::vector* snapshots) { for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { std::lock_guard l(i.group_lock); for (auto trackerWptr : i.trackers) { @@ -424,25 +415,24 @@ void MemTrackerLimiter::make_all_trackers_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_all_memory_state_snapshots(std::vector* snapshots) { make_process_snapshots(snapshots); make_all_trackers_snapshots(snapshots); - MemTracker::make_all_trackers_snapshots(snapshots); } -std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) { - return fmt::format( - "MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", - snapshot.label, snapshot.type, print_bytes(snapshot.limit), snapshot.limit, - print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, - print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); +std::string MemTrackerLimiter::log_usage(Snapshot snapshot) { + return fmt::format("MemTracker Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", + snapshot.label, snapshot.type, MemTracker::print_bytes(snapshot.limit), + snapshot.limit, MemTracker::print_bytes(snapshot.cur_consumption), + snapshot.cur_consumption, MemTracker::print_bytes(snapshot.peak_consumption), + snapshot.peak_consumption); } -std::string MemTrackerLimiter::type_log_usage(MemTracker::Snapshot snapshot) { +std::string MemTrackerLimiter::type_log_usage(Snapshot snapshot) { return fmt::format("Type={}, Used={}({} B), Peak={}({} B)", snapshot.type, - print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, - print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); + MemTracker::print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, + MemTracker::print_bytes(snapshot.peak_consumption), + snapshot.peak_consumption); } std::string MemTrackerLimiter::type_detail_usage(const std::string& msg, Type type) { @@ -466,16 +456,6 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { std::string detail = msg; detail += "\nProcess Memory Summary:\n " + GlobalMemoryArbitrator::process_mem_log_str(); detail += "\nMemory Tracker Summary: " + log_usage(); - std::string child_trackers_usage; - std::vector snapshots; - MemTracker::make_group_snapshot(&snapshots, _group_num, _label); - for (const auto& snapshot : snapshots) { - child_trackers_usage += "\n " + MemTracker::log_usage(snapshot); - } - if (!child_trackers_usage.empty()) { - detail += child_trackers_usage; - } - LOG(WARNING) << detail; } } @@ -483,25 +463,23 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { std::string MemTrackerLimiter::log_process_usage_str() { std::string detail; detail += "\nProcess Memory Summary:\n " + GlobalMemoryArbitrator::process_mem_log_str(); - std::vector snapshots; + std::vector snapshots; MemTrackerLimiter::make_process_snapshots(&snapshots); MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); MemTrackerLimiter::make_top_consumption_snapshots(&snapshots, 15); - // Add additional tracker printed when memory exceeds limit. - snapshots.emplace_back( - ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->make_snapshot()); - detail += "\nMemory Tracker Summary:"; for (const auto& snapshot : snapshots) { - if (snapshot.label.empty() && snapshot.parent_label.empty()) { + if (snapshot.label.empty()) { detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); - } else if (snapshot.parent_label.empty()) { - detail += "\n " + MemTrackerLimiter::log_usage(snapshot); } else { - detail += "\n " + MemTracker::log_usage(snapshot); + detail += "\n " + MemTrackerLimiter::log_usage(snapshot); } } + + // Add additional tracker printed when memory exceeds limit. + detail += "\n " + + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->log_usage(); return detail; } @@ -517,8 +495,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { std::string err_msg = fmt::format( "memory tracker limit exceeded, tracker label:{}, type:{}, limit " "{}, peak used {}, current used {}. backend {}, {}.", - label(), type_string(_type), print_bytes(limit()), - print_bytes(peak_consumption()), print_bytes(consumption()), + label(), type_string(_type), MemTracker::print_bytes(limit()), + MemTracker::print_bytes(peak_consumption()), MemTracker::print_bytes(consumption()), BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str()); if (_type == Type::QUERY || _type == Type::LOAD) { err_msg += fmt::format( @@ -543,7 +521,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, "Process memory not enough, cancel top memory used {}: " "<{}> consumption {}, backend {}, {}. Execute again " "after enough memory, details see be.INFO.", - type_string(type), label, print_bytes(mem_consumption), + type_string(type), label, MemTracker::print_bytes(mem_consumption), BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); @@ -664,7 +642,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, "Process memory not enough, cancel top memory overcommit {}: " "<{}> consumption {}, backend {}, {}. Execute again " "after enough memory, details see be.INFO.", - type_string(type), label, print_bytes(mem_consumption), + type_string(type), label, MemTracker::print_bytes(mem_consumption), BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 2afb3307a0bd5e1..62b570239784ccf 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -20,15 +20,14 @@ #include #include #include -#include #include +#include // IWYU pragma: no_include #include // IWYU pragma: keep #include #include #include -#include #include #include #include @@ -36,12 +35,14 @@ #include "common/config.h" #include "common/status.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/query_statistics.h" #include "util/string_util.h" #include "util/uid_util.h" namespace doris { class RuntimeProfile; +class MemTrackerLimiter; constexpr size_t MEM_TRACKER_GROUP_NUM = 1000; @@ -64,96 +65,82 @@ struct TrackerLimiterGroup { // Automatically track every once malloc/free of the system memory allocator (Currently, based on TCMlloc hook). // Put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,all memory used by this thread // will be recorded on this Query, otherwise it will be recorded in Orphan Tracker by default. -class MemTrackerLimiter final : public MemTracker { +class MemTrackerLimiter final { public: + /* + * Part 1, Type definition + */ + // TODO There are more and more GC codes and there should be a separate manager class. enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 }; - inline static std::unordered_map> TypeMemSum = { - {Type::GLOBAL, std::make_shared()}, - {Type::QUERY, std::make_shared()}, - {Type::LOAD, std::make_shared()}, - {Type::COMPACTION, std::make_shared()}, - {Type::SCHEMA_CHANGE, std::make_shared()}, - {Type::OTHER, std::make_shared()}}; + enum class Type { + GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan + QUERY = 1, // Count the memory consumption of all Query tasks. + LOAD = 2, // Count the memory consumption of all Load tasks. + COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. + SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. + OTHER = 5 + }; + + struct Snapshot { + std::string type; + std::string label; + int64_t limit = 0; + int64_t cur_consumption = 0; + int64_t peak_consumption = 0; + + bool operator<(const Snapshot& rhs) const { return cur_consumption < rhs.cur_consumption; } + }; + + /* + * Part 2, Constructors and property methods + */ -public: static std::shared_ptr create_shared( MemTrackerLimiter::Type type, const std::string& label = std::string(), int64_t byte_limit = -1); // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit); - ~MemTrackerLimiter() override; - - static std::string gc_type_string(GCType type) { - switch (type) { - case GCType::PROCESS: - return "process"; - case GCType::WORK_LOAD_GROUP: - return "work load group"; - default: - LOG(FATAL) << "not match gc type:" << static_cast(type); - } - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - } + ~MemTrackerLimiter(); - void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } + Type type() const { return _type; } + const std::string& label() const { return _label; } + std::shared_ptr get_query_statistics() { return _query_statistics; } int64_t group_num() const { return _group_num; } bool has_limit() const { return _limit >= 0; } int64_t limit() const { return _limit; } bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); } - - bool try_consume(int64_t bytes) { - if (UNLIKELY(bytes == 0)) { - return true; - } - bool st = true; - if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) { - st = _consumption.try_add(bytes, _limit); - } else { - _consumption.add(bytes); - } - if (st && _query_statistics) { - _query_statistics->set_max_peak_memory_bytes(peak_consumption()); - _query_statistics->set_current_used_memory_bytes(consumption()); - } - return st; - } - Status check_limit(int64_t bytes = 0); bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; } - bool is_query_cancelled() { return _is_query_cancelled; } - void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } -public: - // Transfer 'bytes' of consumption from this tracker to 'dst'. - void transfer_to(int64_t size, MemTrackerLimiter* dst) { - if (label() == dst->label()) { - return; - } - cache_consume(-size); - dst->cache_consume(size); - } + int64_t consumption() const { return _counter.consumption(); } + int64_t peak_consumption() const { return _counter.peak_consumption(); } + + // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. + std::list>::iterator wg_tracker_limiter_group_it; + + /* + * Part 3, Memory snapshot and log method + */ static void refresh_global_counter(); static void clean_tracker_limiter_group(); - Snapshot make_snapshot() const override; + Snapshot make_snapshot() const; // Returns a list of all the valid tracker snapshots. - static void make_process_snapshots(std::vector* snapshots); - static void make_type_snapshots(std::vector* snapshots, Type type); - static void make_all_trackers_snapshots(std::vector* snapshots); - static void make_all_memory_state_snapshots(std::vector* snapshots); - static void make_top_consumption_snapshots(std::vector* snapshots, - int top_num); - - static std::string log_usage(MemTracker::Snapshot snapshot); + static void make_process_snapshots(std::vector* snapshots); + static void make_type_snapshots(std::vector* snapshots, Type type); + static void make_all_trackers_snapshots(std::vector* snapshots); + static void make_all_memory_state_snapshots(std::vector* snapshots); + static void make_top_consumption_snapshots(std::vector* snapshots, int top_num); + + static std::string log_usage(Snapshot snapshot); std::string log_usage() const { return log_usage(make_snapshot()); } - static std::string type_log_usage(MemTracker::Snapshot snapshot); + static std::string type_log_usage(Snapshot snapshot); static std::string type_detail_usage(const std::string& msg, Type type); void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } @@ -161,6 +148,12 @@ class MemTrackerLimiter final : public MemTracker { static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; } static std::string log_process_usage_str(); static void print_log_process_usage(); + // Log the memory usage when memory limit is exceeded. + std::string tracker_limit_exceeded_str(); + + /* + * Part 4, Memory GC method + */ // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. // cancel_reason recorded when gc is triggered, for log printing. @@ -191,19 +184,9 @@ class MemTrackerLimiter final : public MemTracker { return free_top_overcommit_query(min_free_mem, cancel_reason, profile, Type::LOAD); } - // only for Type::QUERY or Type::LOAD. - static TUniqueId label_to_queryid(const std::string& label) { - if (label.find("#Id=") == std::string::npos) { - return {}; - } - auto queryid = split(label, "#Id=")[1]; - TUniqueId querytid; - parse_id(queryid, &querytid); - return querytid; - } - - // Log the memory usage when memory limit is exceeded. - std::string tracker_limit_exceeded_str(); + /* + * Part 5, Memory debug method + */ #ifndef NDEBUG void add_address_sanitizers(void* buf, size_t size); @@ -212,7 +195,7 @@ class MemTrackerLimiter final : public MemTracker { bool is_group_commit_load {false}; #endif - std::string debug_string() override { + std::string debug_string() { std::stringstream msg; msg << "limit: " << _limit << "; " << "consumption: " << consumption() << "; " @@ -221,12 +204,50 @@ class MemTrackerLimiter final : public MemTracker { return msg.str(); } - // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. - std::list>::iterator wg_tracker_limiter_group_it; -private: friend class ThreadMemTrackerMgr; + /* + * Part 6, Memory counting method + * + * Note! : Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release(). + * Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or + * SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER. + */ + void consume(int64_t bytes) { + _counter.consume(bytes); + if (_query_statistics) { + _query_statistics->set_max_peak_memory_bytes(peak_consumption()); + _query_statistics->set_current_used_memory_bytes(consumption()); + } + } + + void consume_no_update_peak(int64_t bytes) { _counter.consume_no_update_peak(bytes); } + + void release(int64_t bytes) { _counter.release(bytes); } + + bool try_consume(int64_t bytes) { + bool st = true; + if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) { + st = _counter.try_consume(bytes, _limit); + } else { + _counter.consume(bytes); + } + if (st && _query_statistics) { + _query_statistics->set_max_peak_memory_bytes(peak_consumption()); + _query_statistics->set_current_used_memory_bytes(consumption()); + } + return st; + } + // Transfer 'bytes' of consumption from this tracker to 'dst'. + void transfer_to(int64_t size, MemTrackerLimiter* dst) { + if (label() == dst->label()) { + return; + } + cache_consume(-size); + dst->cache_consume(size); + } + // If need to consume the tracker frequently, use it void cache_consume(int64_t bytes); @@ -236,6 +257,66 @@ class MemTrackerLimiter final : public MemTracker { int64_t add_untracked_mem(int64_t bytes); private: + /* + * Part 7, Private method + */ + + static std::string type_string(Type type) { + switch (type) { + case Type::GLOBAL: + return "global"; + case Type::QUERY: + return "query"; + case Type::LOAD: + return "load"; + case Type::COMPACTION: + return "compaction"; + case Type::SCHEMA_CHANGE: + return "schema_change"; + case Type::OTHER: + return "other"; + default: + LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + + static std::string gc_type_string(GCType type) { + switch (type) { + case GCType::PROCESS: + return "process"; + case GCType::WORK_LOAD_GROUP: + return "work load group"; + default: + LOG(FATAL) << "not match gc type:" << static_cast(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + + // only for Type::QUERY or Type::LOAD. + static TUniqueId label_to_queryid(const std::string& label) { + if (label.find("#Id=") == std::string::npos) { + return {}; + } + auto queryid = split(label, "#Id=")[1]; + TUniqueId querytid; + parse_id(queryid, &querytid); + return querytid; + } + + /* + * Part 8, Property definition + */ + + Type _type; + + // label used in the make snapshot, not guaranteed unique. + std::string _label; + + MemTracker _counter; + // Limit on memory consumption, in bytes. int64_t _limit; @@ -253,6 +334,8 @@ class MemTrackerLimiter final : public MemTracker { bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; + std::shared_ptr _query_statistics = nullptr; + #ifndef NDEBUG struct AddressSanitizer { size_t size; @@ -274,7 +357,9 @@ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { } inline void MemTrackerLimiter::cache_consume(int64_t bytes) { - if (bytes == 0) return; + if (bytes == 0) { + return; + } int64_t consume_bytes = add_untracked_mem(bytes); consume(consume_bytes); } @@ -285,7 +370,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { } if (_limit > 0 && consumption() + bytes > _limit) { return Status::MemoryLimitExceeded(fmt::format( - "failed alloc size {}, {}", print_bytes(bytes), tracker_limit_exceeded_str())); + "failed alloc size {}, {}", MemTracker::print_bytes(bytes), tracker_limit_exceeded_str())); } return Status::OK(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index bb0091f2e6d6fb0..4f67e1ea176dfdf 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -106,7 +106,7 @@ class ThreadMemTrackerMgr { std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; for (const auto& v : _consumer_tracker_stack) { - fmt::format_to(consumer_tracker_buf, "{}, ", MemTracker::log_usage(v->make_snapshot())); + fmt::format_to(consumer_tracker_buf, "{}, ", v->log_usage()); } return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, " diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index b9bd01095f34e86..c8b8c216c8b641e 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -264,8 +264,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params, const TQueryOptions& query_options) { _query_id = query_id; - _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)", - ExecEnv::GetInstance()->details_mem_tracker_set()); + _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (runtime_filter_params.__isset.rid_to_runtime_filter) { for (const auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 393d327e7c42ca4..56aff72bf3b75c4 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -232,9 +232,9 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { // check whether queries need to revoke memory for task group for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) { debug_msg += fmt::format( - "\n MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, " + "\n MemTracker Label={}, Used={}, SpillThreshold={}, " "Peak={}", - query_mem_tracker->label(), query_mem_tracker->parent_label(), + query_mem_tracker->label(), PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES), PrettyPrinter::print(query_spill_threshold, TUnit::BYTES), PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES)); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 8058e1f1be63024..ef16ec9e6f6914c 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -191,9 +191,9 @@ LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capa } RowCache::RowCache(int64_t capacity, int num_shards) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, LRUCacheType::SIZE, - config::point_query_row_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, + LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec, + num_shards) {} // Create global instance of this class RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { @@ -223,8 +223,8 @@ void RowCache::insert(const RowCacheKey& key, const Slice& value) { auto* row_cache_value = new RowCacheValue; row_cache_value->cache_value = cache_value; const std::string& encoded_key = key.encode(); - auto* handle = LRUCachePolicyTrackingManual::insert(encoded_key, row_cache_value, value.size, - value.size, CachePriority::NORMAL); + auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size, + CachePriority::NORMAL); // handle will released auto tmp = CacheHandle {this, handle}; } diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 19954479c97ec78..02e5cace0dceda8 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -126,9 +126,9 @@ class Reusable { }; // RowCache is a LRU cache for row store -class RowCache : public LRUCachePolicyTrackingManual { +class RowCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // The cache key for row lru cache struct RowCacheKey { @@ -220,7 +220,7 @@ class RowCache : public LRUCachePolicyTrackingManual { // A cache used for prepare stmt. // One connection per stmt perf uuid -class LookupConnectionCache : public LRUCachePolicyTrackingManual { +class LookupConnectionCache : public LRUCachePolicy { public: static LookupConnectionCache* instance() { return ExecEnv::GetInstance()->get_lookup_connection_cache(); @@ -231,9 +231,9 @@ class LookupConnectionCache : public LRUCachePolicyTrackingManual { private: friend class PointQueryExecutor; LookupConnectionCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, - capacity, LRUCacheType::NUMBER, - config::tablet_lookup_cache_stale_sweep_time_sec) {} + : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity, + LRUCacheType::NUMBER, + config::tablet_lookup_cache_stale_sweep_time_sec) {} static std::string encode_key(__int128_t cache_id) { fmt::memory_buffer buffer; diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp index 05b8b8824b5448d..600ffdb647ce440 100644 --- a/be/src/util/obj_lru_cache.cpp +++ b/be/src/util/obj_lru_cache.cpp @@ -20,9 +20,9 @@ namespace doris { ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, LRUCacheType::NUMBER, - config::common_obj_lru_cache_stale_sweep_time_sec, num_shards) { + : LRUCachePolicy(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, + LRUCacheType::NUMBER, config::common_obj_lru_cache_stale_sweep_time_sec, + num_shards) { _enabled = (capacity > 0); } diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index c7f805fc3a1de2d..680a32e79bc9913 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -25,9 +25,9 @@ namespace doris { // A common object cache depends on an Sharded LRU Cache. // It has a certain capacity, which determin how many objects it can cache. // Caller must hold a CacheHandle instance when visiting the cached object. -class ObjLRUCache : public LRUCachePolicyTrackingManual { +class ObjLRUCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; struct ObjKey { ObjKey(const std::string& key_) : key(key_) {} @@ -94,8 +94,8 @@ class ObjLRUCache : public LRUCachePolicyTrackingManual { if (_enabled) { const std::string& encoded_key = key.key; auto* obj_value = new ObjValue(value); - auto* handle = LRUCachePolicyTrackingManual::insert(encoded_key, obj_value, 1, - sizeof(T), CachePriority::NORMAL); + auto* handle = LRUCachePolicy::insert(encoded_key, obj_value, 1, sizeof(T), + CachePriority::NORMAL); *cache_handle = CacheHandle {this, handle}; } else { cache_handle = nullptr; diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index 4fc096380c754b0..cfd9e2c881d153b 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -88,11 +88,11 @@ class CacheTest : public testing::Test { void* value; }; - class CacheTestPolicy : public LRUCachePolicyTrackingManual { + class CacheTestPolicy : public LRUCachePolicy { public: CacheTestPolicy(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::FOR_UT, capacity, - LRUCacheType::SIZE, -1) {} + : LRUCachePolicy(CachePolicy::CacheType::FOR_UT, capacity, LRUCacheType::SIZE, -1) { + } }; // there is 16 shards in ShardedLRUCache