Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 10, 2024
1 parent a9caf05 commit 7bf941e
Show file tree
Hide file tree
Showing 32 changed files with 806 additions and 482 deletions.
5 changes: 4 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");

DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");

// If memory tracker value is inaccurate, BE will crash. usually used in test environments, default value is false.
// The actual meaning of this parameter is `debug_memory`.
// 1. Crash on inaccurate memory tracking: if a memory tracker value is inaccurate, BE will crash.
// Usually used in test environments; the default value is false.
// 2. Print more memory logs.
DEFINE_mBool(crash_in_memory_tracker_inaccurate, "false");

// Default is true. If any memory is tracked in the Orphan mem tracker, an error will be reported.
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
// modify this parameter to crash when large memory allocation occur will help
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);

// If memory tracker value is inaccurate, BE will crash. usually used in test environments, default value is false.
// The actual meaning of this parameter is `debug_memory`.
// 1. Crash on inaccurate memory tracking: if a memory tracker value is inaccurate, BE will crash.
// Usually used in test environments; the default value is false.
// 2. Print more memory logs.
DECLARE_mBool(crash_in_memory_tracker_inaccurate);

// Default is true. If any memory is tracked in the Orphan mem tracker, an error will be reported.
Expand Down
30 changes: 11 additions & 19 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,37 @@
// IWYU pragma: no_include <bits/std_abs.h>
#include <butil/iobuf.h>
#include <math.h>
#include <signal.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
#include <ostream>
#include <set>
#include <string>

#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/be_proc_monitor.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/memory_reclamation.h"
#include "runtime/process_profile.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/algorithm_util.h"
#include "util/cpu_info.h"
#include "util/debug_util.h"
#include "util/disk_info.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/network_util.h"
#include "util/perf_counters.h"
#include "util/system_metrics.h"
#include "util/thrift_util.h"
#include "util/time.h"

namespace doris {
Expand Down Expand Up @@ -233,9 +222,8 @@ void refresh_memory_state_after_memory_change() {
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
doris::MemTrackerLimiter::enable_print_log_process_usage();
// Refresh mem tracker each type counter.
doris::MemTrackerLimiter::refresh_global_counter();
doris::ProcessProfile::instance()->memory_profile()->enable_print_log_process_usage();
doris::ProcessProfile::instance()->memory_profile()->refresh_memory_overview_profile();
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory state by 256M
}
Expand Down Expand Up @@ -339,10 +327,12 @@ void Daemon::memory_gc_thread() {
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
if (doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
// If GC could not reclaim enough memory, the process memory usage will not be printed again in the next consecutive GC.
doris::MemTrackerLimiter::enable_print_log_process_usage();
doris::ProcessProfile::instance()
->memory_profile()
->enable_print_log_process_usage();
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
Expand All @@ -352,9 +342,11 @@ void Daemon::memory_gc_thread() {
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str();
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
if (doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
doris::ProcessProfile::instance()
->memory_profile()
->enable_print_log_process_usage();
}
} else {
if (memory_full_gc_sleep_time_ms > 0) {
Expand Down
151 changes: 58 additions & 93 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@
#include <vector>

#include "common/config.h"
#include "gutil/strings/numbers.h"
#include "gutil/strings/substitute.h"
#include "http/action/tablets_info_action.h"
#include "http/web_page_handler.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/process_profile.h"
#include "util/easy_json.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
Expand Down Expand Up @@ -97,16 +93,56 @@ void config_handler(const WebPageHandler::ArgumentMap& args, std::stringstream*
(*output) << "</pre>";
}

// Registered to handle "/memz", and prints out memory allocation statistics.
void mem_usage_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) {
(*output) << "<pre>"
<< "Mem Limit: " << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< std::endl
<< "Physical Mem From Perf: "
<< PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES) << std::endl
<< "</pre>";
// Registered to handle "/profile".
void process_profile_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) {
(*output) << "<h2 id=\"processProfileTitle\">Process Profile</h2>\n";
doris::ProcessProfile::instance()->refresh_profile();
(*output) << "<pre id=\"processProfile\">"
<< doris::ProcessProfile::instance()->print_process_profile() << "</pre>";

(*output) << "<h2>Memory Info</h2>\n";
(*output) << "<pre>";
(*output) << "<h4>Memory Documents</h4>\n"
<< "<a "
"href=https://doris.apache.org/zh-CN/docs/dev/admin-manual/memory-management/"
"overview>Memory Management Overview</a>"
<< std::endl
<< "<a "
"href=https://doris.apache.org/zh-CN/docs/dev/admin-manual/memory-management/"
"memory-issue-faq>Memory Issue FAQ</a>"
<< std::endl;

(*output) << "<h4 id=\"memoryPropertiesTitle\">Memory Properties</h4>\n"
<< "System Physical Mem: "
<< PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES) << std::endl
<< "System Page Size: " << MemInfo::get_page_size() << std::endl
<< "Mem Limit: " << MemInfo::mem_limit_str() << std::endl
<< "Soft Mem Limit: " << MemInfo::soft_mem_limit_str() << std::endl
<< "System Mem Available Low Water Mark: "
<< PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES)
<< std::endl
<< "System Mem Available Warning Water Mark: "
<< PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES)
<< std::endl
<< "Cgroup Mem Limit: "
<< PrettyPrinter::print(MemInfo::cgroup_mem_limit(), TUnit::BYTES) << std::endl
<< "Cgroup Mem Usage: "
<< PrettyPrinter::print(MemInfo::cgroup_mem_usage(), TUnit::BYTES) << std::endl
<< "Cgroup Mem Refresh State: " << MemInfo::cgroup_mem_refresh_state() << std::endl;

(*output) << "<h4 id=\"memoryOptionSettingsTitle\">Memory Option Settings</h4>\n";
{
std::lock_guard<std::mutex> lock(*config::get_mutable_string_config_lock());
for (const auto& it : *(config::full_conf_map)) {
if (it.first.find("memory") != std::string::npos ||
it.first.find("cache") != std::string::npos ||
it.first.find("mem") != std::string::npos) {
(*output) << it.first << "=" << it.second << std::endl;
}
}
}

(*output) << "<h4 id=\"jemallocProfilesTitle\">Jemalloc Profiles</h4>\n";
#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
(*output) << "Memory tracking is not available with address sanitizer builds.";
#elif defined(USE_JEMALLOC)
Expand All @@ -117,15 +153,16 @@ void mem_usage_handler(const WebPageHandler::ArgumentMap& args, std::stringstrea
};
jemalloc_stats_print(write_cb, &tmp, "a");
boost::replace_all(tmp, "\n", "<br>");
(*output) << tmp << "</pre>";
(*output) << tmp;
#else
char buf[2048];
MallocExtension::instance()->GetStats(buf, 2048);
// Replace new lines with <br> for html
std::string tmp(buf);
boost::replace_all(tmp, "\n", "<br>");
(*output) << tmp << "</pre>";
(*output) << tmp;
#endif
(*output) << "</pre>";
}

void display_tablets_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) {
Expand All @@ -141,76 +178,8 @@ void display_tablets_callback(const WebPageHandler::ArgumentMap& args, EasyJson*

// Registered to handle "/mem_tracker", and prints out memory tracker information.
void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) {
(*output) << "<h1>Memory usage by subsystem</h1>\n";
std::vector<MemTrackerLimiter::Snapshot> snapshots;
auto iter = args.find("type");
if (iter != args.end()) {
if (iter->second == "global") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL);
} else if (iter->second == "query") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::QUERY);
} else if (iter->second == "load") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::LOAD);
} else if (iter->second == "compaction") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::COMPACTION);
} else if (iter->second == "schema_change") {
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::SCHEMA_CHANGE);
} else if (iter->second == "other") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER);
} else if (iter->second == "reserved_memory") {
MemTrackerLimiter::make_all_reserved_trackers_snapshots(&snapshots);
} else if (iter->second == "all") {
MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots);
}
} else {
(*output) << "<h4>*Notice:</h4>\n";
(*output) << "<h4> 1. MemTracker only counts the memory on part of the main execution "
"path, "
"which is usually less than the real process memory.</h4>\n";
(*output) << "<h4> 2. each `type` is the sum of a set of tracker values, "
"`sum of all trackers` is the sum of all trackers of all types, .</h4>\n";
(*output) << "<h4> 3. `process resident memory` is the physical memory of the process, "
"from /proc VmRSS VmHWM.</h4>\n";
(*output) << "<h4> 4. `process virtual memory` is the virtual memory of the process, "
"from /proc VmSize VmPeak.</h4>\n";
(*output) << "<h4> 5.`/mem_tracker?type=<type name>` to view the memory details of each "
"type, for example, `/mem_tracker?type=query` will list the memory of all "
"queries; "
"`/mem_tracker?type=global` will list the memory of all Cache, metadata and "
"other "
"global life cycles.</h4>\n";
(*output) << "<h4>see documentation for details.";
MemTrackerLimiter::make_process_snapshots(&snapshots);
}

(*output) << "<table data-toggle='table' "
" data-pagination='true' "
" data-search='true' "
" class='table table-striped'>\n";
(*output) << "<thead><tr>"
"<th data-sortable='true'>Type</th>"
"<th data-sortable='true'>Label</th>"
"<th>Limit</th>"
"<th data-sortable='true' "
">Current Consumption(Bytes)</th>"
"<th>Current Consumption(Normalize)</th>"
"<th data-sortable='true' "
">Peak Consumption(Bytes)</th>"
"<th>Peak Consumption(Normalize)</th>"
"</tr></thead>";
(*output) << "<tbody>\n";
for (const auto& item : snapshots) {
string limit_str = item.limit == -1 ? "none" : AccurateItoaKMGT(item.limit);
string current_consumption_normalize = AccurateItoaKMGT(item.cur_consumption);
string peak_consumption_normalize = AccurateItoaKMGT(item.peak_consumption);
(*output) << strings::Substitute(
"<tr><td>$0</td><td>$1</td><td>$2</td><td>$3</td><td>$4</td><td>$5</td><td>$6</"
"td></tr>\n",
item.type, item.label, limit_str, item.cur_consumption,
current_consumption_normalize, item.peak_consumption, peak_consumption_normalize);
}
(*output) << "</tbody></table>\n";
(*output) << "<h2>mem_tracker webpage has been offline, please click <a "
"href=../profile>Process Profile</a></h2>\n";
}

void heap_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) {
Expand Down Expand Up @@ -394,14 +363,10 @@ void add_default_path_handlers(WebPageHandler* web_page_handler) {
web_page_handler->register_page("/varz", "Configs", config_handler,
true /* is_on_nav_bar */);
}
web_page_handler->register_page("/memz", "Memory", mem_usage_handler, true /* is_on_nav_bar */);
web_page_handler->register_page(
"/mem_tracker", "MemTracker",
[](auto&& PH1, auto&& PH2) {
return mem_tracker_handler(std::forward<decltype(PH1)>(PH1),
std::forward<decltype(PH2)>(PH2));
},
true /* is_on_nav_bar */);
web_page_handler->register_page("/profile", "Process Profile", process_profile_handler,
true /* is_on_nav_bar */);
web_page_handler->register_page("/mem_tracker", "MemTracker", mem_tracker_handler,
true /* is_on_nav_bar */);
web_page_handler->register_page("/heap", "Heap Profile", heap_handler,
true /* is_on_nav_bar */);
web_page_handler->register_page("/cpu", "CPU Profile", cpu_handler, true /* is_on_nav_bar */);
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
if (_segcompaction_worker) {
_segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id);
_segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id,
rowset_writer_context.load_id);
}
return Status::OK();
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ using namespace ErrorCode;

SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {}

void SegcompactionWorker::init_mem_tracker(int64_t txn_id) {
void SegcompactionWorker::init_mem_tracker(int64_t txn_id, PUniqueId load_id) {
_seg_compact_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::COMPACTION, "segcompaction-" + std::to_string(txn_id));
MemTrackerLimiter::Type::COMPACTION,
fmt::format("segcompaction-txnID_{}-loadID_{}", std::to_string(txn_id),
print_id(load_id)));
}

Status SegcompactionWorker::_get_segcompaction_reader(
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class SegcompactionWorker {
// set the cancel flag, tasks already started will not be cancelled.
bool cancel();

void init_mem_tracker(int64_t txn_id);
void init_mem_tracker(int64_t txn_id, PUniqueId load_id);

private:
Status _create_segment_writer_for_segcompaction(
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class LookupConnectionCache;
class RowCache;
class DummyLRUCache;
class CacheManager;
class ProcessProfile;
class WalManager;
class DNSCache;

Expand Down Expand Up @@ -271,6 +272,7 @@ class ExecEnv {

void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; }
void set_process_profile(ProcessProfile* pp) { this->_process_profile = pp; }
void set_tablet_schema_cache(TabletSchemaCache* c) { this->_tablet_schema_cache = c; }
void set_storage_page_cache(StoragePageCache* c) { this->_storage_page_cache = c; }
void set_segment_loader(SegmentLoader* sl) { this->_segment_loader = sl; }
Expand Down Expand Up @@ -303,6 +305,7 @@ class ExecEnv {
LookupConnectionCache* get_lookup_connection_cache() { return _lookup_connection_cache; }
RowCache* get_row_cache() { return _row_cache; }
CacheManager* get_cache_manager() { return _cache_manager; }
ProcessProfile* get_process_profile() { return _process_profile; }
segment_v2::InvertedIndexSearcherCache* get_inverted_index_searcher_cache() {
return _inverted_index_searcher_cache;
}
Expand Down Expand Up @@ -441,6 +444,7 @@ class ExecEnv {
LookupConnectionCache* _lookup_connection_cache = nullptr;
RowCache* _row_cache = nullptr;
CacheManager* _cache_manager = nullptr;
ProcessProfile* _process_profile = nullptr;
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
QueryCache* _query_cache = nullptr;
Expand Down
Loading

0 comments on commit 7bf941e

Please sign in to comment.