diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index 6fa234e6ee2d6c..a494681d082a5a 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -18,6 +18,9 @@ #include "agent/cgroup_cpu_ctl.h" #include +#include + +#include namespace doris { @@ -100,11 +103,34 @@ Status CgroupV1CpuCtl::init() { int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU); if (ret != 0) { LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path; - return Status::InternalError("cgroup v1 mkdir query failed, path=", + return Status::InternalError("cgroup v1 mkdir query failed, path={}", _cgroup_v1_cpu_query_path); } } + // check whether the user-specified path is a valid cgroup path + std::string query_path_tasks = _cgroup_v1_cpu_query_path + "/tasks"; + std::string query_path_cpu_shares = _cgroup_v1_cpu_query_path + "/cpu.shares"; + std::string query_path_quota = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us"; + if (access(query_path_tasks.c_str(), F_OK) != 0) { + return Status::InternalError("invalid cgroup path, not find task file"); + } + if (access(query_path_cpu_shares.c_str(), F_OK) != 0) { + return Status::InternalError("invalid cgroup path, not find cpu share file"); + } + if (access(query_path_quota.c_str(), F_OK) != 0) { + return Status::InternalError("invalid cgroup path, not find cpu quota file"); + } + + if (_tg_id == -1) { + // means the current cgroup cpu ctl is only used to clear the dir; + // it does not contain a task group. 
+ // todo(wb) rethink whether we need to refactor cgroup_cpu_ctl + _init_succ = true; + LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path; + return Status::OK(); + } + // workload group path _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id); if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) { @@ -157,4 +183,52 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() { return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true); #endif } + +Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set& used_wg_ids) { + if (!_init_succ) { + return Status::InternalError( + "cgroup cpu ctl init failed, delete can not be executed"); + } + // 1 get unused wg id + std::set unused_wg_ids; + for (const auto& entry : std::filesystem::directory_iterator(_cgroup_v1_cpu_query_path)) { + const std::string dir_name = entry.path().string(); + struct stat st; + // == 0 means exists + if (stat(dir_name.c_str(), &st) == 0 && (st.st_mode & S_IFDIR)) { + int pos = dir_name.rfind("/"); + std::string wg_dir_name = dir_name.substr(pos + 1, dir_name.length()); + if (wg_dir_name.empty()) { + return Status::InternalError("find an empty workload group path, path={}", + dir_name); + } + if (std::all_of(wg_dir_name.begin(), wg_dir_name.end(), ::isdigit)) { + uint64_t id_in_path = std::stoll(wg_dir_name); + if (used_wg_ids.find(id_in_path) == used_wg_ids.end()) { + unused_wg_ids.insert(wg_dir_name); + } + } + } + } + + // 2 delete unused cgroup path + int failed_count = 0; + std::string query_path = _cgroup_v1_cpu_query_path.back() != '/' + ? 
_cgroup_v1_cpu_query_path + "/" + : _cgroup_v1_cpu_query_path; + for (const std::string& unused_wg_id : unused_wg_ids) { + std::string wg_path = query_path + unused_wg_id; + int ret = rmdir(wg_path.c_str()); + if (ret < 0) { + LOG(WARNING) << "rmdir failed, path=" << wg_path; + failed_count++; + } + } + if (failed_count != 0) { + return Status::InternalError("error happens when delete unused path, count={}", + failed_count); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h index 2a7cdc5719ba4c..94514c8e2e0b39 100644 --- a/be/src/agent/cgroup_cpu_ctl.h +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -37,6 +37,7 @@ const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024; class CgroupCpuCtl { public: virtual ~CgroupCpuCtl() = default; + CgroupCpuCtl() = default; CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; } virtual Status init(); @@ -50,6 +51,8 @@ class CgroupCpuCtl { // for log void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit); + virtual Status delete_unused_cgroup_path(std::set& used_wg_ids) = 0; + protected: Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append); @@ -63,7 +66,7 @@ class CgroupCpuCtl { int _cpu_hard_limit = 0; std::shared_mutex _lock_mutex; bool _init_succ = false; - uint64_t _tg_id; // workload group id + uint64_t _tg_id = -1; // workload group id uint64_t _cpu_shares = 0; }; @@ -96,11 +99,14 @@ class CgroupCpuCtl { class CgroupV1CpuCtl : public CgroupCpuCtl { public: CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {} + CgroupV1CpuCtl() = default; Status init() override; Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override; Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override; Status add_thread_to_cgroup() override; + Status delete_unused_cgroup_path(std::set& used_wg_ids) override; + private: std::string _cgroup_v1_cpu_query_path; std::string _cgroup_v1_cpu_tg_path; // workload group 
path diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 6d7dfb9a3a01d4..f2770e8e7c4991 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -57,7 +57,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector& topi << ", reason=" << ret2.to_string(); } - LOG(INFO) << "update task group success, tg info=" << tg->debug_string() + LOG(INFO) << "update task group finish, tg info=" << tg->debug_string() << ", enable_cpu_hard_limit=" << (_exec_env->task_group_manager()->enable_cpu_hard_limit() ? "true" : "false") << ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index da6294045f818c..98043c6395ac8b 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -68,7 +68,7 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) { } bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) { - std::lock_guard lock(_task_scheduler_lock); + std::lock_guard write_lock(_task_scheduler_lock); if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) { query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get()); } else { @@ -91,7 +91,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i uint64_t cpu_shares = tg_info->cpu_share; bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit; - std::lock_guard lock(_task_scheduler_lock); + std::lock_guard write_lock(_task_scheduler_lock); // step 1: init cgroup cpu controller CgroupCpuCtl* cg_cu_ctl_ptr = nullptr; if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) { @@ -101,7 +101,8 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i cg_cu_ctl_ptr = cgroup_cpu_ctl.get(); _cgroup_ctl_map.emplace(tg_id, 
std::move(cgroup_cpu_ctl)); } else { - return Status::InternalError("cgroup init failed, gid={}", tg_id); + return Status::InternalError("cgroup init failed, gid={}, reason={}", tg_id, + ret.to_string()); } } @@ -157,54 +158,83 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i return Status::OK(); } -void TaskGroupManager::delete_task_group_by_ids(std::set id_set) { +void TaskGroupManager::delete_task_group_by_ids(std::set used_wg_id) { + // stop task sche may cost some time, so it should not be locked + std::set task_sche_to_del; + std::set scan_task_sche_to_del; + std::set deleted_tg_ids; { - std::lock_guard w_lock(_group_mutex); - for (auto iter = _task_groups.begin(); iter != _task_groups.end();) { + std::shared_lock read_lock(_task_scheduler_lock); + for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end(); iter++) { uint64_t tg_id = iter->first; - if (id_set.find(tg_id) == id_set.end()) { - iter = _task_groups.erase(iter); - } else { - iter++; + if (used_wg_id.find(tg_id) == used_wg_id.end()) { + task_sche_to_del.insert(_tg_sche_map[tg_id].get()); + deleted_tg_ids.insert(tg_id); } } - } - // stop task sche may cost some time, so it should not be locked - // task scheduler is stoped in task scheduler's destructor - std::set> task_sche_to_del; - std::set> scan_task_sche_to_del; - { - std::lock_guard lock(_task_scheduler_lock); - for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) { + for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end(); iter++) { uint64_t tg_id = iter->first; - if (id_set.find(tg_id) == id_set.end()) { - task_sche_to_del.insert(std::move(_tg_sche_map[tg_id])); - iter = _tg_sche_map.erase(iter); - } else { - iter++; + if (used_wg_id.find(tg_id) == used_wg_id.end()) { + scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get()); } } + } + // 1 stop all threads + for (auto* ptr1 : task_sche_to_del) { + ptr1->stop(); + } + for (auto* ptr2 : scan_task_sche_to_del) { 
+ ptr2->stop(); + } + // 2 release resource in memory + { + std::lock_guard write_lock(_task_scheduler_lock); + for (uint64_t tg_id : deleted_tg_ids) { + _tg_sche_map.erase(tg_id); + _tg_scan_sche_map.erase(tg_id); + _cgroup_ctl_map.erase(tg_id); + } + } - for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end();) { + { + std::lock_guard write_lock(_group_mutex); + for (auto iter = _task_groups.begin(); iter != _task_groups.end();) { uint64_t tg_id = iter->first; - if (id_set.find(tg_id) == id_set.end()) { - scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id])); - iter = _tg_scan_sche_map.erase(iter); + if (used_wg_id.find(tg_id) == used_wg_id.end()) { + iter = _task_groups.erase(iter); } else { iter++; } } + } - for (auto iter = _cgroup_ctl_map.begin(); iter != _cgroup_ctl_map.end();) { - uint64_t tg_id = iter->first; - if (id_set.find(tg_id) == id_set.end()) { - iter = _cgroup_ctl_map.erase(iter); + // 3 clear cgroup dir + // NOTE(wb) currently we use rmdir to delete the cgroup path; + // this action may fail until the tasks file is cleared, which means all threads are stopped. + // So the first attempt to rmdir a cgroup path may fail. + // Using cgdelete has no such issue. 
+ { + std::lock_guard write_lock(_init_cg_ctl_lock); + if (!_cg_cpu_ctl) { + _cg_cpu_ctl = std::make_unique(); + } + if (!_is_init_succ) { + Status ret = _cg_cpu_ctl->init(); + if (ret.ok()) { + _is_init_succ = true; } else { - iter++; + LOG(INFO) << "init task group mgr cpu ctl failed, " << ret.to_string(); + } + } + if (_is_init_succ) { + Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id); + if (!ret.ok()) { + LOG(WARNING) << ret.to_string(); } } } + LOG(INFO) << "finish clear unused cgroup path"; } void TaskGroupManager::stop() { diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 91156237f40a22..08968b6fe999f4 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -77,10 +77,14 @@ class TaskGroupManager { // map for workload group id and task scheduler pool // used for cpu hard limit - std::mutex _task_scheduler_lock; + std::shared_mutex _task_scheduler_lock; std::map> _tg_sche_map; std::map> _tg_scan_sche_map; std::map> _cgroup_ctl_map; + + std::shared_mutex _init_cg_ctl_lock; + std::unique_ptr _cg_cpu_ctl; + bool _is_init_succ = false; }; } // namespace taskgroup