From d7e5f633d1ee66509d146217650b46becf70de78 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Tue, 24 Dec 2024 15:14:56 +0800 Subject: [PATCH] NED --- be/src/runtime/exec_env.h | 3 - be/src/runtime/exec_env_init.cpp | 17 ---- .../runtime/workload_group/workload_group.cpp | 4 +- be/src/util/doris_metrics.cpp | 6 -- be/src/util/doris_metrics.h | 4 - be/src/util/interval_histogram.cpp | 80 +++++++++++++++++++ be/src/util/interval_histogram.h | 46 +++++++++++ be/src/util/metrics.cpp | 1 + be/src/util/threadpool.cpp | 68 ++++++++++++++-- be/src/util/threadpool.h | 38 ++++++++- be/src/vec/exec/scan/scanner_scheduler.cpp | 10 --- be/src/vec/exec/scan/scanner_scheduler.h | 19 +++-- be/test/olap/rowset/beta_rowset_test.cpp | 2 + be/test/util/interval_histogram_test.cpp | 76 ++++++++++++++++++ .../agg_linear_histogram_test.cpp | 2 +- .../test_metrics_with_workload_group.groovy | 73 +++++++++++++++++ 16 files changed, 388 insertions(+), 61 deletions(-) create mode 100644 be/src/util/interval_histogram.cpp create mode 100644 be/src/util/interval_histogram.h create mode 100644 be/test/util/interval_histogram_test.cpp create mode 100644 regression-test/suites/workload_manager_p0/test_metrics_with_workload_group.groovy diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 636ce2bf288b58a..46112b5ed133753 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -360,9 +360,6 @@ class ExecEnv { Status _init_mem_env(); Status _check_deploy_mode(); - void _register_metrics(); - void _deregister_metrics(); - inline static std::atomic_bool _s_ready {false}; inline static std::atomic_bool _s_tracking_memory {false}; std::vector _store_paths; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 2d7554e702969f6..21aa00e4d04ebf9 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -139,7 +139,6 @@ namespace doris { class PBackendService_Stub; class PFunctionService_Stub; -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT); @@ -336,7 +335,6 @@ Status ExecEnv::_init(const std::vector& store_paths, RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); RETURN_IF_ERROR(_wal_manager->init()); _heartbeat_flags = new HeartbeatFlags(); - _register_metrics(); _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity); @@ -668,20 +666,6 @@ Status ExecEnv::_check_deploy_mode() { return Status::OK(); } -void ExecEnv::_register_metrics() { - REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, - [this]() { return _send_batch_thread_pool->num_threads(); }); - - REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, - [this]() { return _send_batch_thread_pool->get_queue_size(); }); -} - -void ExecEnv::_deregister_metrics() { - DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); - DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); -} - // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. // We need to stop all threads before releasing resource. 
void ExecEnv::destroy() { @@ -736,7 +720,6 @@ void ExecEnv::destroy() { SAFE_SHUTDOWN(_s3_file_system_thread_pool); SAFE_SHUTDOWN(_send_batch_thread_pool); - _deregister_metrics(); SAFE_DELETE(_load_channel_mgr); SAFE_DELETE(_spill_stream_mgr); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 6b9388af30a7f79..3ceeed2de191860 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -487,7 +487,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, if (_scan_task_sched == nullptr) { std::unique_ptr scan_scheduler = std::make_unique("Scan_" + wg_name, - cg_cpu_ctl_ptr); + cg_cpu_ctl_ptr, wg_name); Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_queue_size); @@ -507,7 +507,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, vectorized::ScannerScheduler::get_remote_scan_thread_queue_size(); std::unique_ptr remote_scan_scheduler = std::make_unique("RScan_" + wg_name, - cg_cpu_ctl_ptr); + cg_cpu_ctl_ptr, wg_name); Status ret = remote_scan_scheduler->start(remote_max_thread_num, config::doris_scanner_min_thread_pool_thread_num, remote_scan_thread_queue_size); diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index e77ee1c36b6b89a..39f246d98d37375 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT); const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; @@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed); } void DorisMetrics::initialize(bool init_system_metrics, const std::set& disk_devices, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index d089758c21c93fc..6e4ea2efa7692de 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -197,10 +197,7 @@ class DorisMetrics { UIntGauge* query_cache_sql_total_count = nullptr; UIntGauge* query_cache_partition_total_count = nullptr; - UIntGauge* scanner_thread_pool_queue_size = nullptr; UIntGauge* add_batch_task_queue_size = nullptr; - UIntGauge* send_batch_thread_pool_thread_num = nullptr; - UIntGauge* send_batch_thread_pool_queue_size = nullptr; UIntGauge* fragment_thread_pool_queue_size = nullptr; UIntGauge* fragment_thread_pool_num_active_threads = nullptr; @@ -273,7 +270,6 @@ 
class DorisMetrics { void _update_process_thread_num(); void _update_process_fd_num(); -private: static const std::string _s_registry_name; static const std::string _s_hook_name; diff --git a/be/src/util/interval_histogram.cpp b/be/src/util/interval_histogram.cpp new file mode 100644 index 000000000000000..ec894fc69fa3fe4 --- /dev/null +++ b/be/src/util/interval_histogram.cpp @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/interval_histogram.h" + +#include +#include +#include +#include + +#include "gutil/integral_types.h" + +namespace doris { + +template +IntervalHistogramStat::IntervalHistogramStat(size_t N) : window(N) {} + +template +void IntervalHistogramStat::add(T value) { + std::unique_lock lock(mutex); + if (window.full()) { + window.pop_front(); + } + window.push_back(value); +} + +template +T IntervalHistogramStat::mean() { + std::shared_lock lock(mutex); + if (window.empty()) { + return T(); + } + T sum = std::accumulate(window.begin(), window.end(), T()); + return sum / window.size(); +} + +template +T IntervalHistogramStat::median() { + std::shared_lock lock(mutex); + if (window.empty()) { + return T(); + } + + std::vector sorted(window.begin(), window.end()); + std::sort(sorted.begin(), sorted.end()); + + size_t mid = sorted.size() / 2; + return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid]; +} + +template +T IntervalHistogramStat::max() { + std::shared_lock lock(mutex); + return *std::max_element(window.begin(), window.end()); +} + +template +T IntervalHistogramStat::min() { + std::shared_lock lock(mutex); + return *std::min_element(window.begin(), window.end()); +} + +template class doris::IntervalHistogramStat; +template class doris::IntervalHistogramStat; + +} // namespace doris diff --git a/be/src/util/interval_histogram.h b/be/src/util/interval_histogram.h new file mode 100644 index 000000000000000..2d5d9e6c6d2963e --- /dev/null +++ b/be/src/util/interval_histogram.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace doris { + +// A thread-safe interval histogram stat class. +// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide +// statistics like mean, median, max, min. + +template +class IntervalHistogramStat { +public: + explicit IntervalHistogramStat(size_t N); + + void add(T value); + + T mean(); + T median(); + T max(); + T min(); + +private: + boost::circular_buffer window; + mutable std::shared_mutex mutex; +}; + +} // namespace doris diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index 23dbb628a0d95ba..1e92a360a4d9198 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const { std::string HistogramMetric::to_prometheus(const std::string& display_name, const Labels& entity_labels, const Labels& metric_labels) const { + // TODO: Use std::string concate for better performance. std::stringstream ss; for (const auto& percentile : _s_output_percentiles) { auto quantile_lable = Labels({{"quantile", percentile.first}}); diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index e9af13f556e1436..33ed11dd3cae46e 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -33,10 +33,25 @@ #include "gutil/port.h" #include "gutil/strings/substitute.h" #include "util/debug/sanitizer_scopes.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" #include "util/scoped_cleanup.h" +#include "util/stopwatch.hpp" #include "util/thread.h" namespace doris { + +// The name of these varialbs will be useds as metric name in prometheus. +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); + using namespace ErrorCode; using std::string; @@ -52,8 +67,9 @@ class FunctionRunnable : public Runnable { std::function _func; }; -ThreadPoolBuilder::ThreadPoolBuilder(string name) +ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group) : _name(std::move(name)), + _workload_group(std::move(workload_group)), _min_threads(0), _max_threads(std::thread::hardware_concurrency()), _max_queue_size(std::numeric_limits::max()), @@ -238,6 +254,7 @@ bool ThreadPoolToken::need_dispatch() { ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) : _name(builder._name), + _workload_group(builder._workload_group), _min_threads(builder._min_threads), _max_threads(builder._max_threads), _max_queue_size(builder._max_queue_size), @@ -248,7 +265,8 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) _active_threads(0), _total_queued_tasks(0), _cgroup_cpu_ctl(builder._cgroup_cpu_ctl), - _tokenless(new_token(ExecutionMode::CONCURRENT)) {} + _tokenless(new_token(ExecutionMode::CONCURRENT)), + _id(UniqueId::gen_uid()) {} ThreadPool::~ThreadPool() { // There should only be one live token: the one used in tokenless submission. 
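For reviewers, a minimal usage sketch of the sliding-window statistic introduced in util/interval_histogram.h above; the element type (int64_t), window size, and sample values are assumptions for illustration and are not taken from this patch:

    #include <iostream>

    #include "util/interval_histogram.h"

    int main() {
        // Hypothetical window of per-task latencies in nanoseconds; once the
        // fixed-size window (here 1000 entries) is full, add() evicts the oldest value.
        doris::IntervalHistogramStat<int64_t> stat(1000);
        for (int64_t ns : {1200, 800, 950, 4000, 1100}) {
            stat.add(ns);
        }
        // mean/median/max/min take a shared lock, so readers can run concurrently with each other.
        std::cout << "mean=" << stat.mean() << " median=" << stat.median()
                  << " max=" << stat.max() << " min=" << stat.min() << std::endl;
        return 0;
    }

This mirrors how the thread pool below records task execution time and queue-wait time and exposes their rolling averages as gauges.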
@@ -270,10 +288,45 @@ Status ThreadPool::init() { return status; } } + + // _id of thread pool is used to make sure when we create thread pool with same name, we can + // get different _metric_entity + // If not, we will have problem when we deregister entity and register hook. + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( + fmt::format("thread_pool_{}", _name), + {{"thread_pool_name", _name}, {"workload_group", _workload_group}, {"id", _id.to_string()}}); + + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size); + INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times); + INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times); + INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed); + + _metric_entity->register_hook("update", [this]() { + { + std::lock_guard l(_lock); + if (!_pool_status.ok()) { + return; + } + } + + thread_pool_active_threads->set_value(num_active_threads()); + thread_pool_queue_size->set_value(get_queue_size()); + thread_pool_max_queue_size->set_value(get_max_queue_size()); + thread_pool_max_threads->set_value(max_threads()); + task_execution_time_ns_avg_in_last_1000_times->set_value( + _task_execution_time_ns_statistic.mean()); + task_wait_worker_ns_avg_in_last_1000_times->set_value( + _task_wait_worker_time_ns_statistic.mean()); + }); + return Status::OK(); } void ThreadPool::shutdown() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::unique_lock l(_lock); check_not_pool_thread_unlocked(); @@ -357,8 +410,6 @@ Status ThreadPool::submit_func(std::function f) { Status ThreadPool::do_submit(std::shared_ptr r, ThreadPoolToken* token) { DCHECK(token); - std::chrono::time_point submit_time = - std::chrono::system_clock::now(); std::unique_lock l(_lock); if (PREDICT_FALSE(!_pool_status.ok())) { @@ -373,6 +424,7 @@ Status ThreadPool::do_submit(std::shared_ptr r, ThreadPoolToken* token int64_t capacity_remaining = static_cast(_max_threads) - _active_threads + static_cast(_max_queue_size) - _total_queued_tasks; if (capacity_remaining < 1) { + thread_pool_submit_failed->increment(1); return Status::Error( "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name, _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks, @@ -408,7 +460,7 @@ Status ThreadPool::do_submit(std::shared_ptr r, ThreadPoolToken* token Task task; task.runnable = std::move(r); - task.submit_time = submit_time; + task.submit_time_wather.start(); // Add the task to the token's queue. ThreadPoolToken::State state = token->state(); @@ -528,12 +580,15 @@ void ThreadPool::dispatch_thread() { continue; } + MonotonicStopWatch task_execution_time_watch; + task_execution_time_watch.start(); // Get the next token and task to execute. 
ThreadPoolToken* token = _queue.front(); _queue.pop_front(); DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state()); DCHECK(!token->_entries.empty()); Task task = std::move(token->_entries.front()); + _task_wait_worker_time_ns_statistic.add(task.submit_time_wather.elapsed_time()); token->_entries.pop_front(); token->_active_threads++; --_total_queued_tasks; @@ -543,7 +598,6 @@ void ThreadPool::dispatch_thread() { // Execute the task task.runnable->run(); - // Destruct the task while we do not hold the lock. // // The task's destructor may be expensive if it has a lot of bound @@ -553,6 +607,8 @@ void ThreadPool::dispatch_thread() { task.runnable.reset(); l.lock(); + _task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time()); + // Possible states: // 1. The token was shut down while we ran its task. Transition to QUIESCED. // 2. The token has no more queued tasks. Transition back to IDLE. diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index f822c307aa6b8ed..1dcbd9547813ac4 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -20,6 +20,7 @@ #pragma once +#include #include #include #include @@ -38,6 +39,9 @@ #include "agent/cgroup_cpu_ctl.h" #include "common/status.h" +#include "util/interval_histogram.h" +#include "util/metrics.h" +#include "util/uid_util.h" #include "util/work_thread_pool.hpp" namespace doris { @@ -99,7 +103,7 @@ class Runnable { // class ThreadPoolBuilder { public: - explicit ThreadPoolBuilder(std::string name); + explicit ThreadPoolBuilder(std::string name, std::string workload_group = "system"); // Note: We violate the style guide by returning mutable references here // in order to provide traditional Builder pattern conveniences. @@ -132,6 +136,7 @@ class ThreadPoolBuilder { private: friend class ThreadPool; const std::string _name; + const std::string _workload_group; int _min_threads; int _max_threads; int _max_queue_size; @@ -226,36 +231,49 @@ class ThreadPool { // Return the number of threads currently running (or in the process of starting up) // for this thread pool. int num_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _num_threads + _num_threads_pending_start; } int max_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _max_threads; } int min_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _min_threads; } int num_threads_pending_start() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _num_threads_pending_start; } int num_active_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _active_threads; } int get_queue_size() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _total_queued_tasks; } + int get_max_queue_size() const { + // TODO: Use RW lock or atomic for performance. + std::lock_guard l(_lock); + return _max_queue_size; + } + std::vector debug_info() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); std::vector arr = {_num_threads, static_cast(_threads.size()), _min_threads, _max_threads}; @@ -280,7 +298,7 @@ class ThreadPool { std::shared_ptr runnable; // Time at which the entry was submitted to the pool. - std::chrono::time_point submit_time; + MonotonicStopWatch submit_time_wather; }; // Creates a new thread pool using a builder. 
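A hedged sketch of how a caller would tag a pool with a workload group via the new builder argument; the pool name, thread counts, and group name here are examples only, not values taken from this patch:

    #include <memory>

    #include "common/status.h"
    #include "util/threadpool.h"

    // The second builder argument is the new workload_group tag (default "system").
    // It becomes the workload_group label on the pool's metric entity, alongside
    // thread_pool_name and the per-instance id.
    doris::Status build_scan_pool_for_wg(std::unique_ptr<doris::ThreadPool>* pool) {
        return doris::ThreadPoolBuilder("Scan_example_wg", "example_wg")
                .set_min_threads(4)
                .set_max_threads(16)
                .set_max_queue_size(512)
                .build(pool);
    }

ThreadPool::init(), invoked from build(), registers the entity named thread_pool_Scan_example_wg together with an "update" hook that refreshes the gauges when metrics are collected.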
@@ -308,6 +326,7 @@ class ThreadPool { void release_token(ThreadPoolToken* t); const std::string _name; + const std::string _workload_group; int _min_threads; int _max_threads; const int _max_queue_size; @@ -392,6 +411,21 @@ class ThreadPool { // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. std::unique_ptr _tokenless; + + std::shared_ptr _metric_entity; + IntGauge* thread_pool_active_threads = nullptr; + IntGauge* thread_pool_queue_size = nullptr; + IntGauge* thread_pool_max_queue_size = nullptr; + IntGauge* thread_pool_max_threads = nullptr; + IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr; + IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr; + + IntervalHistogramStat _task_execution_time_ns_statistic {1000}; + IntervalHistogramStat _task_wait_worker_time_ns_statistic {1000}; + + IntCounter* thread_pool_submit_failed = nullptr; + + const UniqueId _id; }; // Entry point for token-based task submission and blocking for a particular diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index f419f58037aab64..33ea3e20977cb4d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -141,11 +141,6 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, scanner_delegate->_scanner->start_wait_worker_timer(); auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); @@ -171,11 +166,6 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, auto sumbit_task = [&]() { SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler(); auto work_func = [scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 7731b3ba8f983b8..e94659c79d1bff9 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -114,8 +114,12 @@ struct SimplifiedScanTask { class SimplifiedScanScheduler { public: - SimplifiedScanScheduler(std::string sched_name, std::shared_ptr cgroup_cpu_ctl) - : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {} + SimplifiedScanScheduler(std::string sched_name, std::shared_ptr cgroup_cpu_ctl, + std::string workload_group = "system") + : _is_stop(false), + _cgroup_cpu_ctl(cgroup_cpu_ctl), + _sched_name(sched_name), + _workload_group(workload_group) {} ~SimplifiedScanScheduler() { stop(); @@ -129,7 +133,7 @@ class SimplifiedScanScheduler { } Status start(int max_thread_num, int min_thread_num, int queue_size) { - RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name) + RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group) .set_min_threads(min_thread_num) .set_max_threads(max_thread_num) .set_max_queue_size(queue_size) @@ -140,13 +144,7 @@ class SimplifiedScanScheduler { Status submit_scan_task(SimplifiedScanTask scan_task) { if 
(!_is_stop) { - DorisMetrics::instance()->scanner_task_queued->increment(1); - auto st = _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); - if (!st.ok()) { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_submit_failed->increment(1); - } - return st; + return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); } else { return Status::InternalError("scanner pool {} is shutdown.", _sched_name); } @@ -216,6 +214,7 @@ class SimplifiedScanScheduler { std::atomic _is_stop; std::weak_ptr _cgroup_cpu_ctl; std::string _sched_name; + std::string _workload_group; }; } // namespace doris::vectorized diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp index 0c3001758f08af5..2e13436b3d3337f 100644 --- a/be/test/olap/rowset/beta_rowset_test.cpp +++ b/be/test/olap/rowset/beta_rowset_test.cpp @@ -236,6 +236,8 @@ TEST_F(BetaRowsetTest, ReadTest) { .region = "region", .ak = "ak", .sk = "sk", + .token = "", + .bucket = "", }}; std::string resource_id = "10000"; auto res = io::S3FileSystem::create(std::move(s3_conf), io::FileSystem::TMP_FS_ID); diff --git a/be/test/util/interval_histogram_test.cpp b/be/test/util/interval_histogram_test.cpp new file mode 100644 index 000000000000000..0c753a29eac155e --- /dev/null +++ b/be/test/util/interval_histogram_test.cpp @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include "util/interval_histogram.h" + +namespace doris { + +TEST(IntervalHistogramStat, SerialTest) { + IntervalHistogramStat stat(5); + + stat.add(10); + stat.add(20); + stat.add(30); + stat.add(40); + stat.add(50); + + EXPECT_EQ(stat.mean(), 30); + EXPECT_EQ(stat.median(), 30); + EXPECT_EQ(stat.max(), 50); + EXPECT_EQ(stat.min(), 10); + + // Make window move forward + stat.add(60); + stat.add(70); + + // window now contains [30, 40, 50, 60, 70] + EXPECT_EQ(stat.mean(), 50); + EXPECT_EQ(stat.median(), 50); + EXPECT_EQ(stat.max(), 70); + EXPECT_EQ(stat.min(), 30); +} + +TEST(IntervalHistogramStatTest, ParallelTest) { + constexpr int thread_count = 10; + constexpr int values_per_thread = 10; + IntervalHistogramStat stat(thread_count * values_per_thread); + + auto add_values = [&stat](int start_value, int count) { + for (int i = 0; i < count; ++i) { + stat.add(start_value + i); + } + }; + + std::vector threads; + for (int i = 0; i < thread_count; ++i) { + threads.emplace_back(add_values, i * values_per_thread, values_per_thread); + } + + for (auto& thread : threads) { + thread.join(); + } + + int total_values = thread_count * values_per_thread; + EXPECT_EQ(stat.mean(), (total_values - 1) / 2); + EXPECT_EQ(stat.max(), total_values - 1); + EXPECT_EQ(stat.min(), 0); + EXPECT_EQ(stat.median(), (total_values - 1) / 2); +} + +} // namespace doris diff --git a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp index 3dbf34a4dcb30c9..a82448f5502ce01 100644 --- a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp +++ b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp @@ -205,7 +205,7 @@ class AggLinearHistogramTest : public testing::Test { AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); auto agg_function = - factory.get("linear_histogram", data_types, false, -1, {.enable_decimal256 = true}); + factory.get("linear_histogram", data_types, false, -1, {.enable_decimal256 = true, .column_names={""}}); EXPECT_NE(agg_function, nullptr); std::unique_ptr memory(new char[agg_function->size_of_data()]); diff --git a/regression-test/suites/workload_manager_p0/test_metrics_with_workload_group.groovy b/regression-test/suites/workload_manager_p0/test_metrics_with_workload_group.groovy new file mode 100644 index 000000000000000..9ae5ae7b01c107d --- /dev/null +++ b/regression-test/suites/workload_manager_p0/test_metrics_with_workload_group.groovy @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+def getMetrics = { ip, port ->
+    def dst = 'http://' + ip + ':' + port
+    def conn = new URL(dst + "/metrics").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
+suite("test_metrics_with_workload_group") {
+    for (int i = 0; i < 20; i++) {
+        // 1. SHOW BACKENDS to get BE ip and http port
+        Map backendId_to_backendIP = new HashMap<>();
+        Map backendId_to_backendHttpPort = new HashMap<>();
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+        // Print the above maps in the logger.
+        logger.info("backendId_to_backendIP: " + backendId_to_backendIP);
+        logger.info("backendId_to_backendHttpPort: " + backendId_to_backendHttpPort);
+
+        // 2. CREATE WORKLOAD GROUP
+        sql "drop workload group if exists test_wg_metrics;"
+        sql "create workload group if not exists test_wg_metrics " +
+                "properties ( " +
+                " 'cpu_share'='10', " +
+                " 'memory_limit'='10%', " +
+                " 'enable_memory_overcommit'='true' " +
+                ");"
+        sql "set workload_group=test_wg_metrics;"
+        wg = sql("select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag,read_bytes_per_second,remote_read_bytes_per_second from information_schema.workload_groups where name = 'test_wg_metrics' order by name;");
+        logger.info("wg: " + wg);
+
+        // 3. EXECUTE A QUERY SO THAT THE WORKLOAD GROUP IS USED
+        sql "select count(*) from numbers(\"number\"=\"100\");"
+
+        // curl the backend http port to get metrics
+        // get the first backendId
+        backendId = backendId_to_backendIP.keySet().iterator().next();
+        backendIP = backendId_to_backendIP.get(backendId);
+        backendHttpPort = backendId_to_backendHttpPort.get(backendId);
+        logger.info("backendId: " + backendId + ", backendIP: " + backendIP + ", backendHttpPort: " + backendHttpPort);
+
+        // Fetch the metrics 5 times.
+        for (int j = 0; j < 5; j++) {
+            String metrics = getMetrics(backendIP, backendHttpPort);
+            String filteredMetrics = metrics.split('\n').findAll { line ->
+                line.startsWith('doris_be_thread_pool') && line.contains('workload_group="test_wg_metrics"') && line.contains('thread_pool_name="Scan_test_wg_metrics"')
+            }.join('\n')
+            // Filter metrics of the Scan pool that belongs to test_wg_metrics.
+            logger.info("filteredMetrics: " + filteredMetrics);
+
+            List lines = filteredMetrics.split('\n').findAll { it.trim() }
+            assert lines.size() == 5
+        }
+    }
+}
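For manual verification of what the suite above asserts: with the per-pool entity registered, the BE /metrics endpoint exposes five series whose names start with doris_be_thread_pool for the group's scan pool (active_threads, queue_size, max_queue_size, max_threads, and submit_failed), each labeled with thread_pool_name, workload_group, and the pool's id; the two task_*_ns_avg_in_last_1000_times gauges are exported as well but do not match the test's doris_be_thread_pool prefix, which is why the assertion expects exactly five lines. The lines below only illustrate the expected shape; the id placeholder and the values are made up:

    doris_be_thread_pool_active_threads{id="...", thread_pool_name="Scan_test_wg_metrics", workload_group="test_wg_metrics"} 0
    doris_be_thread_pool_queue_size{id="...", thread_pool_name="Scan_test_wg_metrics", workload_group="test_wg_metrics"} 0
    doris_be_thread_pool_submit_failed{id="...", thread_pool_name="Scan_test_wg_metrics", workload_group="test_wg_metrics"} 0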