From 47474bacb4ed133c044e0fde8cadd1d2bc1d7d85 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Fri, 20 Dec 2024 12:11:23 +0800 Subject: [PATCH] DEV --- be/src/runtime/exec_env.cpp | 7 ++ be/src/runtime/exec_env.h | 5 ++ be/src/runtime/exec_env_init.cpp | 9 ++- .../runtime/workload_group/workload_group.cpp | 4 +- be/src/util/doris_metrics.cpp | 6 -- be/src/util/doris_metrics.h | 19 +++-- be/src/util/interval_histogram.h | 80 +++++++++++++++++++ be/src/util/threadpool.cpp | 59 +++++++++++++- be/src/util/threadpool.h | 32 +++++++- be/src/vec/exec/scan/scanner_scheduler.cpp | 10 --- be/src/vec/exec/scan/scanner_scheduler.h | 19 +++-- 11 files changed, 207 insertions(+), 43 deletions(-) create mode 100644 be/src/util/interval_histogram.h diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index ab24d7ca192689f..202540274cb9d1d 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -77,6 +77,13 @@ std::vector ExecEnv::get_frontends() { return infos; } +doris::DorisMetrics* ExecEnv::doris_metrics() { + if (_doris_metrics == nullptr) { + _doris_metrics = std::unique_ptr(DorisMetrics::create()); + } + return _doris_metrics.get(); +} + void ExecEnv::update_frontends(const std::vector& new_fe_infos) { std::lock_guard lg(_frontends_lock); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 636ce2bf288b58a..d20627ff106ff2c 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -45,6 +45,7 @@ class MemoryPool; } namespace doris { +class DorisMetrics; namespace vectorized { class VDataStreamMgr; class ScannerScheduler; @@ -349,6 +350,8 @@ class ExecEnv { bool check_auth_token(const std::string& auth_token); + DorisMetrics* doris_metrics(); + private: ExecEnv(); @@ -488,6 +491,8 @@ class ExecEnv { orc::MemoryPool* _orc_memory_pool = nullptr; arrow::MemoryPool* _arrow_memory_pool = nullptr; + + std::unique_ptr _doris_metrics; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a371cdb947ff56d..eed6750c28ce2a6 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -139,7 +139,6 @@ namespace doris { class PBackendService_Stub; class PFunctionService_Stub; -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT); @@ -204,6 +203,11 @@ Status ExecEnv::_init(const std::vector& store_paths, spill_path.path, spill_path.capacity_bytes, spill_path.storage_medium)); } + + if (_doris_metrics == nullptr) { + _doris_metrics = std::unique_ptr(DorisMetrics::create()); + } + init_doris_metrics(store_paths); _store_paths = store_paths; _tmp_file_dirs = std::make_unique(_store_paths); @@ -677,7 +681,6 @@ void ExecEnv::_register_metrics() { } void ExecEnv::_deregister_metrics() { - DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size); DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); } @@ -825,6 +828,8 @@ void ExecEnv::destroy() { SAFE_DELETE(_process_profile); SAFE_DELETE(_heap_profiler); + _doris_metrics.reset(); + _s_tracking_memory = false; LOG(INFO) << "Doris exec envorinment is destoried."; diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 6b9388af30a7f79..3ceeed2de191860 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -487,7 +487,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, if (_scan_task_sched == nullptr) { std::unique_ptr scan_scheduler = std::make_unique("Scan_" + wg_name, - cg_cpu_ctl_ptr); + cg_cpu_ctl_ptr, wg_name); Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_queue_size); @@ -507,7 +507,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, vectorized::ScannerScheduler::get_remote_scan_thread_queue_size(); std::unique_ptr remote_scan_scheduler = std::make_unique("RScan_" + wg_name, - cg_cpu_ctl_ptr); + cg_cpu_ctl_ptr, wg_name); Status ret = remote_scan_scheduler->start(remote_max_thread_num, config::doris_scanner_min_thread_pool_thread_num, remote_scan_thread_queue_size); diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index e9d4f31e5ca137b..1f7134e708dd37b 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT); const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; @@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt); INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed); } void DorisMetrics::initialize(bool init_system_metrics, const std::set& disk_devices, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 31b907eec9ed6cf..001f5f4adbb6e8a 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -24,6 +24,7 @@ #include #include +#include "runtime/exec_env.h" #include "util/jvm_metrics.h" #include "util/metrics.h" #include "util/system_metrics.h" @@ -47,6 +48,14 @@ namespace doris { class DorisMetrics { public: + static DorisMetrics* create() { + // Life cycle if DorisMetrics will be managed by ExecEnv + static DorisMetrics* metrics = new DorisMetrics; + return metrics; + } + + static DorisMetrics* instance() { return ExecEnv::GetInstance()->doris_metrics(); } + IntCounter* fragment_requests_total = nullptr; IntCounter* fragment_request_duration_us = nullptr; IntCounter* query_scan_bytes = nullptr; @@ -197,7 +206,6 @@ class DorisMetrics { UIntGauge* query_cache_sql_total_count = nullptr; UIntGauge* query_cache_partition_total_count = nullptr; - UIntGauge* scanner_thread_pool_queue_size = nullptr; UIntGauge* add_batch_task_queue_size = nullptr; UIntGauge* send_batch_thread_pool_thread_num = nullptr; UIntGauge* send_batch_thread_pool_queue_size = nullptr; @@ -244,14 +252,6 @@ class DorisMetrics { IntAtomicCounter* scanner_ctx_cnt = nullptr; IntAtomicCounter* scanner_cnt = nullptr; IntAtomicCounter* scanner_task_cnt = nullptr; - IntAtomicCounter* scanner_task_queued = nullptr; - IntAtomicCounter* scanner_task_submit_failed = nullptr; - IntAtomicCounter* scanner_task_running = nullptr; - - static DorisMetrics* instance() { - static DorisMetrics instance; - return &instance; - } // not thread-safe, call before calling metrics void initialize( @@ -273,7 +273,6 @@ class DorisMetrics { void _update_process_thread_num(); void _update_process_fd_num(); -private: static const std::string _s_registry_name; static const std::string _s_hook_name; diff --git a/be/src/util/interval_histogram.h b/be/src/util/interval_histogram.h new file mode 100644 index 000000000000000..6b41d2dcca92a65 --- /dev/null +++ b/be/src/util/interval_histogram.h @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace doris { + +template +class IntervalHistogramStat { +public: + IntervalHistogramStat(size_t N) : window(N) {} + + void add(T value) { + std::unique_lock lock(mutex); + + if (window.full()) { + window.pop_front(); + } + window.push_back(value); + } + + T mean() { + std::shared_lock lock(mutex); + if (window.empty()) { + return T(); + } + + T sum = std::accumulate(window.begin(), window.end(), T()); + return sum / window.size(); + } + + T median() { + std::shared_lock lock(mutex); + if (window.empty()) { + return T(); + } + + std::vector sorted(window.begin(), window.end()); + std::sort(sorted.begin(), sorted.end()); + + size_t mid = sorted.size() / 2; + return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid]; + } + + T max() { + std::shared_lock lock(mutex); + return *std::max_element(window.begin(), window.end()); + } + + T min() { + std::shared_lock lock(mutex); + return *std::min_element(window.begin(), window.end()); + } + +private: + boost::circular_buffer window; + std::shared_mutex mutex; +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index f5ea38515def363..343690681e4882f 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -32,10 +32,25 @@ #include "gutil/port.h" #include "gutil/strings/substitute.h" #include "util/debug/sanitizer_scopes.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" #include "util/scoped_cleanup.h" +#include "util/stopwatch.hpp" #include "util/thread.h" namespace doris { + +// The name of these varialbs will be useds as metric name in prometheus. +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_running_tasks, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_capacity, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_thread_number, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); + using namespace ErrorCode; using std::string; @@ -51,8 +66,9 @@ class FunctionRunnable : public Runnable { std::function _func; }; -ThreadPoolBuilder::ThreadPoolBuilder(string name) +ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group) : _name(std::move(name)), + _workload_group(std::move(workload_group)), _min_threads(0), _max_threads(std::thread::hardware_concurrency()), _max_queue_size(std::numeric_limits::max()), @@ -237,6 +253,7 @@ bool ThreadPoolToken::need_dispatch() { ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) : _name(builder._name), + _workload_group(builder._workload_group), _min_threads(builder._min_threads), _max_threads(builder._max_threads), _max_queue_size(builder._max_queue_size), @@ -269,10 +286,35 @@ Status ThreadPool::init() { return status; } } + + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( + fmt::format("thread_pool_{}", _name), + {{"thread_pool_name", _name}, {"workload_group", _workload_group}}); + + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_running_tasks); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_thread_number); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_capacity); + INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times); + INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times); + INT_ATOMIC_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed); + + _metric_entity->register_hook("update", [this]() { + thread_pool_running_tasks->set_value(num_active_threads()); + thread_pool_queue_size->set_value(get_queue_size()); + thread_pool_queue_capacity->set_value(get_max_queue_size()); + thread_pool_max_thread_number->set_value(max_threads()); + task_execution_time_ns_avg_in_last_1000_times->set_value( + _task_execution_time_ns_statistic.mean()); + task_wait_worker_ns_avg_in_last_1000_times->set_value( + _task_wait_worker_time_ns_statistic.mean()); + }); + return Status::OK(); } void ThreadPool::shutdown() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::unique_lock l(_lock); check_not_pool_thread_unlocked(); @@ -372,6 +414,7 @@ Status ThreadPool::do_submit(std::shared_ptr r, ThreadPoolToken* token int64_t capacity_remaining = static_cast(_max_threads) - _active_threads + static_cast(_max_queue_size) - _total_queued_tasks; if (capacity_remaining < 1) { + thread_pool_submit_failed->increment(1); return Status::Error( "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name, _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks, @@ -527,12 +570,23 @@ void ThreadPool::dispatch_thread() { continue; } + MonotonicStopWatch task_execution_time_watch; + task_execution_time_watch.start(); // Get the next token and task to execute. ThreadPoolToken* token = _queue.front(); _queue.pop_front(); DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state()); DCHECK(!token->_entries.empty()); Task task = std::move(token->_entries.front()); + std::chrono::time_point current = + std::chrono::system_clock::now(); + auto current_ns = + std::chrono::duration_cast(current.time_since_epoch()) + .count(); + auto submit_ns = std::chrono::duration_cast( + task.submit_time.time_since_epoch()) + .count(); + _task_wait_worker_time_ns_statistic.add(current_ns - submit_ns); token->_entries.pop_front(); token->_active_threads++; --_total_queued_tasks; @@ -542,7 +596,6 @@ void ThreadPool::dispatch_thread() { // Execute the task task.runnable->run(); - // Destruct the task while we do not hold the lock. // // The task's destructor may be expensive if it has a lot of bound @@ -552,6 +605,8 @@ void ThreadPool::dispatch_thread() { task.runnable.reset(); l.lock(); + _task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time()); + // Possible states: // 1. The token was shut down while we ran its task. Transition to QUIESCED. // 2. The token has no more queued tasks. Transition back to IDLE. diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index f822c307aa6b8ed..0a2d0409f8ba459 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -38,6 +38,8 @@ #include "agent/cgroup_cpu_ctl.h" #include "common/status.h" +#include "util/interval_histogram.h" +#include "util/metrics.h" #include "util/work_thread_pool.hpp" namespace doris { @@ -99,7 +101,7 @@ class Runnable { // class ThreadPoolBuilder { public: - explicit ThreadPoolBuilder(std::string name); + explicit ThreadPoolBuilder(std::string name, std::string workload_group = "system"); // Note: We violate the style guide by returning mutable references here // in order to provide traditional Builder pattern conveniences. @@ -132,6 +134,7 @@ class ThreadPoolBuilder { private: friend class ThreadPool; const std::string _name; + const std::string _workload_group; int _min_threads; int _max_threads; int _max_queue_size; @@ -226,36 +229,49 @@ class ThreadPool { // Return the number of threads currently running (or in the process of starting up) // for this thread pool. int num_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _num_threads + _num_threads_pending_start; } int max_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _max_threads; } int min_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _min_threads; } int num_threads_pending_start() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _num_threads_pending_start; } int num_active_threads() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _active_threads; } int get_queue_size() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); return _total_queued_tasks; } + int get_max_queue_size() const { + // TODO: Use RW lock or atomic for performance. + std::lock_guard l(_lock); + return _max_queue_size; + } + std::vector debug_info() const { + // TODO: Use RW lock or atomic for performance. std::lock_guard l(_lock); std::vector arr = {_num_threads, static_cast(_threads.size()), _min_threads, _max_threads}; @@ -308,6 +324,7 @@ class ThreadPool { void release_token(ThreadPoolToken* t); const std::string _name; + const std::string _workload_group; int _min_threads; int _max_threads; const int _max_queue_size; @@ -392,6 +409,19 @@ class ThreadPool { // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. std::unique_ptr _tokenless; + + std::shared_ptr _metric_entity; + IntGauge* thread_pool_running_tasks = nullptr; + IntGauge* thread_pool_queue_size = nullptr; + IntGauge* thread_pool_queue_capacity = nullptr; + IntGauge* thread_pool_max_thread_number = nullptr; + IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr; + IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr; + + IntervalHistogramStat _task_execution_time_ns_statistic {1000}; + IntervalHistogramStat _task_wait_worker_time_ns_statistic {1000}; + + IntAtomicCounter* thread_pool_submit_failed = nullptr; }; // Entry point for token-based task submission and blocking for a particular diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index f419f58037aab64..33ea3e20977cb4d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -141,11 +141,6 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, scanner_delegate->_scanner->start_wait_worker_timer(); auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); @@ -171,11 +166,6 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, auto sumbit_task = [&]() { SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler(); auto work_func = [scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 7731b3ba8f983b8..e94659c79d1bff9 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -114,8 +114,12 @@ struct SimplifiedScanTask { class SimplifiedScanScheduler { public: - SimplifiedScanScheduler(std::string sched_name, std::shared_ptr cgroup_cpu_ctl) - : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {} + SimplifiedScanScheduler(std::string sched_name, std::shared_ptr cgroup_cpu_ctl, + std::string workload_group = "system") + : _is_stop(false), + _cgroup_cpu_ctl(cgroup_cpu_ctl), + _sched_name(sched_name), + _workload_group(workload_group) {} ~SimplifiedScanScheduler() { stop(); @@ -129,7 +133,7 @@ class SimplifiedScanScheduler { } Status start(int max_thread_num, int min_thread_num, int queue_size) { - RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name) + RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group) .set_min_threads(min_thread_num) .set_max_threads(max_thread_num) .set_max_queue_size(queue_size) @@ -140,13 +144,7 @@ class SimplifiedScanScheduler { Status submit_scan_task(SimplifiedScanTask scan_task) { if (!_is_stop) { - DorisMetrics::instance()->scanner_task_queued->increment(1); - auto st = _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); - if (!st.ok()) { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_submit_failed->increment(1); - } - return st; + return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); } else { return Status::InternalError("scanner pool {} is shutdown.", _sched_name); } @@ -216,6 +214,7 @@ class SimplifiedScanScheduler { std::atomic _is_stop; std::weak_ptr _cgroup_cpu_ctl; std::string _sched_name; + std::string _workload_group; }; } // namespace doris::vectorized