Skip to content

Commit

Permalink
Merge branch 'master' into gavin-opt-s3-file-writer-log
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinchou authored Nov 14, 2024
2 parents 4bf585e + 02e3de3 commit 654b408
Show file tree
Hide file tree
Showing 53 changed files with 1,332 additions and 184 deletions.
6 changes: 3 additions & 3 deletions be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
return _is_enable_cgroup_v2_in_env ? 100 : 1024;
}

std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
if (_is_enable_cgroup_v2_in_env) {
return std::make_unique<CgroupV2CpuCtl>(wg_id);
return std::make_shared<CgroupV2CpuCtl>(wg_id);
} else if (_is_enable_cgroup_v1_in_env) {
return std::make_unique<CgroupV1CpuCtl>(wg_id);
return std::make_shared<CgroupV1CpuCtl>(wg_id);
}
return nullptr;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CgroupCpuCtl {

static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);

static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);

static bool is_a_valid_cgroup_path(std::string cg_path);

Expand Down
6 changes: 2 additions & 4 deletions be/src/agent/topic_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
// eg, update workload info may delay other listener, then we need add a thread here
// to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
LOG(INFO) << "[topic_publish]begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
workload_group_info.enable_cpu_hard_limit);

// 4 create and update task scheduler
wg->upsert_task_scheduler(&workload_group_info, _exec_env);
wg->upsert_task_scheduler(&workload_group_info);

// 5 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);
Expand Down
31 changes: 22 additions & 9 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id) {
});
}

Status CloudStorageEngine::start_bg_threads() {
Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
RETURN_IF_ERROR(Thread::create(
"CloudStorageEngine", "refresh_s3_info_thread",
[this]() { this->_refresh_storage_vault_info_thread_callback(); },
Expand Down Expand Up @@ -266,14 +266,27 @@ Status CloudStorageEngine::start_bg_threads() {
// compaction tasks producer thread
int base_thread_num = get_base_thread_num();
int cumu_thread_num = get_cumu_thread_num();
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_thread_num)
.set_max_threads(base_thread_num)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_thread_num)
.set_max_threads(cumu_thread_num)
.build(&_cumu_compaction_thread_pool));
if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_thread_num)
.set_max_threads(base_thread_num)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_thread_num)
.set_max_threads(cumu_thread_num)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_cumu_compaction_thread_pool));
} else {
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_thread_num)
.set_max_threads(base_thread_num)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_thread_num)
.set_max_threads(cumu_thread_num)
.build(&_cumu_compaction_thread_pool));
}
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "compaction_tasks_producer_thread",
[this]() { this->_compaction_tasks_producer_callback(); },
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class CloudStorageEngine final : public BaseStorageEngine {

Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) override;

Status start_bg_threads() override;
Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;

Status set_cluster_id(int32_t cluster_id) override {
_effective_cluster_id = cluster_id;
Expand Down
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ namespace ErrorCode {
TStatusError(TABLET_MISSING, true); \
TStatusError(NOT_MASTER, true); \
TStatusError(OBTAIN_LOCK_FAILED, false); \
TStatusError(SNAPSHOT_EXPIRED, false); \
TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
Expand Down
77 changes: 54 additions & 23 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
return threads_num;
}

Status StorageEngine::start_bg_threads() {
Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "unused_rowset_monitor_thread",
[this]() { this->_unused_rowset_monitor_thread_callback(); },
Expand Down Expand Up @@ -243,29 +243,60 @@ Status StorageEngine::start_bg_threads() {
auto single_replica_compaction_threads =
get_single_replica_compaction_threads_num(data_dirs.size());

RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.build(&_single_replica_compaction_thread_pool));

if (config::enable_segcompaction) {
RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
.set_min_threads(config::segcompaction_num_threads)
.set_max_threads(config::segcompaction_num_threads)
.build(&_seg_compaction_thread_pool));
if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_single_replica_compaction_thread_pool));

if (config::enable_segcompaction) {
RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
.set_min_threads(config::segcompaction_num_threads)
.set_max_threads(config::segcompaction_num_threads)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_seg_compaction_thread_pool));
}
RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
.set_min_threads(config::cold_data_compaction_thread_num)
.set_max_threads(config::cold_data_compaction_thread_num)
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
.build(&_cold_data_compaction_thread_pool));
} else {
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.build(&_single_replica_compaction_thread_pool));

if (config::enable_segcompaction) {
RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
.set_min_threads(config::segcompaction_num_threads)
.set_max_threads(config::segcompaction_num_threads)
.build(&_seg_compaction_thread_pool));
}
RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
.set_min_threads(config::cold_data_compaction_thread_num)
.set_max_threads(config::cold_data_compaction_thread_num)
.build(&_cold_data_compaction_thread_pool));
}
RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
.set_min_threads(config::cold_data_compaction_thread_num)
.set_max_threads(config::cold_data_compaction_thread_num)
.build(&_cold_data_compaction_thread_pool));

// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ReportWorker;
class CreateTabletRRIdxCache;
struct DirInfo;
class SnapshotManager;
class WorkloadGroup;

using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
Expand Down Expand Up @@ -105,7 +106,7 @@ class BaseStorageEngine {
virtual bool stopped() = 0;

// start all background threads. This should be call after env is ready.
virtual Status start_bg_threads() = 0;
virtual Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) = 0;

virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) = 0;

Expand Down Expand Up @@ -278,7 +279,7 @@ class StorageEngine final : public BaseStorageEngine {
return _default_rowset_type;
}

Status start_bg_threads() override;
Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;

// clear trash and snapshot file
// option: update disk usage after sweep
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace doris::pipeline {

class TaskScheduler {
public:
TaskScheduler(int core_num, std::string name, CgroupCpuCtl* cgroup_cpu_ctl)
TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
: _task_queue(core_num),
_shutdown(false),
_name(std::move(name)),
Expand All @@ -65,7 +65,7 @@ class TaskScheduler {
std::vector<bool> _markers;
bool _shutdown;
std::string _name;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;

void _do_work(int index);
};
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_workload_group_manager = new WorkloadGroupMgr();
_workload_group_manager->init_internal_workload_group();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
Expand Down Expand Up @@ -364,7 +365,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
return st;
}
_storage_engine->set_heartbeat_flags(this->heartbeat_flags());
if (st = _storage_engine->start_bg_threads(); !st.ok()) {
WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st;
return st;
}
Expand Down
Loading

0 comments on commit 654b408

Please sign in to comment.