Skip to content

Commit

Permalink
[improvement](executor)clear unused cgroup path (apache#27798)
Browse files Browse the repository at this point in the history
* clear unused cgroup path

* use C++ api

* add gcc header
  • Loading branch information
wangbo authored Dec 5, 2023
1 parent c98b80a commit 3595f21
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 36 deletions.
76 changes: 75 additions & 1 deletion be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "agent/cgroup_cpu_ctl.h"

#include <fmt/format.h>
#include <sys/stat.h>

#include <filesystem>

namespace doris {

Expand Down Expand Up @@ -100,11 +103,34 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path;
return Status::InternalError<false>("cgroup v1 mkdir query failed, path=",
return Status::InternalError<false>("cgroup v1 mkdir query failed, path={}",
_cgroup_v1_cpu_query_path);
}
}

// check whether current user specified path is a valid cgroup path
std::string query_path_tasks = _cgroup_v1_cpu_query_path + "/tasks";
std::string query_path_cpu_shares = _cgroup_v1_cpu_query_path + "/cpu.shares";
std::string query_path_quota = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us";
if (access(query_path_tasks.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find task file");
}
if (access(query_path_cpu_shares.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find cpu share file");
}
if (access(query_path_quota.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find cpu quota file");
}

if (_tg_id == -1) {
// means the current cgroup cpu ctl is only used to clear directories;
// it does not contain a task group.
// todo(wb) rethink whether cgroup_cpu_ctl needs to be refactored
_init_succ = true;
LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path;
return Status::OK();
}

// workload group path
_cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
Expand Down Expand Up @@ -157,4 +183,52 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
#endif
}

// Remove cgroup v1 sub-directories under the query path whose numeric name is
// not in used_wg_ids, i.e. directories of workload groups that no longer exist.
// @param used_wg_ids ids of workload groups still alive; their dirs are kept.
// @return InternalError if init failed, an empty dir name is found, or any
//         rmdir fails; Status::OK() otherwise.
Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) {
    if (!_init_succ) {
        return Status::InternalError<false>(
                "cgroup cpu ctl init failed, delete can not be executed");
    }
    // 1 collect directory names that look like workload-group ids (all digits)
    //   but are not present in used_wg_ids.
    std::set<std::string> unused_wg_ids;
    for (const auto& entry : std::filesystem::directory_iterator(_cgroup_v1_cpu_query_path)) {
        // is_directory() replaces the old `st.st_mode & S_IFDIR` mask test,
        // which also matched sockets (S_IFSOCK includes the S_IFDIR bits);
        // the proper stat-based form would have been S_ISDIR().
        if (!entry.is_directory()) {
            continue;
        }
        // filename() yields the last path component directly; no manual
        // rfind("/")/substr needed (the old `int pos` also narrowed size_t).
        const std::string wg_dir_name = entry.path().filename().string();
        if (wg_dir_name.empty()) {
            return Status::InternalError<false>("find an empty workload group path, path={}",
                                               entry.path().string());
        }
        if (std::all_of(wg_dir_name.begin(), wg_dir_name.end(), ::isdigit)) {
            // stoull, not stoll: ids are uint64_t, so a signed parse could
            // mis-handle values above INT64_MAX.
            uint64_t id_in_path = std::stoull(wg_dir_name);
            if (used_wg_ids.find(id_in_path) == used_wg_ids.end()) {
                unused_wg_ids.insert(wg_dir_name);
            }
        }
    }

    // 2 delete each unused cgroup path. rmdir (not a recursive remove) is
    //   intentional: cgroup directories are kernel-managed and can only be
    //   removed once all their tasks are gone, so a failure here is expected
    //   until threads have exited; we just count and report it.
    int failed_count = 0;
    std::string query_path = _cgroup_v1_cpu_query_path.back() != '/'
                                     ? _cgroup_v1_cpu_query_path + "/"
                                     : _cgroup_v1_cpu_query_path;
    for (const std::string& unused_wg_id : unused_wg_ids) {
        std::string wg_path = query_path + unused_wg_id;
        if (rmdir(wg_path.c_str()) < 0) {
            LOG(WARNING) << "rmdir failed, path=" << wg_path;
            failed_count++;
        }
    }
    if (failed_count != 0) {
        return Status::InternalError<false>("error happens when delete unused path, count={}",
                                            failed_count);
    }
    return Status::OK();
}

} // namespace doris
8 changes: 7 additions & 1 deletion be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }

virtual Status init();
Expand All @@ -50,6 +51,8 @@ class CgroupCpuCtl {
// for log
void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);

virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) = 0;

protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);

Expand All @@ -63,7 +66,7 @@ class CgroupCpuCtl {
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
uint64_t _tg_id; // workload group id
uint64_t _tg_id = -1; // workload group id
uint64_t _cpu_shares = 0;
};

Expand Down Expand Up @@ -96,11 +99,14 @@ class CgroupCpuCtl {
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
CgroupV1CpuCtl() = default;
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;

Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;

private:
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_tg_path; // workload group path
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
<< ", reason=" << ret2.to_string();
}

LOG(INFO) << "update task group success, tg info=" << tg->debug_string()
LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
<< ", enable_cpu_hard_limit="
<< (_exec_env->task_group_manager()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares
Expand Down
94 changes: 62 additions & 32 deletions be/src/runtime/task_group/task_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
}

bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) {
std::lock_guard<std::mutex> lock(_task_scheduler_lock);
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
} else {
Expand All @@ -91,7 +91,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
uint64_t cpu_shares = tg_info->cpu_share;
bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;

std::lock_guard<std::mutex> lock(_task_scheduler_lock);
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
// step 1: init cgroup cpu controller
CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
Expand All @@ -101,7 +101,8 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
_cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
} else {
return Status::InternalError<false>("cgroup init failed, gid={}", tg_id);
return Status::InternalError<false>("cgroup init failed, gid={}, reason={}", tg_id,
ret.to_string());
}
}

Expand Down Expand Up @@ -157,54 +158,83 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
return Status::OK();
}

void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> id_set) {
void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
// stopping the task scheduler may take some time, so it should not be done while holding the lock
std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
std::set<uint64_t> deleted_tg_ids;
{
std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end(); iter++) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
iter = _task_groups.erase(iter);
} else {
iter++;
if (used_wg_id.find(tg_id) == used_wg_id.end()) {
task_sche_to_del.insert(_tg_sche_map[tg_id].get());
deleted_tg_ids.insert(tg_id);
}
}
}

// stopping the task scheduler may take some time, so it should not be done while holding the lock
// the task scheduler is stopped in its destructor
std::set<std::unique_ptr<doris::pipeline::TaskScheduler>> task_sche_to_del;
std::set<std::unique_ptr<vectorized::SimplifiedScanScheduler>> scan_task_sche_to_del;
{
std::lock_guard<std::mutex> lock(_task_scheduler_lock);
for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) {
for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end(); iter++) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
task_sche_to_del.insert(std::move(_tg_sche_map[tg_id]));
iter = _tg_sche_map.erase(iter);
} else {
iter++;
if (used_wg_id.find(tg_id) == used_wg_id.end()) {
scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
}
}
}
// 1 stop all threads
for (auto* ptr1 : task_sche_to_del) {
ptr1->stop();
}
for (auto* ptr2 : scan_task_sche_to_del) {
ptr2->stop();
}
// 2 release resource in memory
{
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
for (uint64_t tg_id : deleted_tg_ids) {
_tg_sche_map.erase(tg_id);
_tg_scan_sche_map.erase(tg_id);
_cgroup_ctl_map.erase(tg_id);
}
}

for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end();) {
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id]));
iter = _tg_scan_sche_map.erase(iter);
if (used_wg_id.find(tg_id) == used_wg_id.end()) {
iter = _task_groups.erase(iter);
} else {
iter++;
}
}
}

for (auto iter = _cgroup_ctl_map.begin(); iter != _cgroup_ctl_map.end();) {
uint64_t tg_id = iter->first;
if (id_set.find(tg_id) == id_set.end()) {
iter = _cgroup_ctl_map.erase(iter);
// 3 clear cgroup dir
// NOTE(wb) currently we use rmdir to delete the cgroup path;
// this action may fail until the tasks file is cleared, i.e. all threads are stopped.
// So the first attempt to rmdir a cgroup path may fail.
// Using cgdelete has no such issue.
{
std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
if (!_cg_cpu_ctl) {
_cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
}
if (!_is_init_succ) {
Status ret = _cg_cpu_ctl->init();
if (ret.ok()) {
_is_init_succ = true;
} else {
iter++;
LOG(INFO) << "init task group mgr cpu ctl failed, " << ret.to_string();
}
}
if (_is_init_succ) {
Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
if (!ret.ok()) {
LOG(WARNING) << ret.to_string();
}
}
}
LOG(INFO) << "finish clear unused cgroup path";
}

void TaskGroupManager::stop() {
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/task_group/task_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ class TaskGroupManager {

// map for workload group id and task scheduler pool
// used for cpu hard limit
std::mutex _task_scheduler_lock;
std::shared_mutex _task_scheduler_lock;
std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;

std::shared_mutex _init_cg_ctl_lock;
std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
bool _is_init_succ = false;
};

} // namespace taskgroup
Expand Down

0 comments on commit 3595f21

Please sign in to comment.