diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 0d1cb362ea00bde..bf2a3945e4b419d 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -309,7 +309,7 @@ Status OlapScanLocalState::_init_scanners(std::list* s int max_scanners_count = state()->parallel_scan_max_scanners_count(); - // If the `max_scanners_count` was not set, + // If the `max_scanners_count` is invalid, // use `config::doris_scanner_thread_pool_thread_num` as the default value. if (max_scanners_count <= 0) { max_scanners_count = config::doris_scanner_thread_pool_thread_num; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index be940e8c89c3ae6..dea8281ebaaeaf9 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -31,6 +31,7 @@ #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/operator.h" #include "runtime/types.h" +#include "util/doris_metrics.h" #include "util/runtime_profile.h" #include "vec/exec/scan/scanner_context.h" #include "vec/exprs/vcast_expr.h" @@ -1278,6 +1279,7 @@ Status ScanOperatorX::get_block(RuntimeState* state, vectorized: return Status::OK(); } + DorisMetrics::instance()->scan_operator_get_block_from_queue->increment(1); RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, block, eos, 0)); local_state.reached_limit(block, eos); diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index e9d4f31e5ca137b..e2df6025ebcfe2c 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -31,7 +31,9 @@ #include #include "common/status.h" +#include "doris_metrics.h" #include "io/fs/local_file_system.h" +#include "util/metrics.h" #include "util/system_metrics.h" namespace doris { @@ -53,6 +55,11 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_duration_us, MetricUnit::MICRO 
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_write_bytes, MetricUnit::BYTES); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_write_rows, MetricUnit::ROWS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_create_free_block, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_append_block_to_queue, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_get_null_free_block, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scan_operator_get_block_from_queue, MetricUnit::NOUNIT); + #define DEFINE_ENGINE_COUNTER_METRIC(name, type, status) \ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(name, MetricUnit::REQUESTS, "", engine_requests_total, \ Labels({{"type", #type}, {"status", #status}})); @@ -191,8 +198,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT); const std::string DorisMetrics::_s_registry_name = "doris_be"; @@ -319,9 +324,11 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt); INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running); INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed); + INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, 
scanner_create_free_block); + INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_append_block_to_queue); + INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_get_null_free_block); + INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scan_operator_get_block_from_queue); } void DorisMetrics::initialize(bool init_system_metrics, const std::set& disk_devices, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 3006461059c106e..72e6703bff01f16 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -238,9 +238,12 @@ class DorisMetrics { IntAtomicCounter* scanner_ctx_cnt = nullptr; IntAtomicCounter* scanner_cnt = nullptr; IntAtomicCounter* scanner_task_cnt = nullptr; - IntAtomicCounter* scanner_task_queued = nullptr; IntAtomicCounter* scanner_task_submit_failed = nullptr; - IntAtomicCounter* scanner_task_running = nullptr; + + IntAtomicCounter* scanner_create_free_block = nullptr; + IntAtomicCounter* scanner_append_block_to_queue = nullptr; + IntAtomicCounter* scanner_get_null_free_block = nullptr; + IntAtomicCounter* scan_operator_get_block_from_queue = nullptr; static DorisMetrics* instance() { static DorisMetrics instance; diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 5ce27e2f27b9a57..1f908c5e60e567e 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -256,6 +256,11 @@ class ThreadPool { return _total_queued_tasks; } + int get_max_queue_size() const { + std::lock_guard l(_lock); + return _max_queue_size; + } + std::vector debug_info() { std::lock_guard l(_lock); std::vector arr = {_num_threads, static_cast(_threads.size()), _min_threads, diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index ee1d60d29024244..bcc5c8ee800f65b 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -33,6 +33,7 @@ #include "runtime/descriptors.h" #include 
"runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "util/doris_metrics.h" #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/exec/scan/scanner_scheduler.h" @@ -241,6 +242,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) { _newly_create_free_blocks_num->update(1); block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0, true /*ignore invalid slots*/); + DorisMetrics::instance()->scanner_create_free_block->increment(1); } return block; } @@ -280,6 +282,7 @@ void ScannerContext::append_block_to_queue(std::shared_ptr scan_task) } } } + DorisMetrics::instance()->scanner_append_block_to_queue->increment(1); std::lock_guard l(_transfer_lock); if (!scan_task->status_ok()) { _process_status = scan_task->get_status(); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 3ad4e758e799809..f3e92619b726aba 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -142,11 +142,6 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, scanner_delegate->_scanner->start_wait_worker_timer(); auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); @@ -172,11 +167,6 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, auto sumbit_task = [&]() { SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler(); auto work_func = [scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { 
DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); @@ -283,6 +273,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, } BlockUPtr free_block = ctx->get_free_block(first_read); if (free_block == nullptr) { + DorisMetrics::instance()->scanner_get_null_free_block->increment(1); break; } // We got a new created block or a reused block. @@ -385,4 +376,38 @@ int ScannerScheduler::get_remote_scan_thread_queue_size() { return config::doris_remote_scanner_thread_pool_queue_size; } +// The names of these variables will be used as metric names in Prometheus. +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_running_tasks, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_queue_capacity, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_max_thread_number, MetricUnit::NOUNIT); + +SimplifiedScanScheduler::SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cg_cpu_ctl) + : _is_stop(false), _cgroup_cpu_ctl(cg_cpu_ctl), _sched_name(sched_name) { + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( + fmt::format("scan_scheduler_{}", sched_name), {{"wg_name", sched_name}}); + INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_running_tasks); + INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_queue_size); + INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_queue_capacity); + INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_max_thread_number); +} + +Status SimplifiedScanScheduler::start(int max_thread_num, int min_thread_num, int queue_size) { + RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name) + .set_min_threads(min_thread_num) + .set_max_threads(max_thread_num) + .set_max_queue_size(queue_size) + .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) + .build(&_scan_thread_pool)); + + 
_metric_entity->register_hook("update", [this]() { + this->scan_scheduler_running_tasks->set_value(_scan_thread_pool->num_active_threads()); + this->scan_scheduler_queue_size->set_value(_scan_thread_pool->get_queue_size()); + this->scan_scheduler_queue_capacity->set_value(_scan_thread_pool->get_max_queue_size()); + this->scan_scheduler_max_thread_number->set_value(_scan_thread_pool->max_threads()); + }); + + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 56c49368598adc1..30bea375df33210 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "util/doris_metrics.h" +#include "util/metrics.h" #include "util/threadpool.h" #include "vec/exec/scan/vscanner.h" @@ -114,11 +115,7 @@ struct SimplifiedScanTask { class SimplifiedScanScheduler { public: - SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) { - _is_stop.store(false); - _cgroup_cpu_ctl = cgroup_cpu_ctl; - _sched_name = sched_name; - } + SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl); ~SimplifiedScanScheduler() { stop(); @@ -131,22 +128,12 @@ class SimplifiedScanScheduler { _scan_thread_pool->wait(); } - Status start(int max_thread_num, int min_thread_num, int queue_size) { - RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name) - .set_min_threads(min_thread_num) - .set_max_threads(max_thread_num) - .set_max_queue_size(queue_size) - .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) - .build(&_scan_thread_pool)); - return Status::OK(); - } + Status start(int max_thread_num, int min_thread_num, int queue_size); Status submit_scan_task(SimplifiedScanTask scan_task) { if (!_is_stop) { - DorisMetrics::instance()->scanner_task_queued->increment(1); auto st = _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); if (!st.ok()) { - 
DorisMetrics::instance()->scanner_task_queued->increment(-1); DorisMetrics::instance()->scanner_task_submit_failed->increment(1); } return st; @@ -219,6 +206,13 @@ class SimplifiedScanScheduler { std::atomic _is_stop; CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; std::string _sched_name; + + // Metrics of this scheduler + std::shared_ptr _metric_entity; + IntGauge* scan_scheduler_running_tasks = nullptr; + IntGauge* scan_scheduler_queue_size = nullptr; + IntGauge* scan_scheduler_queue_capacity = nullptr; + IntGauge* scan_scheduler_max_thread_number = nullptr; }; } // namespace doris::vectorized