Skip to content

Commit

Permalink
4
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jul 22, 2024
1 parent cacb94a commit 69273a9
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 31 deletions.
4 changes: 1 addition & 3 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
_load_id.to_thrift(),
MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string())),
ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(
1)}; // tg_id=1 is normal workload group.
fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()))};
}
g_loadchannel_cnt << 1;
// _last_updated_time should be set before being inserted to
Expand Down
21 changes: 8 additions & 13 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,21 +365,16 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e
_query_thread_context = {load_tid, query_context->query_mem_tracker,
query_context->workload_group()};
} else {
_query_thread_context = {
load_tid,
MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string())),
ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(
1)}; // tg_id=1 is normal workload group.
_query_thread_context = {load_tid, MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadStream)Load#Id={}",
((UniqueId)load_id).to_string()))};
}
#else
_query_thread_context = {
load_tid,
MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadStream)Load#Id={}", ((UniqueId)load_id).to_string())),
ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(1)};
_query_thread_context = {load_tid, MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD,
fmt::format("(FromLoadStream)Load#Id={}",
((UniqueId)load_id).to_string()))};
#endif
}

Expand Down
14 changes: 9 additions & 5 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,17 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
if (!_limiter_tracker_raw->try_consume(size)) {
return false;
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
_limiter_tracker_raw->release(size); // rollback
return false;
}
auto wg_ptr = _wg_wptr.lock();
if (!wg_ptr) {
wg_ptr->add_wg_refresh_interval_memory_growth(size);
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
_limiter_tracker_raw->release(size); // rollback
return false;
}
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
_limiter_tracker_raw->release(size); // rollback
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
return false;
}
if (_count_scope_mem) {
_scope_mem += size;
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ void AttachTask::init(const QueryThreadContext& query_thread_context) {
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
QueryThreadContext query_thread_context = {
TUniqueId(), mem_tracker,
doris::ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(1)};
QueryThreadContext query_thread_context = {TUniqueId(), mem_tracker};
init(query_thread_context);
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ class QueryThreadContext {
const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
const std::weak_ptr<WorkloadGroup>& wg_wptr)
: query_id(query_id), query_mem_tracker(mem_tracker), wg_wptr(wg_wptr) {}
// If use WorkloadGroup and can get WorkloadGroup ptr, must as a parameter.
QueryThreadContext(const TUniqueId& query_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: query_id(query_id), query_mem_tracker(mem_tracker) {}

// Not thread safe, generally be called in class constructor, shared_ptr use_count may be
// wrong when called by multiple threads, cause crash after object be destroyed prematurely.
Expand Down
14 changes: 9 additions & 5 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,21 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
used_memory += tracker->consumption();
}
}
refresh_memory(used_memory);
return used_memory;
}

int64_t WorkloadGroup::memory_used() {
return make_memory_tracker_snapshots(nullptr);
}

void WorkloadGroup::refresh_memory(int64_t used_memory) {
// refresh total memory used.
_total_mem_used = used_memory;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
_wg_refresh_interval_memory_growth.store(0.0);
return used_memory;
}

int64_t WorkloadGroup::memory_used() {
return make_memory_tracker_snapshots(nullptr);
}

void WorkloadGroup::set_weighted_memory_ratio(double ratio) {
Expand Down
16 changes: 14 additions & 2 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
// call make_memory_tracker_snapshots, so also refresh total memory used.
int64_t memory_used();
void refresh_memory(int64_t used_memory);

int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
Expand All @@ -90,8 +91,19 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
}

void set_weighted_memory_ratio(double ratio);
void add_wg_refresh_interval_memory_growth(int64_t size) {
_wg_refresh_interval_memory_growth.fetch_add(size);
bool add_wg_refresh_interval_memory_growth(int64_t size) {
// `weighted_mem_used` is a rough memory usage in this group,
// because we can only get a precise memory usage by MemTracker which is not include page cache.
auto weighted_mem_used =
int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load() + size) *
_weighted_mem_ratio);
if ((weighted_mem_used > ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
return false;
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
return true;
}
}
void sub_wg_refresh_interval_memory_growth(int64_t size) {
_wg_refresh_interval_memory_growth.fetch_sub(size);
Expand Down

0 comments on commit 69273a9

Please sign in to comment.