Skip to content

Commit

Permalink
Merge branch 'master' into 20240908_fix_memory
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Sep 9, 2024
2 parents 1decfa6 + e25560e commit 1a83ee4
Show file tree
Hide file tree
Showing 102 changed files with 2,524 additions and 712 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/comment-to-trigger-teamcity.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ jobs:
id: parse
run: |
COMMENT_BODY=$(echo "${COMMENT_BODY}" | xargs)
PULL_REQUEST_NUM="$(echo "${{ github.event.issue.pull_request.url }}" | awk -F/ '{print $NF}')"
COMMIT_ID_FROM_TRIGGER="$(curl -s -H "Authorization:Bearer ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/pulls/${PULL_REQUEST_NUM}" | jq -r '.head.sha')"
TARGET_BRANCH="$(curl -s -H "Authorization:Bearer ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/pulls/${PULL_REQUEST_NUM}" | jq -r '.base.ref')"
if [[ "${COMMENT_BODY}" == *'run buildall'* ||
"${COMMENT_BODY}" == *'run compile'* ||
"${COMMENT_BODY}" == *'run beut'* ||
Expand All @@ -63,6 +66,10 @@ jobs:
echo "comment_trigger=false" | tee -a "$GITHUB_OUTPUT"
echo "comment_skip=true" | tee -a "$GITHUB_OUTPUT"
echo "COMMENT_USER_ID ${COMMENT_USER_ID} is allowed to skip buildall."
elif [[ "${COMMENT_USER_ID}" == '9208457' && "${TARGET_BRANCH}" == *'branch-2.1'* ]]; then
echo "COMMENT_USER_ID ${COMMENT_USER_ID} is allowed to skip buildall for branch-2.1"
echo "comment_trigger=false" | tee -a "$GITHUB_OUTPUT"
echo "comment_skip=true" | tee -a "$GITHUB_OUTPUT"
else
echo "COMMENT_USER_ID ${COMMENT_USER_ID} is not allowed to skip buildall."
exit
Expand All @@ -74,9 +81,6 @@ jobs:
exit
fi
PULL_REQUEST_NUM="$(echo "${{ github.event.issue.pull_request.url }}" | awk -F/ '{print $NF}')"
COMMIT_ID_FROM_TRIGGER="$(curl -s -H "Authorization:Bearer ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/pulls/${PULL_REQUEST_NUM}" | jq -r '.head.sha')"
TARGET_BRANCH="$(curl -s -H "Authorization:Bearer ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/pulls/${PULL_REQUEST_NUM}" | jq -r '.base.ref')"
echo "PULL_REQUEST_NUM=${PULL_REQUEST_NUM}" | tee -a "$GITHUB_OUTPUT"
echo "COMMIT_ID_FROM_TRIGGER=${COMMIT_ID_FROM_TRIGGER}" | tee -a "$GITHUB_OUTPUT"
echo "TARGET_BRANCH='${TARGET_BRANCH}'" | tee -a "$GITHUB_OUTPUT"
Expand Down
14 changes: 6 additions & 8 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ DEFINE_String(mem_limit, "90%");
// Soft memory limit as a fraction of hard memory limit.
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Cache capacity reduce mem limit as a fraction of soft mem limit.
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");

Expand Down Expand Up @@ -286,7 +289,7 @@ DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
DEFINE_mInt64(memory_limitation_per_thread_for_schema_change_bytes, "2147483648");

DEFINE_mInt32(cache_prune_interval_sec, "10");
DEFINE_mInt32(cache_periodic_prune_stale_sweep_sec, "300");
DEFINE_mInt32(cache_periodic_prune_stale_sweep_sec, "60");
// the clean interval of tablet lookup cache
DEFINE_mInt32(tablet_lookup_cache_stale_sweep_time_sec, "30");
DEFINE_mInt32(point_query_row_cache_stale_sweep_time_sec, "300");
Expand Down Expand Up @@ -565,7 +568,7 @@ DEFINE_String(pprof_profile_dir, "${DORIS_HOME}/log");
// for jeprofile in jemalloc
DEFINE_mString(jeprofile_dir, "${DORIS_HOME}/log");
DEFINE_mBool(enable_je_purge_dirty_pages, "true");
DEFINE_mString(je_dirty_pages_mem_limit_percent, "5%");
DEFINE_mString(je_dirty_pages_mem_limit_percent, "2%");

// to forward compatibility, will be removed later
DEFINE_mBool(enable_token_check, "true");
Expand All @@ -582,17 +585,12 @@ DEFINE_Int32(num_cores, "0");
DEFINE_Bool(ignore_broken_disk, "false");

// Sleep time in milliseconds between memory maintenance iterations
DEFINE_mInt32(memory_maintenance_sleep_time_ms, "100");
DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20");

// After full gc, no longer full gc and minor gc during sleep.
// After minor gc, no minor gc during sleep, but full gc is possible.
DEFINE_mInt32(memory_gc_sleep_time_ms, "500");

// Sleep time in milliseconds between memtable flush mgr refresh iterations
DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");

DEFINE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms, "50");

// percent of (active memtables size / all memtables size) when reach hard limit
DEFINE_mInt32(memtable_hard_limit_active_percent, "50");

Expand Down
9 changes: 3 additions & 6 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ DECLARE_String(mem_limit);
// Soft memory limit as a fraction of hard memory limit.
DECLARE_Double(soft_mem_limit_frac);

// Cache capacity reduce mem limit as a fraction of soft mem limit.
DECLARE_mDouble(cache_capacity_reduce_mem_limit_frac);

// Schema change memory limit as a fraction of soft memory limit.
DECLARE_Double(schema_change_mem_limit_frac);

Expand Down Expand Up @@ -641,12 +644,6 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms);
// After minor gc, no minor gc during sleep, but full gc is possible.
DECLARE_mInt32(memory_gc_sleep_time_ms);

// Sleep time in milliseconds between memtable flush mgr memory refresh iterations
DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);

// Sleep time in milliseconds between refresh iterations of workload group weighted memory ratio
DECLARE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms);

// percent of (active memtables size / all memtables size) when reach hard limit
DECLARE_mInt32(memtable_hard_limit_active_percent);

Expand Down
195 changes: 147 additions & 48 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@
namespace doris {
namespace {

// Process RSS (as returned by PerfCounters::get_vm_rss()) recorded the last time
// refresh_memory_state_after_memory_change() logged the memory state.
// Starts at 0 and is overwritten on every such log.
int64_t last_print_proc_mem = 0;
// Countdown, in milliseconds, used by refresh_cache_capacity(): a new cache
// capacity weight is only computed once this value has dropped to <= 0.
int32_t refresh_cache_capacity_sleep_time_ms = 0;
#ifdef USE_JEMALLOC
// Countdown, in milliseconds, used by je_purge_dirty_pages(): a purge is only
// requested once this value has dropped to <= 0. Only defined for jemalloc builds.
int32_t je_purge_dirty_pages_sleep_time_ms = 0;
#endif

void update_rowsets_and_segments_num_metrics() {
if (config::is_cloud_mode()) {
// TODO(plat1ko): CloudStorageEngine
Expand Down Expand Up @@ -204,42 +210,104 @@ void Daemon::tcmalloc_gc_thread() {
#endif
}

void Daemon::memory_maintenance_thread() {
int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
int64_t last_print_proc_mem = PerfCounters::get_vm_rss();
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(interval_milliseconds))) {
// Refresh process memory metrics.
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
butil::IOBuf::block_memory());
// Refresh allocator memory metrics.
void refresh_process_memory_metrics() {
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
butil::IOBuf::block_memory());
}

void refresh_common_allocator_metrics() {
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_allocator_mem();
#ifdef USE_JEMALLOC
if (doris::MemInfo::je_dirty_pages_mem() > doris::MemInfo::je_dirty_pages_mem_limit() &&
GlobalMemoryArbitrator::is_exceed_soft_mem_limit()) {
doris::MemInfo::notify_je_purge_dirty_pages();
}
doris::MemInfo::refresh_allocator_mem();
if (config::enable_system_metrics) {
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
}
#endif
if (config::enable_system_metrics) {
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
MemInfo::refresh_memory_bvar();
}

void refresh_memory_state_after_memory_change() {
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
doris::MemTrackerLimiter::enable_print_log_process_usage();
// Refresh mem tracker each type counter.
doris::MemTrackerLimiter::refresh_global_counter();
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory state by 256M
}
}

void refresh_cache_capacity() {
if (refresh_cache_capacity_sleep_time_ms <= 0) {
auto cache_capacity_reduce_mem_limit = uint64_t(
doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
int64_t process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
double new_cache_capacity_adjust_weighted =
process_memory_usage <= cache_capacity_reduce_mem_limit
? 1
: std::min<double>(
1 - (process_memory_usage - cache_capacity_reduce_mem_limit) /
(doris::MemInfo::soft_mem_limit() -
cache_capacity_reduce_mem_limit),
0);
if (new_cache_capacity_adjust_weighted !=
doris::GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted) {
doris::GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted =
new_cache_capacity_adjust_weighted;
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
}
}
refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
}

void je_purge_dirty_pages() {
#ifdef USE_JEMALLOC
if (je_purge_dirty_pages_sleep_time_ms <= 0 &&
doris::MemInfo::je_dirty_pages_mem() > doris::MemInfo::je_dirty_pages_mem_limit() &&
GlobalMemoryArbitrator::is_exceed_soft_mem_limit()) {
doris::MemInfo::notify_je_purge_dirty_pages();
je_purge_dirty_pages_sleep_time_ms = config::memory_gc_sleep_time_ms;
}
je_purge_dirty_pages_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
#endif
MemInfo::refresh_memory_bvar();

// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
doris::MemTrackerLimiter::enable_print_log_process_usage();
// Refresh mem tracker each type counter.
doris::MemTrackerLimiter::refresh_global_counter();
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory state by 256M
}
}

// Periodic memory maintenance loop.
// Each iteration waits on _stop_background_threads_latch for
// config::memory_maintenance_sleep_time_ms milliseconds (the config value is
// re-read on every iteration) and exits once wait_for() returns true.
// The steps below run in this fixed order on every iteration; steps that only
// carry a TODO are placeholders with no code yet.
void Daemon::memory_maintenance_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) {
// step 1. Refresh process memory metrics.
refresh_process_memory_metrics();

// step 2. Refresh jemalloc/tcmalloc metrics.
refresh_common_allocator_metrics();

// step 3. Update and print memory stat when the memory changes by 256M.
refresh_memory_state_after_memory_change();

// step 4. Asynchronously refresh cache capacity: this call only computes the
// new weight and sends a notification; the resize itself is done by
// Daemon::cache_adjust_capacity_thread().
// TODO adjust cache capacity based on smoothstep (smooth gradient).
refresh_cache_capacity();

// step 5. Cancel top memory task when process memory exceed hard limit.
// TODO replace memory_gc_thread.

// step 6. Refresh weighted memory ratio of workload groups.
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

// step 7. Analyze blocking queries.
// TODO sort the operators that can spill, wake up the pipeline task spill
// or continue execution according to certain rules or cancel query.

// step 8. Flush memtable
// Currently this only sends the memtable memory refresh notification
// (presumably consumed by Daemon::memtable_memory_refresh_thread() - confirm).
doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
// TODO notify flush memtable

// step 9. Jemalloc purge all arena dirty pages
je_purge_dirty_pages();
}
}

Expand Down Expand Up @@ -301,10 +369,21 @@ void Daemon::memory_gc_thread() {
void Daemon::memtable_memory_refresh_thread() {
// Refresh the memory statistics of the load channel tracker more frequently,
// which helps to accurately control the memory of LoadChannelMgr.
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::memtable_mem_tracker_refresh_interval_ms))) {
do {
std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::memtable_memory_refresh_lock);
while (_stop_background_threads_latch.count() != 0 &&
!doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.load(
std::memory_order_relaxed)) {
doris::GlobalMemoryArbitrator::memtable_memory_refresh_cv.wait_for(
l, std::chrono::seconds(1));
}
if (_stop_background_threads_latch.count() == 0) {
break;
}
doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker();
}
doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.store(
false, std::memory_order_relaxed);
} while (true);
}

/*
Expand Down Expand Up @@ -396,6 +475,35 @@ void Daemon::je_purge_dirty_pages_thread() const {
} while (true);
}

// Worker loop that applies cache capacity adjustments.
//
// Holding GlobalMemoryArbitrator::cache_adjust_capacity_lock, the thread waits
// (re-checking at least once per second) until either the stop latch has
// reached zero or cache_adjust_capacity_notify is set. On stop it exits.
// Otherwise it passes the most recently published weight
// (GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted) to
// CacheManager::for_each_cache_refresh_capacity(), logs the freed memory
// together with the runtime profile, and clears the notify flag.
//
// When config::disable_memory_gc is set the pending request is dropped: the
// notify flag is cleared before looping again. Previously the flag was left
// set, so the wait condition was already satisfied on the next iteration and
// the thread spun without sleeping for as long as memory GC stayed disabled.
void Daemon::cache_adjust_capacity_thread() {
    do {
        std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::cache_adjust_capacity_lock);
        while (_stop_background_threads_latch.count() != 0 &&
               !doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
                       std::memory_order_relaxed)) {
            doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
                    l, std::chrono::seconds(1));
        }
        double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
        if (_stop_background_threads_latch.count() == 0) {
            break;
        }
        if (config::disable_memory_gc) {
            // Consume the notification so the next iteration blocks in wait_for()
            // instead of immediately falling through again.
            doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
                    false, std::memory_order_relaxed);
            continue;
        }
        std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
        auto freed_mem = CacheManager::instance()->for_each_cache_refresh_capacity(adjust_weighted,
                                                                                   profile.get());
        std::stringstream ss;
        profile->pretty_print(&ss);
        LOG(INFO) << fmt::format(
                "[MemoryGC] refresh cache capacity end, free memory {}, details: {}",
                PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
        doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
                false, std::memory_order_relaxed);
    } while (true);
}

void Daemon::cache_prune_stale_thread() {
int32_t interval = config::cache_periodic_prune_stale_sweep_sec;
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
Expand All @@ -411,14 +519,6 @@ void Daemon::cache_prune_stale_thread() {
}
}

void Daemon::wg_weighted_memory_ratio_refresh_thread() {
// Refresh weighted memory ratio of workload groups
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) {
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
}
}

void Daemon::be_proc_monitor_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) {
Expand Down Expand Up @@ -455,6 +555,10 @@ void Daemon::start() {
"Daemon", "je_purge_dirty_pages_thread",
[this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "cache_adjust_capacity_thread",
[this]() { this->cache_adjust_capacity_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); },
&_threads.emplace_back());
Expand All @@ -464,11 +568,6 @@ void Daemon::start() {
[this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "wg_weighted_memory_ratio_refresh_thread",
[this]() { this->wg_weighted_memory_ratio_refresh_thread(); },
&_threads.emplace_back());

if (config::enable_be_proc_monitor) {
st = Thread::create(
"Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); },
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class Daemon {
void memtable_memory_refresh_thread();
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void cache_adjust_capacity_thread();
void cache_prune_stale_thread();
void report_runtime_query_statistics_thread();
void wg_weighted_memory_ratio_refresh_thread();
void be_proc_monitor_thread();

CountDownLatch _stop_background_threads_latch;
Expand Down
Loading

0 comments on commit 1a83ee4

Please sign in to comment.