Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jul 20, 2024
1 parent 6eb3eac commit bdd0a8d
Show file tree
Hide file tree
Showing 14 changed files with 122 additions and 143 deletions.
3 changes: 1 addition & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,7 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
// Sleep time in milliseconds between memtable flush mgr refresh iterations
DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DEFINE_mInt64(wg_mem_refresh_interval_ms, "50");
DEFINE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms, "50");

// percent of (active memtables size / all memtables size) when reach hard limit
DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
// Sleep time in milliseconds between memtable flush mgr memory refresh iterations
DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DECLARE_mInt64(wg_mem_refresh_interval_ms);
// Sleep time in milliseconds between refresh iterations of workload group weighted memory ratio
DECLARE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms);

// percent of (active memtables size / all memtables size) when reach hard limit
DECLARE_mInt32(memtable_hard_limit_active_percent);
Expand Down
11 changes: 6 additions & 5 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,11 @@ void Daemon::je_purge_dirty_pages_thread() const {
} while (true);
}

void Daemon::wg_mem_used_refresh_thread() {
// Refresh memory usage and limit of workload groups
void Daemon::wg_weighted_memory_ratio_refresh_thread() {
// Refresh weighted memory ratio of workload groups
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::wg_mem_refresh_interval_ms))) {
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info();
std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) {
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_ratio();
}
}

Expand Down Expand Up @@ -441,7 +441,8 @@ void Daemon::start() {
CHECK(st.ok()) << st;

st = Thread::create(
"Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
"Daemon", "wg_weighted_memory_ratio_refresh_thread",
[this]() { this->wg_weighted_memory_ratio_refresh_thread(); },
&_threads.emplace_back());

if (config::enable_be_proc_monitor) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Daemon {
void calculate_metrics_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
void wg_mem_used_refresh_thread();
void wg_weighted_memory_ratio_refresh_thread();
void be_proc_monitor_thread();

CountDownLatch _stop_background_threads_latch;
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
} else if (is_wg_mem_low_water_mark) {
int64_t query_weighted_limit = 0;
int64_t query_weighted_consumption = 0;
query_ctx->get_weighted_mem_info(query_weighted_limit, query_weighted_consumption);
query_ctx->query_mem_tracker->get_weighted_memory(query_weighted_limit,
query_weighted_consumption);
if (query_weighted_limit == 0 || query_weighted_consumption < query_weighted_limit) {
return false;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
WorkloadGroupPtr workload_group_ptr =
_exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
if (workload_group_ptr != nullptr) {
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
tg_id);
Expand Down
21 changes: 19 additions & 2 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,18 @@ class MemTrackerLimiter final : public MemTracker {
return querytid;
}

// Stores the weighted memory limit and the weighted ratio of this tracker.
// Both fields are written while holding _weighted_mem_lock, the same lock that
// get_weighted_memory() takes before reading them.
void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) {
    const std::lock_guard<std::mutex> guard(_weighted_mem_lock);
    _weighted_ratio = weighted_ratio;
    _weighted_limit = weighted_limit;
}

void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) {
std::lock_guard<std::mutex> l(_weighted_mem_lock);
weighted_limit = _weighted_limit;
weighted_consumption = int64_t(_consumption->current_value() * _weighted_ratio);
}

// Log the memory usage when memory limit is exceeded.
std::string tracker_limit_exceeded_str();

Expand All @@ -220,8 +232,10 @@ class MemTrackerLimiter final : public MemTracker {
return msg.str();
}

// only for work load group
// Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
std::list<std::weak_ptr<MemTrackerLimiter>>::iterator tg_tracker_limiter_group_it;
std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it;
std::shared_ptr<std::atomic_int64_t> wg_refresh_interval_memory_growth;

private:
friend class ThreadMemTrackerMgr;
Expand All @@ -245,8 +259,11 @@ class MemTrackerLimiter final : public MemTracker {
// to avoid frequent calls to consume/release of MemTracker.
std::atomic<int64_t> _untracked_mem = 0;

// query or load
// only for query or load
std::atomic<bool> _is_query_cancelled = false;
std::mutex _weighted_mem_lock;
double _weighted_ratio = 0;
int64_t _weighted_limit = 0;

// Avoid frequent printing.
bool _enable_print_log_usage = false;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
_limiter_tracker_raw->release(size); // rollback
return false;
}
_limiter_tracker_raw->wg_refresh_interval_memory_growth->fetch_add(size);
if (_count_scope_mem) {
_scope_mem += size;
}
Expand All @@ -306,6 +307,7 @@ inline void ThreadMemTrackerMgr::release_reserved() {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
_untracked_mem);
_limiter_tracker_raw->release(_reserved_mem);
_limiter_tracker_raw->wg_refresh_interval_memory_growth->fetch_sub(_reserved_mem);
if (_count_scope_mem) {
_scope_mem -= _reserved_mem;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ QueryContext::~QueryContext() {
if (_workload_group) {
group_id = _workload_group->id(); // before remove
_workload_group->remove_mem_tracker_limiter(query_mem_tracker);
_workload_group->remove_query(_query_id);
}

_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
Expand Down
15 changes: 0 additions & 15 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,17 +230,6 @@ class QueryContext {
return _running_big_mem_op_num.load(std::memory_order_relaxed);
}

// Stores the weighted limit and weighted consumption of this query.
// Both values are written while holding _weighted_mem_lock.
void set_weighted_mem(int64_t weighted_limit, int64_t weighted_consumption) {
    const std::lock_guard<std::mutex> guard(_weighted_mem_lock);
    _weighted_limit = weighted_limit;
    _weighted_consumption = weighted_consumption;
}
// Copies the stored weighted limit and weighted consumption of this query into
// the two output references while holding _weighted_mem_lock.
void get_weighted_mem_info(int64_t& weighted_limit, int64_t& weighted_consumption) {
    const std::lock_guard<std::mutex> guard(_weighted_mem_lock);
    weighted_limit = _weighted_limit;
    weighted_consumption = _weighted_consumption;
}

DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
std::string user;
Expand Down Expand Up @@ -310,10 +299,6 @@ class QueryContext {
std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
std::mutex _pipeline_map_write_lock;

std::mutex _weighted_mem_lock;
int64_t _weighted_consumption = 0;
int64_t _weighted_limit = 0;

std::mutex _profile_mutex;

// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
Expand Down
39 changes: 30 additions & 9 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
_max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
_min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num),
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark) {}
_spill_high_watermark(tg_info.spill_high_watermark) {
_refresh_interval_memory_growth = std::make_shared<std::atomic_int64_t>(0);
}

std::string WorkloadGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
Expand All @@ -74,7 +76,7 @@ std::string WorkloadGroup::debug_string() const {
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num,
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size());
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_num);
}

void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
Expand Down Expand Up @@ -107,41 +109,60 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
}
}

int64_t WorkloadGroup::memory_used() {
int64_t WorkloadGroup::make_memory_tracker_snapshots(
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
int64_t used_memory = 0;
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
for (const auto& trackerWptr : mem_tracker_group.trackers) {
auto tracker = trackerWptr.lock();
CHECK(tracker != nullptr);
if (tracker_snapshots != nullptr) {
tracker_snapshots->insert(tracker_snapshots->end(), tracker);
}
used_memory += tracker->consumption();
}
}
// refresh total memory used.
_total_mem_used = used_memory;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _refresh_interval_memory_growth.
_refresh_interval_memory_growth->store(0.0);
return used_memory;
}

void WorkloadGroup::set_weighted_memory_used(int64_t wg_total_mem_used, double ratio) {
_weighted_mem_used.store(int64_t(wg_total_mem_used * ratio), std::memory_order_relaxed);
int64_t WorkloadGroup::memory_used() {
return make_memory_tracker_snapshots(nullptr);
}

// Publishes a new weighted memory ratio for this workload group by storing it
// into the atomic _weighted_mem_ratio.
void WorkloadGroup::set_weighted_memory_ratio(double ratio) {
    _weighted_mem_ratio.store(ratio);
}

void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
mem_tracker_ptr->tg_tracker_limiter_group_it =
mem_tracker_ptr->wg_tracker_limiter_group_it =
_mem_tracker_limiter_pool[group_num].trackers.insert(
_mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
mem_tracker_ptr->wg_refresh_interval_memory_growth = _refresh_interval_memory_growth;
_query_num++;
}

void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
if (mem_tracker_ptr->tg_tracker_limiter_group_it !=
if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
_mem_tracker_limiter_pool[group_num].trackers.end()) {
_mem_tracker_limiter_pool[group_num].trackers.erase(
mem_tracker_ptr->tg_tracker_limiter_group_it);
mem_tracker_ptr->tg_tracker_limiter_group_it =
mem_tracker_ptr->wg_tracker_limiter_group_it);
mem_tracker_ptr->wg_tracker_limiter_group_it =
_mem_tracker_limiter_pool[group_num].trackers.end();
}
_query_num--;
}

int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
Expand Down
51 changes: 16 additions & 35 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _memory_limit;
};

// make memory snapshots and refresh total memory used at the same time.
int64_t make_memory_tracker_snapshots(
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
// call make_memory_tracker_snapshots, so also refresh total memory used.
int64_t memory_used();

int spill_threshold_low_water_mark() const {
Expand All @@ -85,10 +89,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _spill_high_watermark.load(std::memory_order_relaxed);
}

void set_weighted_memory_used(int64_t wg_total_mem_used, double ratio);
void set_weighted_memory_ratio(double ratio);

void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
auto weighted_mem_used = _weighted_mem_used.load(std::memory_order_relaxed);
// `weighted_mem_used` is a rough memory usage of this group,
// because the only precise memory usage we can get comes from MemTracker, which does not include the page cache.
auto weighted_mem_used = int64_t(
(_total_mem_used + _refresh_interval_memory_growth->load()) * _weighted_mem_ratio);
*is_low_wartermark =
(weighted_mem_used > ((double)_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
Expand All @@ -112,37 +119,14 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _memory_limit > 0;
}

// Registers a query context with this workload group under the exclusive lock.
// Returns Status::OK() on success, or an InternalError when the group has
// already been shut down.
Status add_query(TUniqueId query_id, std::shared_ptr<QueryContext> query_ctx) {
    std::unique_lock<std::shared_mutex> exclusive_guard(_mutex);
    if (!_is_shutdown) {
        _query_ctxs.insert({query_id, query_ctx});
        return Status::OK();
    }
    // Once the workload group is shut down nothing more should run in it,
    // because the scheduler pool and other pointers may already be released.
    return Status::InternalError(
            "Failed add query to wg {}, the workload group is shutdown. host: {}", _id,
            BackendOptions::get_localhost());
}

// Erases the entry for `query_id` from _query_ctxs while holding the
// exclusive lock on _mutex.
void remove_query(TUniqueId query_id) {
    std::unique_lock<std::shared_mutex> exclusive_guard(_mutex);
    _query_ctxs.erase(query_id);
}

// Marks this workload group as shut down. The flag is set while holding the
// exclusive lock on _mutex.
void shutdown() {
    std::unique_lock<std::shared_mutex> exclusive_guard(_mutex);
    _is_shutdown = true;
}

bool can_be_dropped() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _is_shutdown && _query_ctxs.size() == 0;
}

int query_num() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _query_ctxs.size();
return _is_shutdown && _query_num == 0;
}

int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);
Expand All @@ -156,11 +140,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void try_stop_schedulers();

// Returns a copy of the query-id -> query-context map, taken while holding a
// shared lock on _mutex.
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> queries() {
    std::shared_lock<std::shared_mutex> shared_guard(_mutex);
    auto snapshot = _query_ctxs;
    return snapshot;
}

std::string thread_debug_info();

private:
Expand All @@ -169,9 +148,11 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::string _name;
int64_t _version;
int64_t _memory_limit; // bytes
// `_weighted_mem_used` is a rough memory usage in this group,
// because we can only get a precise memory usage by MemTracker which is not include page cache.
std::atomic_int64_t _weighted_mem_used = 0; // bytes
// last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called.
std::atomic_int64_t _total_mem_used = 0; // bytes
// last value of refresh_wg_weighted_memory_ratio.
std::atomic<double> _weighted_mem_ratio = 0.0;
std::shared_ptr<std::atomic_int64_t> _refresh_interval_memory_growth;
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
std::vector<TrackerLimiterGroup> _mem_tracker_limiter_pool;
Expand All @@ -186,7 +167,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
// new query can not submit
// waiting running query to be cancelled or finish
bool _is_shutdown = false;
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
// Counter incremented in add_mem_tracker_limiter() and decremented in
// remove_mem_tracker_limiter(); can_be_dropped() compares it against 0.
// Explicitly zero-initialized: before C++20 a default-constructed std::atomic
// member holds an indeterminate value.
std::atomic<int> _query_num {0};

std::shared_mutex _task_sched_lock;
std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
Expand Down
Loading

0 comments on commit bdd0a8d

Please sign in to comment.