Skip to content

Commit

Permalink
6
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Aug 13, 2024
1 parent c0f1a43 commit 00fc066
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 101 deletions.
2 changes: 0 additions & 2 deletions be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include <fmt/format.h>
#include <sys/stat.h>

#include <cfloat>
#include <filesystem>

#include "util/cgroup_util.h"
#include "util/defer_op.h"

namespace doris {
Expand Down
156 changes: 75 additions & 81 deletions be/src/common/cgroup_memory_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@
#include <memory>
#include <utility>

#include "common/exception.h"
#include "common/status.h"
#include "util/cgroup_util.h"

namespace doris {

// Is the memory controller of cgroups v2 enabled on the system?
// Assumes that cgroupsv2_enable() is enabled.
bool cgroupsv2_memory_controller_enabled() {
Status cgroupsv2_memory_controller_enabled(bool* ret) {
#if defined(OS_LINUX)
if (!CGroupUtil::cgroupsv2_enable()) {
throw doris::Exception(doris::ErrorCode::CGROUP_ERROR, "cgroupsv2_enable is false");
return Status::CgroupError("cgroupsv2_enable is false");
}
// According to https://docs.kernel.org/admin-guide/cgroup-v2.html, file "cgroup.controllers" defines which controllers are available
// for the current + child cgroups. The set of available controllers can be restricted from level to level using file
Expand All @@ -45,36 +44,35 @@ bool cgroupsv2_memory_controller_enabled() {
auto cgroup_dir = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
std::ifstream controllers_file(cgroup_dir / "cgroup.controllers");
if (!controllers_file.is_open()) {
return false;
*ret = false;
return Status::CgroupError("open cgroup.controllers failed");
}
std::string controllers;
std::getline(controllers_file, controllers);
return controllers.find("memory") != std::string::npos;
*ret = controllers.find("memory") != std::string::npos;
return Status::OK();
#else
return false;
*ret = false;
return Status::CgroupError("cgroupsv2 only support Linux");
#endif
}

struct CgroupsV1Reader : CGroupMemoryCtl::ICgroupsReader {
explicit CgroupsV1Reader(std::filesystem::path mount_file_dir)
: _mount_file_dir(std::move(mount_file_dir)) {}

uint64_t read_memory_limit() override {
int64_t value;
auto st = CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.limit_in_bytes"), &value);
if (!st.ok()) {
throw doris::Exception(doris::ErrorCode::CGROUP_ERROR,
"Cannot read cgroupv1 memory.limit_in_bytes, " + st.to_string());
}
return value;
Status read_memory_limit(uint64_t* value) override {
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.limit_in_bytes"), (int64_t*)value));
return Status::OK();
}

uint64_t read_memory_usage() override {
Status read_memory_usage(uint64_t* value) override {
std::unordered_map<std::string, int64_t> metrics_map;
CGroupUtil::read_int_metric_from_cgroup_file((_mount_file_dir / "memory.stat"),
metrics_map);
return metrics_map["rss"];
*value = metrics_map["rss"];
return Status::OK();
}

private:
Expand All @@ -85,43 +83,36 @@ struct CgroupsV2Reader : CGroupMemoryCtl::ICgroupsReader {
explicit CgroupsV2Reader(std::filesystem::path mount_file_dir)
: _mount_file_dir(std::move(mount_file_dir)) {}

uint64_t read_memory_limit() override {
int64_t value;
auto st = CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.max"),
&value);
if (!st.ok()) {
throw doris::Exception(doris::ErrorCode::CGROUP_ERROR,
"Cannot read cgroupv2 memory.max, " + st.to_string());
}
return value;
Status read_memory_limit(uint64_t* value) override {
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.max"),
(int64_t*)value));
return Status::OK();
}

uint64_t read_memory_usage() override {
int64_t mem_usage = 0;
Status read_memory_usage(uint64_t* value) override {
// memory.current contains a single number
// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
auto st = CGroupUtil::read_int_line_from_cgroup_file((_mount_file_dir / "memory.current"),
&mem_usage);
if (!st.ok()) {
throw doris::Exception(doris::ErrorCode::CGROUP_ERROR,
"Cannot read cgroupv2 memory.current, " + st.to_string());
}
RETURN_IF_ERROR(CGroupUtil::read_int_line_from_cgroup_file(
(_mount_file_dir / "memory.current"), (int64_t*)value));
std::unordered_map<std::string, int64_t> metrics_map;
CGroupUtil::read_int_metric_from_cgroup_file((_mount_file_dir / "memory.stat"),
metrics_map);
mem_usage -= metrics_map["inactive_file"];
if (mem_usage < 0) {
throw doris::Exception(doris::ErrorCode::CGROUP_ERROR, "Negative memory usage");
*value -= metrics_map["inactive_file"];
if (*value < 0) {
return Status::CgroupError("CgroupsV2Reader read_memory_usage negative memory usage");
}
return mem_usage;
return Status::OK();
}

private:
std::filesystem::path _mount_file_dir;
};

std::pair<std::string, CGroupUtil::CgroupsVersion> get_cgroups_path() {
if (CGroupUtil::cgroupsv2_enable() && cgroupsv2_memory_controller_enabled()) {
bool enable_controller;
auto cgroupsv2_memory_controller_st = cgroupsv2_memory_controller_enabled(&enable_controller);
if (CGroupUtil::cgroupsv2_enable() && cgroupsv2_memory_controller_st.ok() &&
enable_controller) {
auto v2_memory_stat_path = CGroupUtil::get_cgroupsv2_path("memory.stat");
auto v2_memory_current_path = CGroupUtil::get_cgroupsv2_path("memory.current");
auto v2_memory_max_path = CGroupUtil::get_cgroupsv2_path("memory.max");
Expand All @@ -138,65 +129,68 @@ std::pair<std::string, CGroupUtil::CgroupsVersion> get_cgroups_path() {
return {cgroup_path, CGroupUtil::CgroupsVersion::V1};
}

throw doris::Exception(
doris::ErrorCode::CGROUP_ERROR,
fmt::format("Cannot find cgroups v1 or v2 current memory file, cgroupsv2_enable: {}, "
"cgroupsv2_memory_controller_enabled: {}, cgroupsv1_enable: {}",
CGroupUtil::cgroupsv2_enable(), cgroupsv2_memory_controller_enabled(),
CGroupUtil::cgroupsv1_enable()));
return {"", CGroupUtil::CgroupsVersion::V1};
}

std::shared_ptr<CGroupMemoryCtl::ICgroupsReader> get_cgroups_reader() {
Status get_cgroups_reader(std::shared_ptr<CGroupMemoryCtl::ICgroupsReader>& reader) {
const auto [cgroup_path, version] = get_cgroups_path();
if (cgroup_path.empty()) {
bool enable_controller;
auto st = cgroupsv2_memory_controller_enabled(&enable_controller);
return Status::CgroupError(
"Cannot find cgroups v1 or v2 current memory file, cgroupsv2_enable: {},{}, "
"cgroupsv2_memory_controller_enabled: {}, cgroupsv1_enable: {}",
CGroupUtil::cgroupsv2_enable(), enable_controller, st.to_string(),
CGroupUtil::cgroupsv1_enable());
}

if (version == CGroupUtil::CgroupsVersion::V2) {
return std::make_shared<CgroupsV2Reader>(cgroup_path);
reader = std::make_shared<CgroupsV2Reader>(cgroup_path);
} else {
return std::make_shared<CgroupsV1Reader>(cgroup_path);
reader = std::make_shared<CgroupsV1Reader>(cgroup_path);
}
return Status::OK();
}

Status CGroupMemoryCtl::find_cgroup_mem_limit(int64_t* bytes) {
try {
*bytes = get_cgroups_reader()->read_memory_limit();
return Status::OK();
} catch (const doris::Exception& e) {
LOG(WARNING) << "Cgroup find_cgroup_mem_limit failed, " << e.to_string();
return Status::CgroupError(e.to_string());
}
Status CGroupMemoryCtl::find_cgroup_mem_limit(uint64_t* bytes) {
std::shared_ptr<CGroupMemoryCtl::ICgroupsReader> reader;
RETURN_IF_ERROR(get_cgroups_reader(reader));
RETURN_IF_ERROR(reader->read_memory_limit(bytes));
return Status::OK();
}

Status CGroupMemoryCtl::find_cgroup_mem_usage(int64_t* bytes) {
try {
*bytes = get_cgroups_reader()->read_memory_usage();
return Status::OK();
} catch (const doris::Exception& e) {
LOG(WARNING) << "Cgroup find_cgroup_mem_usage failed, " << e.to_string();
return Status::CgroupError(e.to_string());
}
Status CGroupMemoryCtl::find_cgroup_mem_usage(uint64_t* bytes) {
std::shared_ptr<CGroupMemoryCtl::ICgroupsReader> reader;
RETURN_IF_ERROR(get_cgroups_reader(reader));
RETURN_IF_ERROR(reader->read_memory_usage(bytes));
return Status::OK();
}

std::string CGroupMemoryCtl::debug_string() {
try {
const auto [cgroup_path, version] = get_cgroups_path();
const auto [cgroup_path, version] = get_cgroups_path();
if (cgroup_path.empty()) {
bool enable_controller;
auto st = cgroupsv2_memory_controller_enabled(&enable_controller);
return fmt::format(
"Cannot find cgroups v1 or v2 current memory file, cgroupsv2_enable: {},{}, "
"cgroupsv2_memory_controller_enabled: {}, cgroupsv1_enable: {}",
CGroupUtil::cgroupsv2_enable(), enable_controller, st.to_string(),
CGroupUtil::cgroupsv1_enable());
}

int64_t mem_limit;
auto mem_limit_st = find_cgroup_mem_limit(&mem_limit);
uint64_t mem_limit;
auto mem_limit_st = find_cgroup_mem_limit(&mem_limit);

int64_t mem_usage;
auto mem_usage_st = find_cgroup_mem_usage(&mem_usage);
uint64_t mem_usage;
auto mem_usage_st = find_cgroup_mem_usage(&mem_usage);

return fmt::format(
"Process CGroup Memory Info (cgroups path: {}, cgroup version: {}): memory limit: "
"{}, "
"memory usage: {}",
cgroup_path, (version == CGroupUtil::CgroupsVersion::V1) ? "v1" : "v2",
mem_limit_st.ok() ? std::to_string(mem_limit) : mem_limit_st.to_string(),
mem_usage_st.ok() ? std::to_string(mem_usage) : mem_usage_st.to_string());
} catch (const doris::Exception& e) {
LOG(WARNING) << "Cgroup find_cgroup_mem_usage failed, " << e.to_string();
return fmt::format("CGroupMemoryCtl::debug_string {}", e.to_string());
}
return fmt::format(
"Process CGroup Memory Info (cgroups path: {}, cgroup version: {}): memory limit: "
"{}, "
"memory usage: {}",
cgroup_path, (version == CGroupUtil::CgroupsVersion::V1) ? "v1" : "v2",
mem_limit_st.ok() ? std::to_string(mem_limit) : mem_limit_st.to_string(),
mem_usage_st.ok() ? std::to_string(mem_usage) : mem_usage_st.to_string());
}

} // namespace doris
8 changes: 4 additions & 4 deletions be/src/common/cgroup_memory_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ class CGroupMemoryCtl {
struct ICgroupsReader {
virtual ~ICgroupsReader() = default;

virtual uint64_t read_memory_limit() = 0;
virtual Status read_memory_limit(uint64_t* value) = 0;

virtual uint64_t read_memory_usage() = 0;
virtual Status read_memory_usage(uint64_t* value) = 0;
};

// Determines the CGroup memory limit from the current processes' cgroup.
// If the limit is more than INT64_MAX, INT64_MAX is returned (since that is
// effectively unlimited anyway). Does not take into account memory limits
// set on any ancestor CGroups.
static Status find_cgroup_mem_limit(int64_t* bytes);
static Status find_cgroup_mem_limit(uint64_t* bytes);

// https://serverfault.com/questions/902009/the-memory-usage-reported-in-cgroup-differs-from-the-free-command
static Status find_cgroup_mem_usage(int64_t* bytes);
static Status find_cgroup_mem_usage(uint64_t* bytes);

// Returns a human-readable string with information about CGroups.
static std::string debug_string();
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ namespace ErrorCode {
TStatusError(HTTP_ERROR, true); \
TStatusError(TABLET_MISSING, true); \
TStatusError(NOT_MASTER, true); \
TStatusError(CGROUP_ERROR, false); \
TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
Expand Down Expand Up @@ -291,7 +290,8 @@ namespace ErrorCode {
E(KEY_ALREADY_EXISTS, -7001, false); \
E(ENTRY_NOT_FOUND, -7002, false); \
E(INVALID_TABLET_STATE, -7211, false); \
E(ROWSETS_EXPIRED, -7311, false);
E(ROWSETS_EXPIRED, -7311, false); \
E(CGROUP_ERROR, -7411, false);

// Define constexpr int error_code_name = error_code_value
#define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
Expand Down
17 changes: 9 additions & 8 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ std::atomic<int64_t> MemInfo::_s_je_dirty_pages_mem = std::numeric_limits<int64_
std::atomic<int64_t> MemInfo::_s_je_dirty_pages_mem_limit = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;

int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
int64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
uint64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<uint64_t>::max();
uint64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits<uint64_t>::min();
bool MemInfo::_s_cgroup_mem_refresh_state = false;
int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0;

Expand Down Expand Up @@ -190,16 +190,16 @@ void MemInfo::refresh_proc_meminfo() {

// refresh cgroup memory
if (_s_cgroup_mem_refresh_wait_times >= 0 && config::enable_use_cgroup_memory_info) {
int64_t cgroup_mem_limit = -1;
int64_t cgroup_mem_usage = -1;
uint64_t cgroup_mem_limit = -1;
uint64_t cgroup_mem_usage = -1;
std::string cgroup_mem_info_file_path;
_s_cgroup_mem_refresh_state = true;
Status status = CGroupMemoryCtl::find_cgroup_mem_limit(&cgroup_mem_limit);
if (!status.ok() || cgroup_mem_limit <= 0) {
if (!status.ok()) {
_s_cgroup_mem_refresh_state = false;
}
status = CGroupMemoryCtl::find_cgroup_mem_usage(&cgroup_mem_usage);
if (!status.ok() || cgroup_mem_usage <= 0) {
if (!status.ok()) {
_s_cgroup_mem_refresh_state = false;
}

Expand Down Expand Up @@ -234,7 +234,7 @@ void MemInfo::refresh_proc_meminfo() {
if (physical_mem < 0) {
physical_mem = _s_cgroup_mem_limit;
} else {
physical_mem = std::min(physical_mem, _s_cgroup_mem_limit);
physical_mem = std::min(physical_mem, (int64_t)_s_cgroup_mem_limit);
}
}

Expand Down Expand Up @@ -280,7 +280,8 @@ void MemInfo::refresh_proc_meminfo() {
if (mem_available < 0) {
mem_available = _s_cgroup_mem_limit - _s_cgroup_mem_usage;
} else {
mem_available = std::min(mem_available, _s_cgroup_mem_limit - _s_cgroup_mem_usage);
mem_available =
std::min(mem_available, (int64_t)(_s_cgroup_mem_limit - _s_cgroup_mem_usage));
}
}
if (mem_available < 0) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/util/mem_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ class MemInfo {
static std::atomic<int64_t> _s_je_dirty_pages_mem_limit;
static std::atomic<int64_t> _s_virtual_memory_used;

static int64_t _s_cgroup_mem_limit;
static int64_t _s_cgroup_mem_usage;
static uint64_t _s_cgroup_mem_limit;
static uint64_t _s_cgroup_mem_usage;
static bool _s_cgroup_mem_refresh_state;
static int64_t _s_cgroup_mem_refresh_wait_times;

Expand Down
17 changes: 15 additions & 2 deletions be/test/util/cgroup_util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,25 @@ TEST_F(CGroupUtilTest, ReadMetrics) {
CGroupUtil::read_int_metric_from_cgroup_file(memory_stat_path, metrics_map);
EXPECT_EQ(5443584, metrics_map["inactive_file"]);
EXPECT_EQ(0, metrics_map["rss"]);

std::string error_memory_limit_in_bytes_path(dir_path);
error_memory_limit_in_bytes_path += "/util/test_data/Zzz/memory.limit_in_bytes";
int64_t error_value;
auto st2 = CGroupUtil::read_int_line_from_cgroup_file(error_memory_limit_in_bytes_path,
&error_value);
EXPECT_FALSE(st2.ok());

std::string error_memory_stat_path(dir_path);
error_memory_stat_path += "/util/test_data/Zzz/memory.stat";
std::unordered_map<std::string, int64_t> error_metrics_map;
CGroupUtil::read_int_metric_from_cgroup_file(error_memory_stat_path, error_metrics_map);
EXPECT_TRUE(error_metrics_map.empty());
}

TEST_F(CGroupUtilTest, memlimit) {
LOG(INFO) << CGroupMemoryCtl::debug_string();
int64_t mem_limit;
int64_t mem_usage;
uint64_t mem_limit;
uint64_t mem_usage;
auto status1 = CGroupMemoryCtl::find_cgroup_mem_limit(&mem_limit);
auto status2 = CGroupMemoryCtl::find_cgroup_mem_usage(&mem_usage);
if (CGroupUtil::cgroupsv1_enable() || CGroupUtil::cgroupsv2_enable()) {
Expand Down

0 comments on commit 00fc066

Please sign in to comment.