Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 12, 2024
1 parent 7fbae69 commit bbd2b6a
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 29 deletions.
14 changes: 6 additions & 8 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,31 @@
namespace doris {

constexpr size_t MEM_TRACKERS_GROUP_NUM = 1000;
std::atomic<long> mem_tracker_group_counter(0);
bvar::Adder<int64_t> g_memtracker_cnt("memtracker_cnt");

std::vector<MemTracker::TrackersGroup> MemTracker::mem_tracker_pool(MEM_TRACKERS_GROUP_NUM);

MemTracker::MemTracker() {
g_memtracker_cnt << 1;
}

MemTracker::MemTracker(const std::string& label) : MemTracker() {
MemTracker::MemTracker(const std::string& label) {
_label = label;
_group_num = random() % MEM_TRACKERS_GROUP_NUM;
_group_num = mem_tracker_group_counter.fetch_add(1) % MEM_TRACKERS_GROUP_NUM;
{
std::lock_guard<std::mutex> l(mem_tracker_pool[_group_num].group_lock);
_trackers_group_it = mem_tracker_pool[_group_num].trackers.insert(
mem_tracker_pool[_group_num].trackers.end(), this);
}
g_memtracker_cnt << 1;
}

MemTracker::~MemTracker() {
{
if (_group_num != -1) {
std::lock_guard<std::mutex> l(mem_tracker_pool[_group_num].group_lock);
if (_trackers_group_it != mem_tracker_pool[_group_num].trackers.end()) {
mem_tracker_pool[_group_num].trackers.erase(_trackers_group_it);
_trackers_group_it = mem_tracker_pool[_group_num].trackers.end();
}
g_memtracker_cnt << -1;
}
g_memtracker_cnt << -1;
}

} // namespace doris
4 changes: 2 additions & 2 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace doris {
*/
class MemTracker {
public:
MemTracker();
MemTracker() = default;
MemTracker(const std::string& label);
~MemTracker();

Expand Down Expand Up @@ -120,7 +120,7 @@ class MemTracker {
// Multiple groups are used to reduce the impact of locks.
static std::vector<TrackersGroup> mem_tracker_pool;
// Group number in mem_tracker_pool, generated by the timestamp.
int64_t _group_num;
int64_t _group_num {-1};
// Iterator into mem_tracker_pool for this object. Stored to have O(1) remove.
std::list<MemTracker*>::iterator _trackers_group_it;
};
Expand Down
18 changes: 9 additions & 9 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ static bvar::Adder<int64_t> memory_schema_change_trackers_sum_bytes(
"memory_schema_change_trackers_sum_bytes");
static bvar::Adder<int64_t> memory_other_trackers_sum_bytes("memory_other_trackers_sum_bytes");

std::atomic<long> mem_tracker_limiter_group_counter(0);
constexpr auto GC_MAX_SEEK_TRACKER = 1000;

std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
Expand Down Expand Up @@ -78,7 +79,8 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_
if (_type == Type::GLOBAL) {
_group_num = 0;
} else {
_group_num = random() % MEM_TRACKER_GROUP_NUM + 1;
_group_num =
mem_tracker_limiter_group_counter.fetch_add(1) % (MEM_TRACKER_GROUP_NUM - 1) + 1;
}

// currently only select/load need runtime query statistics
Expand Down Expand Up @@ -239,7 +241,7 @@ void MemTrackerLimiter::refresh_global_counter() {
}
int64_t all_trackers_mem_sum = 0;
for (auto it : type_mem_sum) {
TypeMemSum[static_cast<int>(it.first)].set_consumption(it.second);
MemTrackerLimiter::TypeMemSum[it.first].set_consumption(it.second);

all_trackers_mem_sum += it.second;
switch (it.first) {
Expand All @@ -266,8 +268,6 @@ void MemTrackerLimiter::refresh_global_counter() {
case Type::OTHER:
memory_other_trackers_sum_bytes
<< it.second - memory_other_trackers_sum_bytes.get_value();
case Type::TYPE_COUNT:
break;
}
}
all_trackers_mem_sum += MemInfo::allocator_cache_mem();
Expand Down Expand Up @@ -298,14 +298,14 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<Snapshot>* snapshots)
MemTrackerLimiter::refresh_global_counter();
int64_t all_trackers_mem_sum = 0;
Snapshot snapshot;
for (size_t i = 0; i < static_cast<int>(Type::TYPE_COUNT); ++i) {
for (const auto& it : MemTrackerLimiter::TypeMemSum) {
snapshot.type = "overview";
snapshot.label = type_string(static_cast<Type>(i));
snapshot.label = type_string(it.first);
snapshot.limit = -1;
snapshot.cur_consumption = TypeMemSum[i].consumption();
snapshot.peak_consumption = TypeMemSum[i].peak_consumption();
snapshot.cur_consumption = it.second.consumption();
snapshot.peak_consumption = it.second.peak_consumption();
(*snapshots).emplace_back(snapshot);
all_trackers_mem_sum += TypeMemSum[i].consumption();
all_trackers_mem_sum += it.second.consumption();
}

snapshot.type = "overview";
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,11 @@ class MemTrackerLimiter final {
COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks.
SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
OTHER = 5,
TYPE_COUNT
};

// Corresponding to MemTrackerLimiter::Type.
// MemTracker contains atomic variables, which are not allowed to be copied or moved.
inline static MemTracker TypeMemSum[] = {MemTracker(), MemTracker(), MemTracker(),
MemTracker(), MemTracker(), MemTracker()};
inline static std::unordered_map<Type, MemTracker> TypeMemSum;

struct Snapshot {
std::string type;
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
DCHECK(mem_tracker);
CHECK(init());
flush_untracked_mem();
_reserved_mem_stack.push_back(_reserved_mem);
_last_attach_snapshots_stack.push_back({_reserved_mem, _consumer_tracker_stack});
if (_reserved_mem != 0) {
// _untracked_mem temporary store bytes that not synchronized to process reserved memory,
// but bytes have been subtracted from thread _reserved_mem.
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
_reserved_mem = 0;
_untracked_mem = 0;
}
_consumer_tracker_stack.clear();
_limiter_tracker = mem_tracker;
}

Expand All @@ -62,9 +63,10 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
CHECK(init());
flush_untracked_mem();
release_reserved();
DCHECK(!_reserved_mem_stack.empty());
_reserved_mem = _reserved_mem_stack.back();
_reserved_mem_stack.pop_back();
DCHECK(!_last_attach_snapshots_stack.empty());
_reserved_mem = _last_attach_snapshots_stack.back().reserved_mem;
_consumer_tracker_stack = _last_attach_snapshots_stack.back().consumer_tracker_stack;
_last_attach_snapshots_stack.pop_back();
_limiter_tracker = old_mem_tracker;
}

Expand Down
8 changes: 7 additions & 1 deletion be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,22 @@ class ThreadMemTrackerMgr {
int64_t reserved_mem() const { return _reserved_mem; }

private:
struct LastAttachSnapshot {
int64_t reserved_mem = 0;
std::vector<MemTracker*> consumer_tracker_stack;
};

// is false: ExecEnv::ready() = false when thread local is initialized
bool _init = false;
// Cache untracked mem.
int64_t _untracked_mem = 0;
int64_t _old_untracked_mem = 0;

int64_t _reserved_mem = 0;

// SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used,
// so `attach_limiter_tracker` may be nested.
std::vector<int64_t> _reserved_mem_stack;
std::vector<LastAttachSnapshot> _last_attach_snapshots_stack;

std::string _failed_consume_msg = std::string();
// If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit.
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
bool RoutineLoadTaskExecutor::_reach_memory_limit() {
bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
auto current_load_mem_value =
MemTrackerLimiter::TypeMemSum[static_cast<int>(MemTrackerLimiter::Type::LOAD)]
.consumption();
MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD].consumption();
if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
<< " current_load_mem_value: " << current_load_mem_value
Expand Down

0 comments on commit bbd2b6a

Please sign in to comment.