Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 9, 2024
1 parent cd7718d commit 1decfa6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 31 deletions.
5 changes: 2 additions & 3 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ bvar::Adder<int64_t> g_memtracker_cnt("memtracker_cnt");
std::vector<MemTracker::TrackerGroup> MemTracker::mem_tracker_pool(1000);

MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) {
_consumption = std::make_shared<MemCounter>();
bind_parent(parent);
}

Expand Down Expand Up @@ -78,8 +77,8 @@ MemTracker::Snapshot MemTracker::make_snapshot() const {
snapshot.label = _label;
snapshot.parent_label = _parent_label;
snapshot.limit = -1;
snapshot.cur_consumption = _consumption->current_value();
snapshot.peak_consumption = _consumption->peak_value();
snapshot.cur_consumption = consumption();
snapshot.peak_consumption = peak_consumption();
return snapshot;
}

Expand Down
18 changes: 9 additions & 9 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,27 +160,27 @@ class MemTracker {
const std::string& parent_label() const { return _parent_label; }
const std::string& set_parent_label() const { return _parent_label; }
// Returns the memory consumed in bytes.
int64_t consumption() const { return _consumption->current_value(); }
int64_t peak_consumption() const { return _consumption->peak_value(); }
int64_t consumption() const { return _consumption.current_value(); }
int64_t peak_consumption() const { return _consumption.peak_value(); }

void consume(int64_t bytes) {
if (UNLIKELY(bytes == 0)) {
return;
}
_consumption->add(bytes);
_consumption.add(bytes);
if (_query_statistics) {
_query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
_query_statistics->set_current_used_memory_bytes(_consumption->current_value());
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
}

void consume_no_update_peak(int64_t bytes) { // need extreme fast
_consumption->add_no_update_peak(bytes);
_consumption.add_no_update_peak(bytes);
}

void release(int64_t bytes) { _consumption->sub(bytes); }
void release(int64_t bytes) { _consumption.sub(bytes); }

void set_consumption(int64_t bytes) { _consumption->set(bytes); }
void set_consumption(int64_t bytes) { _consumption.set(bytes); }

std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }

Expand Down Expand Up @@ -211,7 +211,7 @@ class MemTracker {
// label used in the make snapshot, not guaranteed unique.
std::string _label;

std::shared_ptr<MemCounter> _consumption = nullptr;
MemCounter _consumption;

// Tracker is located in group num in mem_tracker_pool
int64_t _parent_group_num = 0;
Expand Down
24 changes: 12 additions & 12 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,26 +126,26 @@ MemTrackerLimiter::~MemTrackerLimiter() {
"tracker web or log, this indicates that there may be a memory leak. "
"4. If you need to "
"transfer memory tracking value between two trackers, can use transfer_to.";
if (_consumption->current_value() != 0) {
if (consumption() != 0) {
// TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end.
#ifndef NDEBUG
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::string err_msg =
fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.",
label(), _consumption->current_value(), _consumption->peak_value(),
label(), consumption(), peak_consumption(),
mem_tracker_inaccurate_msg);
LOG(FATAL) << err_msg << print_address_sanitizers();
}
#endif
if (ExecEnv::tracking_memory()) {
ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value());
ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption());
}
_consumption->set(0);
_consumption.set(0);
#ifndef NDEBUG
} else if (!_address_sanitizers.empty() && !is_group_commit_load) {
LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< ", peak consumption: " << peak_consumption()
<< print_address_sanitizers();
#endif
}
Expand All @@ -162,7 +162,7 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, "
"consumption: {}, peak consumption: {}, buf: {}, size: {}, old "
"buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(),
_label, consumption(), peak_consumption(),
buf, size, it->first, it->second.size,
get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace));
}
Expand All @@ -185,7 +185,7 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
"[Address Sanitizer] free memory buf size inaccurate, mem tracker label: "
"{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: "
"{}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf,
_label, consumption(), peak_consumption(), buf,
size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
it->second.stack_trace));
}
Expand All @@ -194,7 +194,7 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
_error_address_sanitizers.emplace_back(fmt::format(
"[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf, size,
_label, consumption(), peak_consumption(), buf, size,
get_stack_trace(1, "FULL_WITH_INLINE")));
}
}
Expand All @@ -208,7 +208,7 @@ std::string MemTrackerLimiter::print_address_sanitizers() {
auto msg = fmt::format(
"\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
_label, _consumption->current_value(), _consumption->peak_value(), it.first,
_label, consumption(), peak_consumption(), it.first,
it.second.size, it.second.stack_trace);
LOG(INFO) << msg;
detail += msg;
Expand All @@ -227,8 +227,8 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
snapshot.type = type_string(_type);
snapshot.label = _label;
snapshot.limit = _limit;
snapshot.cur_consumption = _consumption->current_value();
snapshot.peak_consumption = _consumption->peak_value();
snapshot.cur_consumption = consumption();
snapshot.peak_consumption = peak_consumption();
return snapshot;
}

Expand Down Expand Up @@ -519,7 +519,7 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
"memory tracker limit exceeded, tracker label:{}, type:{}, limit "
"{}, peak used {}, current used {}. backend {}, {}.",
label(), type_string(_type), print_bytes(limit()),
print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()),
print_bytes(peak_consumption()), print_bytes(consumption()),
BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str());
if (_type == Type::QUERY || _type == Type::LOAD) {
err_msg += fmt::format(
Expand Down
14 changes: 7 additions & 7 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,19 @@ class MemTrackerLimiter final : public MemTracker {
int64_t limit() const { return _limit; }
bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); }

bool try_consume(int64_t bytes) const {
bool try_consume(int64_t bytes) {
if (UNLIKELY(bytes == 0)) {
return true;
}
bool st = true;
if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) {
st = _consumption->try_add(bytes, _limit);
st = _consumption.try_add(bytes, _limit);
} else {
_consumption->add(bytes);
_consumption.add(bytes);
}
if (st && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
_query_statistics->set_current_used_memory_bytes(_consumption->current_value());
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
return st;
}
Expand Down Expand Up @@ -215,7 +215,7 @@ class MemTrackerLimiter final : public MemTracker {
std::string debug_string() override {
std::stringstream msg;
msg << "limit: " << _limit << "; "
<< "consumption: " << _consumption->current_value() << "; "
<< "consumption: " << consumption() << "; "
<< "label: " << _label << "; "
<< "type: " << type_string(_type) << "; ";
return msg.str();
Expand Down Expand Up @@ -283,7 +283,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memory_overcommit)) {
return Status::OK();
}
if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
if (_limit > 0 && consumption() + bytes > _limit) {
return Status::MemoryLimitExceeded(fmt::format(
"failed alloc size {}, {}", print_bytes(bytes), tracker_limit_exceeded_str()));
}
Expand Down

0 comments on commit 1decfa6

Please sign in to comment.