Skip to content

Commit

Permalink
[fix](memory) Fix BE memory info compatible with Cgroup (apache#35412)
Browse files Browse the repository at this point in the history
1. `memory.usage_in_bytes ~= free.used + free.(buff/cache) - (buff)`, free cache can be reused,
   so, modify cgroup_memory_usage = memory.usage_in_bytes - memory.meminfo["Cached"].
2. If system not configured with cgroup, find cgroup file path will failed, refactor refresh cgroup memory info, compatible with find failed.
  • Loading branch information
xinyiZzz committed May 27, 2024
1 parent af986c3 commit ba8f54c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 23 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ DEFINE_String(priority_networks, "");
// performance or compact
DEFINE_String(memory_mode, "moderate");

DEFINE_mBool(enable_use_cgroup_memory_info, "true");

// process memory limit specified as number of bytes
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
// or percentage of the physical memory ('<int>%').
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ DECLARE_String(priority_networks);
// performance moderate or compact, only tcmalloc compile
DECLARE_String(memory_mode);

// if true, process memory limit and memory usage based on cgroup memory info.
DECLARE_mBool(enable_use_cgroup_memory_info);

// process memory limit specified as number of bytes
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
// or percentage of the physical memory ('<int>%').
Expand Down
10 changes: 10 additions & 0 deletions be/src/util/cgroup_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,16 @@ Status CGroupUtil::find_cgroup_mem_usage(int64_t* bytes) {
return read_cgroup_value(usage_file_path, bytes);
}

Status CGroupUtil::find_cgroup_mem_info(std::string* file_path) {
if (!enable()) {
return Status::InvalidArgument("cgroup is not enabled!");
}
string cgroup_path;
RETURN_IF_ERROR(find_abs_cgroup_path("memory", &cgroup_path));
*file_path = cgroup_path + "/memory.meminfo";
return Status::OK();
}

Status CGroupUtil::find_cgroup_cpu_limit(float* cpu_count) {
if (!enable()) {
return Status::InvalidArgument("cgroup is not enabled!");
Expand Down
1 change: 1 addition & 0 deletions be/src/util/cgroup_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class CGroupUtil {
// memory.usage_in_bytes ~= free.used + free.(buff/cache) - (buff)
// https://serverfault.com/questions/902009/the-memory-usage-reported-in-cgroup-differs-from-the-free-command
static Status find_cgroup_mem_usage(int64_t* bytes);
static Status find_cgroup_mem_info(std::string* file_path);

// Determines the CGroup cpu cores limit from the current processes' cgroup.
static Status find_cgroup_cpu_limit(float* cpu_count);
Expand Down
108 changes: 87 additions & 21 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ bvar::PassiveStatus<int64_t> g_sys_mem_avail(

bool MemInfo::_s_initialized = false;
std::atomic<int64_t> MemInfo::_s_physical_mem = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_limit_refresh_wait_times = 0;
std::atomic<int64_t> MemInfo::_s_mem_limit = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> MemInfo::_s_soft_mem_limit = std::numeric_limits<int64_t>::max();

Expand All @@ -69,6 +67,12 @@ std::string MemInfo::_s_allocator_cache_mem_str = "";
std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;

int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
static std::unordered_map<std::string, int64_t> _s_cgroup_mem_info_bytes;
bool MemInfo::_s_cgroup_mem_refresh_state = false;
int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0;

static std::unordered_map<std::string, int64_t> _mem_info_bytes;
std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1;
int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
Expand Down Expand Up @@ -393,27 +397,91 @@ void MemInfo::refresh_proc_meminfo() {
meminfo.close();
}

// 1. calculate physical_mem
int64_t physical_mem = -1;
int64_t cgroup_mem_limit = -1;
physical_mem = _mem_info_bytes["MemTotal"];
if (_s_cgroup_mem_limit_refresh_wait_times >= 0) {
// refresh cgroup memory
if (_s_cgroup_mem_refresh_wait_times >= 0 && config::enable_use_cgroup_memory_info) {
int64_t cgroup_mem_limit = -1;
int64_t cgroup_mem_usage = -1;
std::string cgroup_mem_info_file_path;
_s_cgroup_mem_refresh_state = true;
Status status = CGroupUtil::find_cgroup_mem_limit(&cgroup_mem_limit);
if (status.ok() && cgroup_mem_limit > 0) {
if (!status.ok() || cgroup_mem_limit <= 0) {
_s_cgroup_mem_refresh_state = false;
}
status = CGroupUtil::find_cgroup_mem_usage(&cgroup_mem_usage);
if (!status.ok() || cgroup_mem_usage <= 0) {
_s_cgroup_mem_refresh_state = false;
}
status = CGroupUtil::find_cgroup_mem_info(&cgroup_mem_info_file_path);
if (status.ok()) {
std::ifstream cgroup_meminfo(cgroup_mem_info_file_path, std::ios::in);
std::string line;

while (cgroup_meminfo.good() && !cgroup_meminfo.eof()) {
getline(cgroup_meminfo, line);
std::vector<std::string> fields =
strings::Split(line, " ", strings::SkipWhitespace());
if (fields.size() < 2) {
continue;
}
std::string key = fields[0].substr(0, fields[0].size() - 1);

StringParser::ParseResult result;
auto mem_value = StringParser::string_to_int<int64_t>(fields[1].data(),
fields[1].size(), &result);

if (result == StringParser::PARSE_SUCCESS) {
if (fields.size() == 2) {
_s_cgroup_mem_info_bytes[key] = mem_value;
} else if (fields[2] == "kB") {
_s_cgroup_mem_info_bytes[key] = mem_value * 1024L;
}
}
}
if (cgroup_meminfo.is_open()) {
cgroup_meminfo.close();
}
} else {
_s_cgroup_mem_refresh_state = false;
}

if (_s_cgroup_mem_refresh_state) {
_s_cgroup_mem_limit = cgroup_mem_limit;
_s_cgroup_mem_limit_refresh_wait_times =
-1000; // wait 10s, 1000 * 100ms, avoid too frequently.
// https://serverfault.com/questions/902009/the-memory-usage-reported-in-cgroup-differs-from-the-free-command
// memory.usage_in_bytes ~= free.used + free.(buff/cache) - (buff)
// so, memory.usage_in_bytes - memory.meminfo["Cached"]
_s_cgroup_mem_usage = cgroup_mem_usage - _s_cgroup_mem_info_bytes["Cached"];
// wait 10s, 100 * 100ms, avoid too frequently.
_s_cgroup_mem_refresh_wait_times = -100;
LOG(INFO) << "Refresh cgroup memory win, refresh again after 10s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["Cached"];
} else {
_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
_s_cgroup_mem_limit_refresh_wait_times =
-6000; // find cgroup failed, wait 60s, 6000 * 100ms.
// find cgroup failed, wait 300s, 1000 * 100ms.
_s_cgroup_mem_refresh_wait_times = -3000;
LOG(INFO)
<< "Refresh cgroup memory failed, refresh again after 300s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage
<< ", cgroup mem info cached: " << _s_cgroup_mem_info_bytes["Cached"];
}
} else {
_s_cgroup_mem_limit_refresh_wait_times++;
if (config::enable_use_cgroup_memory_info) {
_s_cgroup_mem_refresh_wait_times++;
} else {
_s_cgroup_mem_refresh_state = false;
}
}
if (_s_cgroup_mem_limit > 0) {

// 1. calculate physical_mem
int64_t physical_mem = -1;

physical_mem = _mem_info_bytes["MemTotal"];
if (_s_cgroup_mem_refresh_state) {
// In theory, always cgroup_mem_limit < physical_mem
physical_mem = std::min(physical_mem, _s_cgroup_mem_limit);
if (physical_mem < 0) {
physical_mem = _s_cgroup_mem_limit;
} else {
physical_mem = std::min(physical_mem, _s_cgroup_mem_limit);
}
}

if (physical_mem <= 0) {
Expand Down Expand Up @@ -449,16 +517,14 @@ void MemInfo::refresh_proc_meminfo() {

// 3. refresh process available memory
int64_t mem_available = -1;
int64_t cgroup_mem_usage = 0;
if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
mem_available = _mem_info_bytes["MemAvailable"];
}
auto status = CGroupUtil::find_cgroup_mem_usage(&cgroup_mem_usage);
if (status.ok() && cgroup_mem_usage > 0 && cgroup_mem_limit > 0) {
if (_s_cgroup_mem_refresh_state) {
if (mem_available < 0) {
mem_available = cgroup_mem_limit - cgroup_mem_usage;
mem_available = _s_cgroup_mem_limit - _s_cgroup_mem_usage;
} else {
mem_available = std::min(mem_available, cgroup_mem_limit - cgroup_mem_usage);
mem_available = std::min(mem_available, _s_cgroup_mem_limit - _s_cgroup_mem_usage);
}
}
if (mem_available < 0) {
Expand Down
7 changes: 5 additions & 2 deletions be/src/util/mem_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,18 @@ class MemInfo {
private:
static bool _s_initialized;
static std::atomic<int64_t> _s_physical_mem;
static int64_t _s_cgroup_mem_limit;
static int64_t _s_cgroup_mem_limit_refresh_wait_times;
static std::atomic<int64_t> _s_mem_limit;
static std::atomic<int64_t> _s_soft_mem_limit;

static std::atomic<int64_t> _s_allocator_cache_mem;
static std::string _s_allocator_cache_mem_str;
static std::atomic<int64_t> _s_virtual_memory_used;

static int64_t _s_cgroup_mem_limit;
static int64_t _s_cgroup_mem_usage;
static bool _s_cgroup_mem_refresh_state;
static int64_t _s_cgroup_mem_refresh_wait_times;

static std::atomic<int64_t> _s_sys_mem_available;
static int64_t _s_sys_mem_available_low_water_mark;
static int64_t _s_sys_mem_available_warning_water_mark;
Expand Down

0 comments on commit ba8f54c

Please sign in to comment.