Skip to content

Commit

Permalink
REF
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Dec 20, 2024
1 parent c305edd commit ada5aeb
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 59 deletions.
79 changes: 79 additions & 0 deletions be/src/util/interval_histogram.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "util/interval_histogram.h"

#include <algorithm>
#include <mutex>
#include <numeric>
#include <vector>

#include "gutil/integral_types.h"

namespace doris {

// Constructs the stat object; N is forwarded to the `window` buffer as its
// constructor argument (the number of samples the window is sized for).
template <typename T>
IntervalHistogramStat<T>::IntervalHistogramStat(size_t N)
        : window(N) {
}

// Appends `value` to the sliding window, discarding the oldest sample once the
// window is at capacity. Takes the exclusive lock because `window` is mutated.
template <typename T>
void IntervalHistogramStat<T>::add(T value) {
    std::unique_lock<std::shared_mutex> lock(mutex);
    // boost::circular_buffer::push_back already overwrites the oldest element
    // when the buffer is full, and inserts nothing when the capacity is 0.
    // An explicit full()/pop_front() step is therefore redundant, and it would
    // call pop_front() on an empty buffer for a zero-capacity window (a
    // zero-capacity buffer is simultaneously full and empty).
    window.push_back(value);
}

// Returns the arithmetic mean of the samples currently in the window, or a
// value-initialized T when the window is empty. The division is performed in
// T, so for integral T the result is truncated.
template <typename T>
T IntervalHistogramStat<T>::mean() {
    std::shared_lock<std::shared_mutex> lock(mutex);
    if (window.empty()) {
        return T();
    }
    const T sum = std::accumulate(window.begin(), window.end(), T());
    // window.size() is unsigned. Dividing a signed sum by it directly would
    // first convert the sum to an unsigned type, so a negative sum would turn
    // into a huge positive value. Convert the count to T instead.
    return sum / static_cast<T>(window.size());
}

// Returns the median of the samples currently in the window, or a
// value-initialized T when the window is empty. For an even number of samples
// the result is (lower_middle + upper_middle) / 2, computed in T.
template <typename T>
T IntervalHistogramStat<T>::median() {
    std::vector<T> sorted;
    {
        // Only taking the snapshot needs the lock; sorting the private copy
        // afterwards does not, so writers in add() are not blocked by the sort.
        std::shared_lock<std::shared_mutex> lock(mutex);
        if (window.empty()) {
            return T();
        }
        sorted.assign(window.begin(), window.end());
    }
    std::sort(sorted.begin(), sorted.end());

    const size_t mid = sorted.size() / 2;
    if (sorted.size() % 2 != 0) {
        return sorted[mid];
    }
    return (sorted[mid - 1] + sorted[mid]) / 2;
}

// Returns the largest sample currently in the window, or a value-initialized T
// when the window is empty (the same convention mean() and median() use).
template <typename T>
T IntervalHistogramStat<T>::max() {
    std::shared_lock<std::shared_mutex> lock(mutex);
    // std::max_element returns end() for an empty range; dereferencing that
    // iterator is undefined behavior, so the empty case is handled up front.
    if (window.empty()) {
        return T();
    }
    return *std::max_element(window.begin(), window.end());
}

// Returns the smallest sample currently in the window, or a value-initialized
// T when the window is empty (the same convention mean() and median() use).
template <typename T>
T IntervalHistogramStat<T>::min() {
    std::shared_lock<std::shared_mutex> lock(mutex);
    // std::min_element returns end() for an empty range; dereferencing that
    // iterator is undefined behavior, so the empty case is handled up front.
    if (window.empty()) {
        return T();
    }
    return *std::min_element(window.begin(), window.end());
}

// The member functions of IntervalHistogramStat are defined in this .cpp file
// rather than in the header, so the template is explicitly instantiated here
// for int64. Using another element type presumably requires adding a matching
// explicit instantiation below.
template class doris::IntervalHistogramStat<int64>;

} // namespace doris
60 changes: 13 additions & 47 deletions be/src/util/interval_histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,64 +17,30 @@

#pragma once

#include <algorithm>
#include <boost/circular_buffer.hpp>
#include <mutex>
#include <numeric>
#include <shared_mutex>
#include <vector>

namespace doris {

// A thread-safe interval histogram stat class.
// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide
// statistics like mean, median, max, min.

template <typename T>
class IntervalHistogramStat {
public:
IntervalHistogramStat(size_t N) : window(N) {}

void add(T value) {
std::unique_lock<std::shared_mutex> lock(mutex);

if (window.full()) {
window.pop_front();
}
window.push_back(value);
}

T mean() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}

T sum = std::accumulate(window.begin(), window.end(), T());
return sum / window.size();
}
explicit IntervalHistogramStat(size_t N);

T median() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}
void add(T value);

std::vector<T> sorted(window.begin(), window.end());
std::sort(sorted.begin(), sorted.end());

size_t mid = sorted.size() / 2;
return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
}

T max() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::max_element(window.begin(), window.end());
}

T min() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::min_element(window.begin(), window.end());
}
T mean();
T median();
T max();
T min();

private:
boost::circular_buffer<T> window;
std::shared_mutex mutex;
mutable std::shared_mutex mutex;
};
} // namespace doris

} // namespace doris
1 change: 1 addition & 0 deletions be/src/util/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const {
std::string HistogramMetric::to_prometheus(const std::string& display_name,
const Labels& entity_labels,
const Labels& metric_labels) const {
// TODO: Use std::string concatenation for better performance.
std::stringstream ss;
for (const auto& percentile : _s_output_percentiles) {
auto quantile_lable = Labels({{"quantile", percentile.first}});
Expand Down
19 changes: 10 additions & 9 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
namespace doris {

// The names of these variables will be used as metric names in Prometheus.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_running_tasks, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_capacity, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_thread_number, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
MetricUnit::NANOSECONDS);
Expand Down Expand Up @@ -291,19 +291,19 @@ Status ThreadPool::init() {
fmt::format("thread_pool_{}", _name),
{{"thread_pool_name", _name}, {"workload_group", _workload_group}});

INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_running_tasks);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_thread_number);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_capacity);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

_metric_entity->register_hook("update", [this]() {
thread_pool_running_tasks->set_value(num_active_threads());
thread_pool_active_threads->set_value(num_active_threads());
thread_pool_queue_size->set_value(get_queue_size());
thread_pool_queue_capacity->set_value(get_max_queue_size());
thread_pool_max_thread_number->set_value(max_threads());
thread_pool_max_queue_size->set_value(get_max_queue_size());
thread_pool_max_threads->set_value(max_threads());
task_execution_time_ns_avg_in_last_1000_times->set_value(
_task_execution_time_ns_statistic.mean());
task_wait_worker_ns_avg_in_last_1000_times->set_value(
Expand Down Expand Up @@ -586,6 +586,7 @@ void ThreadPool::dispatch_thread() {
auto submit_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
task.submit_time.time_since_epoch())
.count();

_task_wait_worker_time_ns_statistic.add(current_ns - submit_ns);
token->_entries.pop_front();
token->_active_threads++;
Expand Down
6 changes: 3 additions & 3 deletions be/src/util/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,10 @@ class ThreadPool {
std::unique_ptr<ThreadPoolToken> _tokenless;

std::shared_ptr<MetricEntity> _metric_entity;
IntGauge* thread_pool_running_tasks = nullptr;
IntGauge* thread_pool_active_threads = nullptr;
IntGauge* thread_pool_queue_size = nullptr;
IntGauge* thread_pool_queue_capacity = nullptr;
IntGauge* thread_pool_max_thread_number = nullptr;
IntGauge* thread_pool_max_queue_size = nullptr;
IntGauge* thread_pool_max_threads = nullptr;
IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr;
IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr;

Expand Down

0 comments on commit ada5aeb

Please sign in to comment.