Skip to content

Commit

Permalink
NED
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Nov 2, 2024
1 parent 051128f commit b364157
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 33 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s

int max_scanners_count = state()->parallel_scan_max_scanners_count();

// If the `max_scanners_count` was not set,
// If the `max_scanners_count` is invalid,
// use `config::doris_scanner_thread_pool_thread_num` as the default value.
if (max_scanners_count <= 0) {
max_scanners_count = config::doris_scanner_thread_pool_thread_num;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/operator.h"
#include "runtime/types.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "vec/exec/scan/scanner_context.h"
#include "vec/exprs/vcast_expr.h"
Expand Down Expand Up @@ -1278,6 +1279,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
return Status::OK();
}

DorisMetrics::instance()->scan_operator_get_block_from_queue->increment(1);
RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, block, eos, 0));

local_state.reached_limit(block, eos);
Expand Down
15 changes: 11 additions & 4 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include <ostream>

#include "common/status.h"
#include "doris_metrics.h"
#include "io/fs/local_file_system.h"
#include "util/metrics.h"
#include "util/system_metrics.h"

namespace doris {
Expand All @@ -53,6 +55,11 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_duration_us, MetricUnit::MICRO
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_write_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_write_rows, MetricUnit::ROWS);

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_create_free_block, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_append_block_to_queue, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_get_null_free_block, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scan_operator_get_block_from_queue, MetricUnit::NOUNIT);

#define DEFINE_ENGINE_COUNTER_METRIC(name, type, status) \
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(name, MetricUnit::REQUESTS, "", engine_requests_total, \
Labels({{"type", #type}, {"status", #status}}));
Expand Down Expand Up @@ -191,8 +198,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT);

const std::string DorisMetrics::_s_registry_name = "doris_be";
Expand Down Expand Up @@ -319,9 +324,11 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_create_free_block);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_append_block_to_queue);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_get_null_free_block);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scan_operator_get_block_from_queue);
}

void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
Expand Down
7 changes: 5 additions & 2 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,12 @@ class DorisMetrics {
IntAtomicCounter* scanner_ctx_cnt = nullptr;
IntAtomicCounter* scanner_cnt = nullptr;
IntAtomicCounter* scanner_task_cnt = nullptr;
IntAtomicCounter* scanner_task_queued = nullptr;
IntAtomicCounter* scanner_task_submit_failed = nullptr;
IntAtomicCounter* scanner_task_running = nullptr;

IntAtomicCounter* scanner_create_free_block = nullptr;
IntAtomicCounter* scanner_append_block_to_queue = nullptr;
IntAtomicCounter* scanner_get_null_free_block = nullptr;
IntAtomicCounter* scan_operator_get_block_from_queue = nullptr;

static DorisMetrics* instance() {
static DorisMetrics instance;
Expand Down
5 changes: 5 additions & 0 deletions be/src/util/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ class ThreadPool {
return _total_queued_tasks;
}

int get_max_queue_size() const {
std::lock_guard<std::mutex> l(_lock);
return _max_queue_size;
}

std::vector<int> debug_info() {
std::lock_guard<std::mutex> l(_lock);
std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads,
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/scanner_scheduler.h"
Expand Down Expand Up @@ -241,6 +242,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
true /*ignore invalid slots*/);
DorisMetrics::instance()->scanner_create_free_block->increment(1);
}
return block;
}
Expand Down Expand Up @@ -280,6 +282,7 @@ void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task)
}
}
}
DorisMetrics::instance()->scanner_append_block_to_queue->increment(1);
std::lock_guard<std::mutex> l(_transfer_lock);
if (!scan_task->status_ok()) {
_process_status = scan_task->get_status();
Expand Down
45 changes: 35 additions & 10 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,

scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_running->increment(1);
Defer metrics_defer(
[&] { DorisMetrics::instance()->scanner_task_running->increment(-1); });

auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
Expand All @@ -172,11 +167,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
auto sumbit_task = [&]() {
SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
auto work_func = [scanner_ref = scan_task, ctx]() {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_running->increment(1);
Defer metrics_defer(
[&] { DorisMetrics::instance()->scanner_task_running->increment(-1); });

auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
Expand Down Expand Up @@ -283,6 +273,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
BlockUPtr free_block = ctx->get_free_block(first_read);
if (free_block == nullptr) {
DorisMetrics::instance()->scanner_get_null_free_block->increment(1);
break;
}
// We got a new created block or a reused block.
Expand Down Expand Up @@ -385,4 +376,38 @@ int ScannerScheduler::get_remote_scan_thread_queue_size() {
return config::doris_remote_scanner_thread_pool_queue_size;
}

// The name of these varialbs will be useds as metric name in prometheus.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_running_tasks, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_queue_capacity, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_max_thread_number, MetricUnit::NOUNIT);

SimplifiedScanScheduler::SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cg_cpu_ctl)
: _is_stop(false), _cgroup_cpu_ctl(cg_cpu_ctl), _sched_name(sched_name) {
_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
fmt::format("scan_scheduler_{}", sched_name), {{"wg_name", sched_name}});
INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_running_tasks);
INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_queue_capacity);
INT_GAUGE_METRIC_REGISTER(_metric_entity, scan_scheduler_max_thread_number);
}

Status SimplifiedScanScheduler::start(int max_thread_num, int min_thread_num, int queue_size) {
RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name)
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
.set_max_queue_size(queue_size)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_scan_thread_pool));

_metric_entity->register_hook("update", [this]() {
this->scan_scheduler_running_tasks->set_value(_scan_thread_pool->num_active_threads());
this->scan_scheduler_queue_size->set_value(_scan_thread_pool->get_queue_size());
this->scan_scheduler_queue_capacity->set_value(_scan_thread_pool->get_max_queue_size());
this->scan_scheduler_max_thread_number->set_value(_scan_thread_pool->max_threads());
});

return Status::OK();
}

} // namespace doris::vectorized
26 changes: 10 additions & 16 deletions be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "common/status.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"

Expand Down Expand Up @@ -114,11 +115,7 @@ struct SimplifiedScanTask {

class SimplifiedScanScheduler {
public:
SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) {
_is_stop.store(false);
_cgroup_cpu_ctl = cgroup_cpu_ctl;
_sched_name = sched_name;
}
SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl);

~SimplifiedScanScheduler() {
stop();
Expand All @@ -131,22 +128,12 @@ class SimplifiedScanScheduler {
_scan_thread_pool->wait();
}

Status start(int max_thread_num, int min_thread_num, int queue_size) {
RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name)
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
.set_max_queue_size(queue_size)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_scan_thread_pool));
return Status::OK();
}
Status start(int max_thread_num, int min_thread_num, int queue_size);

Status submit_scan_task(SimplifiedScanTask scan_task) {
if (!_is_stop) {
DorisMetrics::instance()->scanner_task_queued->increment(1);
auto st = _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
if (!st.ok()) {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_submit_failed->increment(1);
}
return st;
Expand Down Expand Up @@ -219,6 +206,13 @@ class SimplifiedScanScheduler {
std::atomic<bool> _is_stop;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
std::string _sched_name;

// Metrics of this scheduler
std::shared_ptr<MetricEntity> _metric_entity;
IntGauge* scan_scheduler_running_tasks = nullptr;
IntGauge* scan_scheduler_queue_size = nullptr;
IntGauge* scan_scheduler_queue_capacity = nullptr;
IntGauge* scan_scheduler_max_thread_number = nullptr;
};

} // namespace doris::vectorized

0 comments on commit b364157

Please sign in to comment.