diff --git a/be/src/util/interval_histogram.cpp b/be/src/util/interval_histogram.cpp
new file mode 100644
index 000000000000000..6683a8ea36ab363
--- /dev/null
+++ b/be/src/util/interval_histogram.cpp
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/interval_histogram.h"
+
+#include
+#include
+#include
+#include
+
+#include "gutil/integral_types.h"
+
+namespace doris {
+
+// Creates a histogram whose window holds at most N values.
+template
+IntervalHistogramStat::IntervalHistogramStat(size_t N) : window(N) {}
+
+// Appends `value`; when the window is already full, the oldest value is
+// dropped first.
+template
+void IntervalHistogramStat::add(T value) {
+    std::unique_lock lock(mutex);
+    if (window.full()) {
+        window.pop_front();
+    }
+    window.push_back(value);
+}
+
+// Returns the mean of the values in the window, or T() when it is empty.
+template
+T IntervalHistogramStat::mean() const {
+    std::shared_lock lock(mutex);
+    if (window.empty()) {
+        return T();
+    }
+    T sum = std::accumulate(window.begin(), window.end(), T());
+    // Divide in T: size() is unsigned, so dividing by it directly would convert
+    // a negative signed sum to unsigned and produce a bogus result.
+    return sum / static_cast<T>(window.size());
+}
+
+// Returns the median of the values in the window, or T() when it is empty.
+// With an even number of values, the two middle values are averaged.
+template
+T IntervalHistogramStat::median() const {
+    std::shared_lock lock(mutex);
+    if (window.empty()) {
+        return T();
+    }
+
+    // Sort a copy; the window itself is left untouched.
+    std::vector sorted(window.begin(), window.end());
+    std::sort(sorted.begin(), sorted.end());
+
+    size_t mid = sorted.size() / 2;
+    return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
+}
+
+// Returns the largest value in the window, or T() when it is empty.
+template
+T IntervalHistogramStat::max() const {
+    std::shared_lock lock(mutex);
+    // std::max_element returns end() for an empty range; never dereference it.
+    if (window.empty()) {
+        return T();
+    }
+    return *std::max_element(window.begin(), window.end());
+}
+
+// Returns the smallest value in the window, or T() when it is empty.
+template
+T IntervalHistogramStat::min() const {
+    std::shared_lock lock(mutex);
+    // std::min_element returns end() for an empty range; never dereference it.
+    if (window.empty()) {
+        return T();
+    }
+    return *std::min_element(window.begin(), window.end());
+}
+
+template class doris::IntervalHistogramStat;
+
+} // namespace doris
diff --git a/be/src/util/interval_histogram.h b/be/src/util/interval_histogram.h
index 6b41d2dcca92a65..2d5d9e6c6d2963e 100644
--- a/be/src/util/interval_histogram.h
+++ b/be/src/util/interval_histogram.h
@@ -17,64 +17,35 @@
 
 #pragma once
 
-#include
 #include
-#include
-#include
 #include
-#include
 
 namespace doris {
 
+// A thread-safe interval histogram stat class.
+// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide
+// statistics like mean, median, max, min.
+
 template
 class IntervalHistogramStat {
 public:
-    IntervalHistogramStat(size_t N) : window(N) {}
-
-    void add(T value) {
-        std::unique_lock lock(mutex);
-
-        if (window.full()) {
-            window.pop_front();
-        }
-        window.push_back(value);
-    }
-
-    T mean() {
-        std::shared_lock lock(mutex);
-        if (window.empty()) {
-            return T();
-        }
-
-        T sum = std::accumulate(window.begin(), window.end(), T());
-        return sum / window.size();
-    }
+    // Creates a histogram whose window holds at most N values.
+    explicit IntervalHistogramStat(size_t N);
 
-    T median() {
-        std::shared_lock lock(mutex);
-        if (window.empty()) {
-            return T();
-        }
+    // Adds a value; the oldest value is evicted first when the window is full.
+    void add(T value);
 
-        std::vector sorted(window.begin(), window.end());
-        std::sort(sorted.begin(), sorted.end());
-
-        size_t mid = sorted.size() / 2;
-        return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
-    }
-
-    T max() {
-        std::shared_lock lock(mutex);
-        return *std::max_element(window.begin(), window.end());
-    }
-
-    T min() {
-        std::shared_lock lock(mutex);
-        return *std::min_element(window.begin(), window.end());
-    }
+    // Statistics over the values currently in the window; each returns T()
+    // when the window is empty.
+    T mean() const;
+    T median() const;
+    T max() const;
+    T min() const;
 
 private:
     boost::circular_buffer window;
-    std::shared_mutex mutex;
+    // mutable so the const accessors above can take a shared lock.
+    mutable std::shared_mutex mutex;
 };
-} // namespace doris
\ No newline at end of file
+
+} // namespace doris
diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp
index 23dbb628a0d95ba..1e92a360a4d9198 100644
--- a/be/src/util/metrics.cpp
+++ b/be/src/util/metrics.cpp
@@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const {
 std::string HistogramMetric::to_prometheus(const std::string& display_name,
                                            const Labels& entity_labels,
                                            const Labels& metric_labels) const {
+    // TODO: Use std::string concatenation for better performance.
     std::stringstream ss;
     for (const auto& percentile : _s_output_percentiles) {
         auto quantile_lable = Labels({{"quantile", percentile.first}});
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 343690681e4882f..7fe63c4a7d63344 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -41,10 +41,10 @@
 namespace doris {
 
-// The name of these varialbs will be useds as metric name in prometheus.
+// The names of these variables will be used as metric names in prometheus.
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_running_tasks, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_capacity, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_thread_number, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
                                    MetricUnit::NANOSECONDS);
@@ -291,19 +291,19 @@ Status ThreadPool::init() {
             fmt::format("thread_pool_{}", _name),
             {{"thread_pool_name", _name}, {"workload_group", _workload_group}});
 
-    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_running_tasks);
-    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_thread_number);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
     INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
-    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_capacity);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
     INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times);
     INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times);
     INT_ATOMIC_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);
 
     _metric_entity->register_hook("update", [this]() {
-        thread_pool_running_tasks->set_value(num_active_threads());
+        thread_pool_active_threads->set_value(num_active_threads());
         thread_pool_queue_size->set_value(get_queue_size());
-        thread_pool_queue_capacity->set_value(get_max_queue_size());
-        thread_pool_max_thread_number->set_value(max_threads());
+        thread_pool_max_queue_size->set_value(get_max_queue_size());
+        thread_pool_max_threads->set_value(max_threads());
         task_execution_time_ns_avg_in_last_1000_times->set_value(
                 _task_execution_time_ns_statistic.mean());
         task_wait_worker_ns_avg_in_last_1000_times->set_value(
@@ -586,6 +586,7 @@ void ThreadPool::dispatch_thread() {
                 auto submit_ns = std::chrono::duration_cast(
                                          task.submit_time.time_since_epoch())
                                          .count();
+
                 _task_wait_worker_time_ns_statistic.add(current_ns - submit_ns);
                 token->_entries.pop_front();
                 token->_active_threads++;
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 0a2d0409f8ba459..a93083802f3a806 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -411,10 +411,10 @@ class ThreadPool {
     std::unique_ptr _tokenless;
 
     std::shared_ptr _metric_entity;
-    IntGauge* thread_pool_running_tasks = nullptr;
+    IntGauge* thread_pool_active_threads = nullptr;
     IntGauge* thread_pool_queue_size = nullptr;
-    IntGauge* thread_pool_queue_capacity = nullptr;
-    IntGauge* thread_pool_max_thread_number = nullptr;
+    IntGauge* thread_pool_max_queue_size = nullptr;
+    IntGauge* thread_pool_max_threads = nullptr;
     IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr;
     IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr;
 