diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f66a7dd17c5e093..5098c08e15b864a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -133,13 +133,11 @@ DEFINE_mString(process_full_gc_size, "20%"); // If false, cancel query when the memory used exceeds exec_mem_limit, same as before. DEFINE_mBool(enable_query_memory_overcommit, "true"); -DEFINE_mBool(disable_memory_gc, "false"); +DEFINE_mBool(enable_memory_reclamation, "true"); DEFINE_mBool(enable_stacktrace, "true"); -DEFINE_mBool(enable_stacktrace_in_allocator_check_failed, "false"); - -DEFINE_mInt64(large_memory_check_bytes, "2147483648"); +DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648"); DEFINE_mBool(enable_memory_orphan_check, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index fd38924f47e74ec..cded974e9d9e3b9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -179,20 +179,19 @@ DECLARE_mString(process_full_gc_size); // If false, cancel query when the memory used exceeds exec_mem_limit, same as before. DECLARE_mBool(enable_query_memory_overcommit); -// gc will release cache, cancel task, and task will wait for gc to release memory, -// default gc strategy is conservative, if you want to exclude the interference of gc, let it be true -DECLARE_mBool(disable_memory_gc); +// memory reclamation will release cache, cancel task, and task will wait for gc to release memory, +// default reclamation strategy is conservative, +// if you want to exclude the interference of memory reclamation, let it be false +DECLARE_mBool(enable_memory_reclamation); // if false, turn off all stacktrace DECLARE_mBool(enable_stacktrace); -// Allocator check failed log stacktrace if not catch exception -DECLARE_mBool(enable_stacktrace_in_allocator_check_failed); - -// malloc or new large memory larger than large_memory_check_bytes, default 2G, -// will print a warning containing the stacktrace, but not prevent memory alloc. -// If is -1, disable large memory check. -DECLARE_mInt64(large_memory_check_bytes); +// when alloc memory larger than stacktrace_in_alloc_large_memory_bytes, default 2G, +// if alloc successful, will print a warning with stacktrace, but not prevent memory alloc. +// if alloc failed using Doris Allocator, will print stacktrace in error log. +// if is -1, disable print stacktrace when alloc large memory. +DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes); // default is true. if any memory tracking in Orphan mem tracker will report error. DECLARE_mBool(enable_memory_orphan_check); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 7667820b83f84fc..c15736b2f1d1a0b 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -249,7 +249,7 @@ void Daemon::memory_gc_thread() { int32_t memory_gc_sleep_time_ms = config::memory_gc_sleep_time_ms; while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(interval_milliseconds))) { - if (config::disable_memory_gc) { + if (!config::enable_memory_reclamation) { continue; } auto sys_mem_available = doris::GlobalMemoryArbitrator::sys_mem_available(); @@ -387,11 +387,29 @@ void Daemon::je_purge_dirty_pages_thread() const { if (_stop_background_threads_latch.count() == 0) { break; } + if (!config::enable_memory_reclamation) { + continue; + } doris::MemInfo::je_purge_all_arena_dirty_pages(); doris::MemInfo::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed); } while (true); } +void Daemon::cache_prune_stale_thread() { + int32_t interval = config::cache_periodic_prune_stale_sweep_sec; + while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { + if (interval <= 0) { + LOG(WARNING) << "config of cache clean interval is illegal: [" << interval + << "], force set to 3600 "; + interval = 3600; + } + if (!config::enable_memory_reclamation) { + continue; + } + CacheManager::instance()->for_each_cache_prune_stale(); + } +} + void Daemon::wg_weighted_memory_ratio_refresh_thread() { // Refresh weighted memory ratio of workload groups while (!_stop_background_threads_latch.wait_for( @@ -435,6 +453,11 @@ void Daemon::start() { st = Thread::create( "Daemon", "je_purge_dirty_pages_thread", [this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back()); + CHECK(st.ok()) << st; + st = Thread::create( + "Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); }, + &_threads.emplace_back()); + CHECK(st.ok()) << st; st = Thread::create( "Daemon", "query_runtime_statistics_thread", [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back()); diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 2a8adf20e4681ae..64c9f0c8993ae38 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -43,6 +43,7 @@ class Daemon { void memtable_memory_refresh_thread(); void calculate_metrics_thread(); void je_purge_dirty_pages_thread() const; + void cache_prune_stale_thread(); void report_runtime_query_statistics_thread(); void wg_weighted_memory_ratio_refresh_thread(); void be_proc_monitor_thread(); diff --git a/be/src/http/action/clear_cache_action.cpp b/be/src/http/action/clear_cache_action.cpp index f42499090c42ae0..cb183a99cf15029 100644 --- a/be/src/http/action/clear_cache_action.cpp +++ b/be/src/http/action/clear_cache_action.cpp @@ -30,10 +30,37 @@ namespace doris { const static std::string HEADER_JSON = "application/json"; -void ClearDataCacheAction::handle(HttpRequest* req) { +void ClearCacheAction::handle(HttpRequest* req) { req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4"); - CacheManager::instance()->clear_once(); - HttpChannel::send_reply(req, HttpStatus::OK, ""); + std::string cache_type_str = req->param("type"); + fmt::memory_buffer return_string_buffer; + int64_t freed_size = 0; + if (cache_type_str == "all") { + freed_size = CacheManager::instance()->for_each_cache_prune_all(nullptr, true); + } else { + CachePolicy::CacheType cache_type = CachePolicy::string_to_type(cache_type_str); + if (cache_type == CachePolicy::CacheType::NONE) { + fmt::format_to(return_string_buffer, + "ClearCacheAction not match type:{} of cache policy", cache_type_str); + LOG(WARNING) << fmt::to_string(return_string_buffer); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + fmt::to_string(return_string_buffer)); + return; + } + freed_size = CacheManager::instance()->cache_prune_all(cache_type, true); + if (freed_size == -1) { + fmt::format_to(return_string_buffer, + "ClearCacheAction cache:{} is not allowed to be pruned", cache_type_str); + LOG(WARNING) << fmt::to_string(return_string_buffer); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + fmt::to_string(return_string_buffer)); + return; + } + } + fmt::format_to(return_string_buffer, "ClearCacheAction cache:{} prune win, freed size {}", + cache_type_str, freed_size); + LOG(WARNING) << fmt::to_string(return_string_buffer); + HttpChannel::send_reply(req, HttpStatus::OK, fmt::to_string(return_string_buffer)); } } // end namespace doris diff --git a/be/src/http/action/clear_cache_action.h b/be/src/http/action/clear_cache_action.h index 3840f63593f98f5..3795a87b5d76ffc 100644 --- a/be/src/http/action/clear_cache_action.h +++ b/be/src/http/action/clear_cache_action.h @@ -23,11 +23,11 @@ namespace doris { class HttpRequest; -class ClearDataCacheAction : public HttpHandler { +class ClearCacheAction : public HttpHandler { public: - ClearDataCacheAction() = default; + ClearCacheAction() = default; - ~ClearDataCacheAction() override = default; + ~ClearCacheAction() override = default; void handle(HttpRequest* req) override; }; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 345c1bed4ff21f9..7c88156f74cef5b 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -301,12 +301,6 @@ Status StorageEngine::start_bg_threads() { [this]() { this->_tablet_path_check_callback(); }, &_tablet_path_check_thread)); LOG(INFO) << "tablet path check thread started"; - // cache clean thread - RETURN_IF_ERROR(Thread::create( - "StorageEngine", "cache_clean_thread", [this]() { this->_cache_clean_callback(); }, - &_cache_clean_thread)); - LOG(INFO) << "cache clean thread started"; - // path scan and gc thread if (config::path_gc_check) { for (auto data_dir : get_stores()) { @@ -359,42 +353,6 @@ Status StorageEngine::start_bg_threads() { return Status::OK(); } -void StorageEngine::_cache_clean_callback() { - int32_t interval = config::cache_periodic_prune_stale_sweep_sec; - while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { - if (interval <= 0) { - LOG(WARNING) << "config of cache clean interval is illegal: [" << interval - << "], force set to 3600 "; - interval = 3600; - } - if (config::disable_memory_gc) { - continue; - } - - CacheManager::instance()->for_each_cache_prune_stale(); - - // Dynamically modify the config to clear the cache, each time the disable cache will only be cleared once. - if (config::disable_segment_cache) { - if (!_clear_segment_cache) { - CacheManager::instance()->clear_once(CachePolicy::CacheType::SEGMENT_CACHE); - _clear_segment_cache = true; - } - } else { - _clear_segment_cache = false; - } - if (config::disable_storage_page_cache) { - if (!_clear_page_cache) { - CacheManager::instance()->clear_once(CachePolicy::CacheType::DATA_PAGE_CACHE); - CacheManager::instance()->clear_once(CachePolicy::CacheType::INDEXPAGE_CACHE); - CacheManager::instance()->clear_once(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE); - _clear_page_cache = true; - } - } else { - _clear_page_cache = false; - } - } -} - void StorageEngine::_garbage_sweeper_thread_callback() { uint32_t max_interval = config::max_garbage_sweep_interval; uint32_t min_interval = config::min_garbage_sweep_interval; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 3105c4f53d0aae6..7eb940477567705 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -361,9 +361,6 @@ class StorageEngine final : public BaseStorageEngine { // delete tablet with io error process function void _disk_stat_monitor_thread_callback(); - // clean file descriptors cache - void _cache_clean_callback(); - // path gc process function void _path_gc_thread_callback(DataDir* data_dir); @@ -528,9 +525,6 @@ class StorageEngine final : public BaseStorageEngine { scoped_refptr _async_publish_thread; std::shared_mutex _async_publish_lock; - bool _clear_segment_cache = false; - bool _clear_page_cache = false; - std::atomic _need_clean_trash {false}; // next index for create tablet diff --git a/be/src/runtime/memory/cache_manager.cpp b/be/src/runtime/memory/cache_manager.cpp index 9bf3d1e12d0c8cc..a6516c40a35770a 100644 --- a/be/src/runtime/memory/cache_manager.cpp +++ b/be/src/runtime/memory/cache_manager.cpp @@ -48,24 +48,22 @@ int64_t CacheManager::for_each_cache_prune_stale(RuntimeProfile* profile) { return 0; } -int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) { - if (need_prune(&_last_prune_all_timestamp, "all")) { +int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile, bool force) { + if (force || need_prune(&_last_prune_all_timestamp, "all")) { return for_each_cache_prune_stale_wrap( - [](CachePolicy* cache_policy) { cache_policy->prune_all(false); }, profile); + [force](CachePolicy* cache_policy) { cache_policy->prune_all(force); }, profile); } return 0; } -void CacheManager::clear_once() { +int64_t CacheManager::cache_prune_all(CachePolicy::CacheType type, bool force) { std::lock_guard l(_caches_lock); - for (const auto& pair : _caches) { - pair.second->prune_all(true); + auto* cache_policy = _caches[type]; + if (!cache_policy->enable_prune()) { + return -1; } -} - -void CacheManager::clear_once(CachePolicy::CacheType type) { - std::lock_guard l(_caches_lock); - _caches[type]->prune_all(true); // will print log + cache_policy->prune_all(force); + return cache_policy->profile()->get_counter("FreedMemory")->value(); } } // namespace doris diff --git a/be/src/runtime/memory/cache_manager.h b/be/src/runtime/memory/cache_manager.h index 20372366aa1a7d4..d94dca501670bf1 100644 --- a/be/src/runtime/memory/cache_manager.h +++ b/be/src/runtime/memory/cache_manager.h @@ -64,10 +64,9 @@ class CacheManager { int64_t for_each_cache_prune_stale(RuntimeProfile* profile = nullptr); - int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr); - - void clear_once(); - void clear_once(CachePolicy::CacheType type); + // if force is true, regardless of the two prune interval and cache size, cache will be pruned this time. + int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr, bool force = false); + int64_t cache_prune_all(CachePolicy::CacheType type, bool force = false); bool need_prune(int64_t* last_timestamp, const std::string& type) { int64_t now = UnixSeconds(); diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index e59c5c7ac3e9787..c457afd86898f27 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -47,6 +47,7 @@ class CachePolicy { CREATE_TABLET_RR_IDX_CACHE = 15, CLOUD_TABLET_CACHE = 16, CLOUD_TXN_DELETE_BITMAP_CACHE = 17, + NONE = 18, // not be used }; static std::string type_string(CacheType type) { @@ -94,6 +95,34 @@ class CachePolicy { __builtin_unreachable(); } + inline static std::unordered_map StringToType = { + {"DataPageCache", CacheType::DATA_PAGE_CACHE}, + {"IndexPageCache", CacheType::INDEXPAGE_CACHE}, + {"PKIndexPageCache", CacheType::PK_INDEX_PAGE_CACHE}, + {"SchemaCache", CacheType::SCHEMA_CACHE}, + {"SegmentCache", CacheType::SEGMENT_CACHE}, + {"InvertedIndexSearcherCache", CacheType::INVERTEDINDEX_SEARCHER_CACHE}, + {"InvertedIndexQueryCache", CacheType::INVERTEDINDEX_QUERY_CACHE}, + {"PointQueryLookupConnectionCache", CacheType::LOOKUP_CONNECTION_CACHE}, + {"PointQueryRowCache", CacheType::POINT_QUERY_ROW_CACHE}, + {"MowDeleteBitmapAggCache", CacheType::DELETE_BITMAP_AGG_CACHE}, + {"MowTabletVersionCache", CacheType::TABLET_VERSION_CACHE}, + {"LastSuccessChannelCache", CacheType::LAST_SUCCESS_CHANNEL_CACHE}, + {"CommonObjLRUCache", CacheType::COMMON_OBJ_LRU_CACHE}, + {"ForUT", CacheType::FOR_UT}, + {"TabletSchemaCache", CacheType::TABLET_SCHEMA_CACHE}, + {"CreateTabletRRIdxCache", CacheType::CREATE_TABLET_RR_IDX_CACHE}, + {"CloudTabletCache", CacheType::CLOUD_TABLET_CACHE}, + {"CloudTxnDeleteBitmapCache", CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE}}; + + static CacheType string_to_type(std::string type) { + if (StringToType.contains(type)) { + return StringToType[type]; + } else { + return CacheType::NONE; + } + } + CachePolicy(CacheType type, uint32_t stale_sweep_time_s, bool enable_prune); virtual ~CachePolicy(); diff --git a/be/src/runtime/memory/memory_reclamation.cpp b/be/src/runtime/memory/memory_reclamation.cpp index 536c4658c8c515d..3adf1d1ac75718c 100644 --- a/be/src/runtime/memory/memory_reclamation.cpp +++ b/be/src/runtime/memory/memory_reclamation.cpp @@ -47,7 +47,6 @@ bool MemoryReclamation::process_minor_gc(std::string mem_info) { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); - MemInfo::notify_je_purge_dirty_pages(); if (freed_mem > MemInfo::process_minor_gc_size()) { return true; } @@ -98,7 +97,6 @@ bool MemoryReclamation::process_full_gc(std::string mem_info) { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); - MemInfo::notify_je_purge_dirty_pages(); if (freed_mem > MemInfo::process_full_gc_size()) { return true; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 39e896d0f18d2d9..55ba70ee43b81c4 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -242,8 +242,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che flush_untracked_mem(); } - if (skip_large_memory_check == 0 && doris::config::large_memory_check_bytes > 0 && - size > doris::config::large_memory_check_bytes) { + if (skip_large_memory_check == 0 && doris::config::stacktrace_in_alloc_large_memory_bytes > 0 && + size > doris::config::stacktrace_in_alloc_large_memory_bytes) { _stop_consume = true; LOG(WARNING) << fmt::format( "malloc or new large memory: {}, {}, this is just a warning, not prevent memory " diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 5cea6cb67acac59..42791d3d791714e 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -157,10 +157,10 @@ Status HttpService::start() { HealthAction* health_action = _pool.add(new HealthAction()); _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); - // Dump all running pipeline tasks - ClearDataCacheAction* clear_data_cache_action = _pool.add(new ClearDataCacheAction()); - _ev_http_server->register_handler(HttpMethod::GET, "/api/clear_data_cache", - clear_data_cache_action); + // Clear cache action + ClearCacheAction* clear_cache_action = _pool.add(new ClearCacheAction()); + _ev_http_server->register_handler(HttpMethod::GET, "/api/clear_cache/{type}", + clear_cache_action); // Dump all running pipeline tasks PipelineTaskAction* pipeline_task_action = _pool.add(new PipelineTaskAction()); diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 2b1c05533cd5049..03d4d43925f40bd 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -89,9 +89,8 @@ void Allocator::sys_memory_check(size_t doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); - if (!doris::enable_thread_catch_bad_alloc && - (size > 1024L * 1024 * 1024 || - doris::config::enable_stacktrace_in_allocator_check_failed)) { + if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 && + size > doris::config::stacktrace_in_alloc_large_memory_bytes) { err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace(); } @@ -102,8 +101,11 @@ void Allocator::sys_memory_check(size_t } return; } - if (!doris::config::disable_memory_gc && - doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && + + // no significant impact on performance is expected. + doris::MemInfo::notify_je_purge_dirty_pages(); + + if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() && doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { int64_t wait_milliseconds = 0; LOG(INFO) << fmt::format( @@ -111,19 +113,21 @@ void Allocator::sys_memory_check(size_t print_id(doris::thread_context()->task_id()), doris::thread_context()->get_thread_id(), doris::config::thread_wait_gc_max_milliseconds, err_msg); - while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - if (!doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) { - doris::GlobalMemoryArbitrator::refresh_interval_memory_growth += size; - break; - } - if (doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) { - if (doris::enable_thread_catch_bad_alloc) { - throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); + if (doris::config::enable_memory_reclamation) { + while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + if (!doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) { + doris::GlobalMemoryArbitrator::refresh_interval_memory_growth += size; + break; + } + if (doris::thread_context()->thread_mem_tracker_mgr->is_query_cancelled()) { + if (doris::enable_thread_catch_bad_alloc) { + throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); + } + return; } - return; + wait_milliseconds += 100; } - wait_milliseconds += 100; } if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) { // Make sure to completely wait thread_wait_gc_max_milliseconds only once.