Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jul 28, 2024
1 parent 390cdcc commit 7b9b20b
Show file tree
Hide file tree
Showing 15 changed files with 136 additions and 108 deletions.
6 changes: 2 additions & 4 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,11 @@ DEFINE_mString(process_full_gc_size, "20%");
// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
DEFINE_mBool(enable_query_memory_overcommit, "true");

DEFINE_mBool(disable_memory_gc, "false");
DEFINE_mBool(enable_memory_reclamation, "true");

DEFINE_mBool(enable_stacktrace, "true");

DEFINE_mBool(enable_stacktrace_in_allocator_check_failed, "false");

DEFINE_mInt64(large_memory_check_bytes, "2147483648");
DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");

DEFINE_mBool(enable_memory_orphan_check, "false");

Expand Down
19 changes: 9 additions & 10 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,19 @@ DECLARE_mString(process_full_gc_size);
// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
DECLARE_mBool(enable_query_memory_overcommit);

// gc will release cache, cancel task, and task will wait for gc to release memory,
// default gc strategy is conservative, if you want to exclude the interference of gc, let it be true
DECLARE_mBool(disable_memory_gc);
// memory reclamation will release cache, cancel task, and task will wait for gc to release memory,
// default reclamation strategy is conservative,
// if you want to exclude the interference of memory reclamation, let it be false
DECLARE_mBool(enable_memory_reclamation);

// if false, turn off all stacktrace
DECLARE_mBool(enable_stacktrace);

// Allocator check failed log stacktrace if not catch exception
DECLARE_mBool(enable_stacktrace_in_allocator_check_failed);

// malloc or new large memory larger than large_memory_check_bytes, default 2G,
// will print a warning containing the stacktrace, but not prevent memory alloc.
// If is -1, disable large memory check.
DECLARE_mInt64(large_memory_check_bytes);
// when alloc memory larger than stacktrace_in_alloc_large_memory_bytes, default 2G,
// if alloc successful, will print a warning with stacktrace, but not prevent memory alloc.
// if alloc failed using Doris Allocator, will print stacktrace in error log.
// if is -1, disable print stacktrace when alloc large memory.
DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);

// default is true. if any memory tracking in Orphan mem tracker will report error.
DECLARE_mBool(enable_memory_orphan_check);
Expand Down
25 changes: 24 additions & 1 deletion be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ void Daemon::memory_gc_thread() {
int32_t memory_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(interval_milliseconds))) {
if (config::disable_memory_gc) {
if (!config::enable_memory_reclamation) {
continue;
}
auto sys_mem_available = doris::GlobalMemoryArbitrator::sys_mem_available();
Expand Down Expand Up @@ -387,11 +387,29 @@ void Daemon::je_purge_dirty_pages_thread() const {
if (_stop_background_threads_latch.count() == 0) {
break;
}
if (!config::enable_memory_reclamation) {
continue;
}
doris::MemInfo::je_purge_all_arena_dirty_pages();
doris::MemInfo::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed);
} while (true);
}

void Daemon::cache_prune_stale_thread() {
int32_t interval = config::cache_periodic_prune_stale_sweep_sec;
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
if (interval <= 0) {
LOG(WARNING) << "config of cache clean interval is illegal: [" << interval
<< "], force set to 3600 ";
interval = 3600;
}
if (!config::enable_memory_reclamation) {
continue;
}
CacheManager::instance()->for_each_cache_prune_stale();
}
}

void Daemon::wg_weighted_memory_ratio_refresh_thread() {
// Refresh weighted memory ratio of workload groups
while (!_stop_background_threads_latch.wait_for(
Expand Down Expand Up @@ -435,6 +453,11 @@ void Daemon::start() {
st = Thread::create(
"Daemon", "je_purge_dirty_pages_thread",
[this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "query_runtime_statistics_thread",
[this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Daemon {
void memtable_memory_refresh_thread();
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void cache_prune_stale_thread();
void report_runtime_query_statistics_thread();
void wg_weighted_memory_ratio_refresh_thread();
void be_proc_monitor_thread();
Expand Down
33 changes: 30 additions & 3 deletions be/src/http/action/clear_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,37 @@ namespace doris {

const static std::string HEADER_JSON = "application/json";

void ClearDataCacheAction::handle(HttpRequest* req) {
void ClearCacheAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
CacheManager::instance()->clear_once();
HttpChannel::send_reply(req, HttpStatus::OK, "");
std::string cache_type_str = req->param("type");
fmt::memory_buffer return_string_buffer;
int64_t freed_size = 0;
if (cache_type_str == "all") {
freed_size = CacheManager::instance()->for_each_cache_prune_all(nullptr, true);
} else {
CachePolicy::CacheType cache_type = CachePolicy::string_to_type(cache_type_str);
if (cache_type == CachePolicy::CacheType::NONE) {
fmt::format_to(return_string_buffer,
"ClearCacheAction not match type:{} of cache policy", cache_type_str);
LOG(WARNING) << fmt::to_string(return_string_buffer);
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
fmt::to_string(return_string_buffer));
return;
}
freed_size = CacheManager::instance()->cache_prune_all(cache_type, true);
if (freed_size == -1) {
fmt::format_to(return_string_buffer,
"ClearCacheAction cache:{} is not allowed to be pruned", cache_type_str);
LOG(WARNING) << fmt::to_string(return_string_buffer);
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
fmt::to_string(return_string_buffer));
return;
}
}
fmt::format_to(return_string_buffer, "ClearCacheAction cache:{} prune win, freed size {}",
cache_type_str, freed_size);
LOG(WARNING) << fmt::to_string(return_string_buffer);
HttpChannel::send_reply(req, HttpStatus::OK, fmt::to_string(return_string_buffer));
}

} // end namespace doris
6 changes: 3 additions & 3 deletions be/src/http/action/clear_cache_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ namespace doris {

class HttpRequest;

class ClearDataCacheAction : public HttpHandler {
class ClearCacheAction : public HttpHandler {
public:
ClearDataCacheAction() = default;
ClearCacheAction() = default;

~ClearDataCacheAction() override = default;
~ClearCacheAction() override = default;

void handle(HttpRequest* req) override;
};
Expand Down
42 changes: 0 additions & 42 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,6 @@ Status StorageEngine::start_bg_threads() {
[this]() { this->_tablet_path_check_callback(); }, &_tablet_path_check_thread));
LOG(INFO) << "tablet path check thread started";

// cache clean thread
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "cache_clean_thread", [this]() { this->_cache_clean_callback(); },
&_cache_clean_thread));
LOG(INFO) << "cache clean thread started";

// path scan and gc thread
if (config::path_gc_check) {
for (auto data_dir : get_stores()) {
Expand Down Expand Up @@ -359,42 +353,6 @@ Status StorageEngine::start_bg_threads() {
return Status::OK();
}

void StorageEngine::_cache_clean_callback() {
int32_t interval = config::cache_periodic_prune_stale_sweep_sec;
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
if (interval <= 0) {
LOG(WARNING) << "config of cache clean interval is illegal: [" << interval
<< "], force set to 3600 ";
interval = 3600;
}
if (config::disable_memory_gc) {
continue;
}

CacheManager::instance()->for_each_cache_prune_stale();

// Dynamically modify the config to clear the cache, each time the disable cache will only be cleared once.
if (config::disable_segment_cache) {
if (!_clear_segment_cache) {
CacheManager::instance()->clear_once(CachePolicy::CacheType::SEGMENT_CACHE);
_clear_segment_cache = true;
}
} else {
_clear_segment_cache = false;
}
if (config::disable_storage_page_cache) {
if (!_clear_page_cache) {
CacheManager::instance()->clear_once(CachePolicy::CacheType::DATA_PAGE_CACHE);
CacheManager::instance()->clear_once(CachePolicy::CacheType::INDEXPAGE_CACHE);
CacheManager::instance()->clear_once(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE);
_clear_page_cache = true;
}
} else {
_clear_page_cache = false;
}
}
}

void StorageEngine::_garbage_sweeper_thread_callback() {
uint32_t max_interval = config::max_garbage_sweep_interval;
uint32_t min_interval = config::min_garbage_sweep_interval;
Expand Down
6 changes: 0 additions & 6 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,6 @@ class StorageEngine final : public BaseStorageEngine {
// delete tablet with io error process function
void _disk_stat_monitor_thread_callback();

// clean file descriptors cache
void _cache_clean_callback();

// path gc process function
void _path_gc_thread_callback(DataDir* data_dir);

Expand Down Expand Up @@ -528,9 +525,6 @@ class StorageEngine final : public BaseStorageEngine {
scoped_refptr<Thread> _async_publish_thread;
std::shared_mutex _async_publish_lock;

bool _clear_segment_cache = false;
bool _clear_page_cache = false;

std::atomic<bool> _need_clean_trash {false};

// next index for create tablet
Expand Down
20 changes: 9 additions & 11 deletions be/src/runtime/memory/cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,22 @@ int64_t CacheManager::for_each_cache_prune_stale(RuntimeProfile* profile) {
return 0;
}

int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) {
if (need_prune(&_last_prune_all_timestamp, "all")) {
int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile, bool force) {
if (force || need_prune(&_last_prune_all_timestamp, "all")) {
return for_each_cache_prune_stale_wrap(
[](CachePolicy* cache_policy) { cache_policy->prune_all(false); }, profile);
[force](CachePolicy* cache_policy) { cache_policy->prune_all(force); }, profile);
}
return 0;
}

void CacheManager::clear_once() {
int64_t CacheManager::cache_prune_all(CachePolicy::CacheType type, bool force) {
std::lock_guard<std::mutex> l(_caches_lock);
for (const auto& pair : _caches) {
pair.second->prune_all(true);
auto* cache_policy = _caches[type];
if (!cache_policy->enable_prune()) {
return -1;
}
}

void CacheManager::clear_once(CachePolicy::CacheType type) {
std::lock_guard<std::mutex> l(_caches_lock);
_caches[type]->prune_all(true); // will print log
cache_policy->prune_all(force);
return cache_policy->profile()->get_counter("FreedMemory")->value();
}

} // namespace doris
7 changes: 3 additions & 4 deletions be/src/runtime/memory/cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ class CacheManager {

int64_t for_each_cache_prune_stale(RuntimeProfile* profile = nullptr);

int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr);

void clear_once();
void clear_once(CachePolicy::CacheType type);
// if force is true, regardless of the two prune interval and cache size, cache will be pruned this time.
int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr, bool force = false);
int64_t cache_prune_all(CachePolicy::CacheType type, bool force = false);

bool need_prune(int64_t* last_timestamp, const std::string& type) {
int64_t now = UnixSeconds();
Expand Down
29 changes: 29 additions & 0 deletions be/src/runtime/memory/cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class CachePolicy {
CREATE_TABLET_RR_IDX_CACHE = 15,
CLOUD_TABLET_CACHE = 16,
CLOUD_TXN_DELETE_BITMAP_CACHE = 17,
NONE = 18, // not be used
};

static std::string type_string(CacheType type) {
Expand Down Expand Up @@ -94,6 +95,34 @@ class CachePolicy {
__builtin_unreachable();
}

inline static std::unordered_map<std::string, CacheType> StringToType = {
{"DataPageCache", CacheType::DATA_PAGE_CACHE},
{"IndexPageCache", CacheType::INDEXPAGE_CACHE},
{"PKIndexPageCache", CacheType::PK_INDEX_PAGE_CACHE},
{"SchemaCache", CacheType::SCHEMA_CACHE},
{"SegmentCache", CacheType::SEGMENT_CACHE},
{"InvertedIndexSearcherCache", CacheType::INVERTEDINDEX_SEARCHER_CACHE},
{"InvertedIndexQueryCache", CacheType::INVERTEDINDEX_QUERY_CACHE},
{"PointQueryLookupConnectionCache", CacheType::LOOKUP_CONNECTION_CACHE},
{"PointQueryRowCache", CacheType::POINT_QUERY_ROW_CACHE},
{"MowDeleteBitmapAggCache", CacheType::DELETE_BITMAP_AGG_CACHE},
{"MowTabletVersionCache", CacheType::TABLET_VERSION_CACHE},
{"LastSuccessChannelCache", CacheType::LAST_SUCCESS_CHANNEL_CACHE},
{"CommonObjLRUCache", CacheType::COMMON_OBJ_LRU_CACHE},
{"ForUT", CacheType::FOR_UT},
{"TabletSchemaCache", CacheType::TABLET_SCHEMA_CACHE},
{"CreateTabletRRIdxCache", CacheType::CREATE_TABLET_RR_IDX_CACHE},
{"CloudTabletCache", CacheType::CLOUD_TABLET_CACHE},
{"CloudTxnDeleteBitmapCache", CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE}};

static CacheType string_to_type(std::string type) {
if (StringToType.contains(type)) {
return StringToType[type];
} else {
return CacheType::NONE;
}
}

CachePolicy(CacheType type, uint32_t stale_sweep_time_s, bool enable_prune);
virtual ~CachePolicy();

Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/memory/memory_reclamation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ bool MemoryReclamation::process_minor_gc(std::string mem_info) {
}};

freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
MemInfo::notify_je_purge_dirty_pages();
if (freed_mem > MemInfo::process_minor_gc_size()) {
return true;
}
Expand Down Expand Up @@ -98,7 +97,6 @@ bool MemoryReclamation::process_full_gc(std::string mem_info) {
}};

freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
MemInfo::notify_je_purge_dirty_pages();
if (freed_mem > MemInfo::process_full_gc_size()) {
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
flush_untracked_mem();
}

if (skip_large_memory_check == 0 && doris::config::large_memory_check_bytes > 0 &&
size > doris::config::large_memory_check_bytes) {
if (skip_large_memory_check == 0 && doris::config::stacktrace_in_alloc_large_memory_bytes > 0 &&
size > doris::config::stacktrace_in_alloc_large_memory_bytes) {
_stop_consume = true;
LOG(WARNING) << fmt::format(
"malloc or new large memory: {}, {}, this is just a warning, not prevent memory "
Expand Down
8 changes: 4 additions & 4 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ Status HttpService::start() {
HealthAction* health_action = _pool.add(new HealthAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action);

// Dump all running pipeline tasks
ClearDataCacheAction* clear_data_cache_action = _pool.add(new ClearDataCacheAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/clear_data_cache",
clear_data_cache_action);
// Clear cache action
ClearCacheAction* clear_cache_action = _pool.add(new ClearCacheAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/clear_cache/{type}",
clear_cache_action);

// Dump all running pipeline tasks
PipelineTaskAction* pipeline_task_action = _pool.add(new PipelineTaskAction());
Expand Down
Loading

0 comments on commit 7b9b20b

Please sign in to comment.