Skip to content

Commit

Permalink
DEV
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Dec 20, 2024
1 parent b252c0f commit 614c0c9
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 41 deletions.
2 changes: 0 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);

Expand Down Expand Up @@ -677,7 +676,6 @@ void ExecEnv::_register_metrics() {
}

void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
if (_scan_task_sched == nullptr) {
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + wg_name,
cg_cpu_ctl_ptr);
cg_cpu_ctl_ptr, wg_name);
Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
Expand All @@ -507,7 +507,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name,
cg_cpu_ctl_ptr);
cg_cpu_ctl_ptr, wg_name);
Status ret = remote_scan_scheduler->start(remote_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
remote_scan_thread_queue_size);
Expand Down
6 changes: 0 additions & 6 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT);

const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
Expand Down Expand Up @@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed);
}

void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
Expand Down
6 changes: 0 additions & 6 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ class DorisMetrics {
UIntGauge* query_cache_sql_total_count = nullptr;
UIntGauge* query_cache_partition_total_count = nullptr;

UIntGauge* scanner_thread_pool_queue_size = nullptr;
UIntGauge* add_batch_task_queue_size = nullptr;
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
Expand Down Expand Up @@ -244,10 +243,6 @@ class DorisMetrics {
IntAtomicCounter* scanner_ctx_cnt = nullptr;
IntAtomicCounter* scanner_cnt = nullptr;
IntAtomicCounter* scanner_task_cnt = nullptr;
IntAtomicCounter* scanner_task_queued = nullptr;
IntAtomicCounter* scanner_task_submit_failed = nullptr;
IntAtomicCounter* scanner_task_running = nullptr;

static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
Expand All @@ -273,7 +268,6 @@ class DorisMetrics {
void _update_process_thread_num();
void _update_process_fd_num();

private:
static const std::string _s_registry_name;
static const std::string _s_hook_name;

Expand Down
79 changes: 79 additions & 0 deletions be/src/util/interval_histogram.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "util/interval_histogram.h"

#include <algorithm>
#include <mutex>
#include <numeric>
#include <vector>

#include "gutil/integral_types.h"

namespace doris {

template <typename T>
IntervalHistogramStat<T>::IntervalHistogramStat(size_t N) : window(N) {}

template <typename T>
void IntervalHistogramStat<T>::add(T value) {
std::unique_lock<std::shared_mutex> lock(mutex);
if (window.full()) {
window.pop_front();
}
window.push_back(value);
}

template <typename T>
T IntervalHistogramStat<T>::mean() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}
T sum = std::accumulate(window.begin(), window.end(), T());
return sum / window.size();
}

template <typename T>
T IntervalHistogramStat<T>::median() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}

std::vector<T> sorted(window.begin(), window.end());
std::sort(sorted.begin(), sorted.end());

size_t mid = sorted.size() / 2;
return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
}

template <typename T>
T IntervalHistogramStat<T>::max() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::max_element(window.begin(), window.end());
}

template <typename T>
T IntervalHistogramStat<T>::min() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::min_element(window.begin(), window.end());
}

template class doris::IntervalHistogramStat<int64>;

} // namespace doris
46 changes: 46 additions & 0 deletions be/src/util/interval_histogram.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <boost/circular_buffer.hpp>
#include <shared_mutex>

namespace doris {

// A thread-safe interval histogram stat class.
// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide
// statistics like mean, median, max, min.

template <typename T>
class IntervalHistogramStat {
public:
explicit IntervalHistogramStat(size_t N);

void add(T value);

T mean();
T median();
T max();
T min();

private:
boost::circular_buffer<T> window;
mutable std::shared_mutex mutex;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/util/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const {
std::string HistogramMetric::to_prometheus(const std::string& display_name,
const Labels& entity_labels,
const Labels& metric_labels) const {
// TODO: Use std::string concate for better performance.
std::stringstream ss;
for (const auto& percentile : _s_output_percentiles) {
auto quantile_lable = Labels({{"quantile", percentile.first}});
Expand Down
5 changes: 3 additions & 2 deletions be/src/util/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class LockSimpleMetric : public Metric {
T _value;
};

// TODO: Remove this class and related code.
template <typename T>
class CoreLocalCounter : public Metric {
public:
Expand Down Expand Up @@ -234,9 +235,9 @@ class LockGauge : public LockSimpleMetric<T> {
virtual ~LockGauge() {}
};

using IntCounter = CoreLocalCounter<int64_t>;
using IntCounter = AtomicCounter<int64_t>;
using IntAtomicCounter = AtomicCounter<int64_t>;
using UIntCounter = CoreLocalCounter<uint64_t>;
using UIntCounter = AtomicCounter<uint64_t>;
using DoubleCounter = LockCounter<double>;
using IntGauge = AtomicGauge<int64_t>;
using UIntGauge = AtomicGauge<uint64_t>;
Expand Down
60 changes: 58 additions & 2 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,25 @@
#include "gutil/port.h"
#include "gutil/strings/substitute.h"
#include "util/debug/sanitizer_scopes.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"

namespace doris {

// The name of these varialbs will be useds as metric name in prometheus.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
MetricUnit::NANOSECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times,
MetricUnit::NANOSECONDS);

using namespace ErrorCode;

using std::string;
Expand All @@ -51,8 +66,9 @@ class FunctionRunnable : public Runnable {
std::function<void()> _func;
};

ThreadPoolBuilder::ThreadPoolBuilder(string name)
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
: _name(std::move(name)),
_workload_group(std::move(workload_group)),
_min_threads(0),
_max_threads(std::thread::hardware_concurrency()),
_max_queue_size(std::numeric_limits<int>::max()),
Expand Down Expand Up @@ -237,6 +253,7 @@ bool ThreadPoolToken::need_dispatch() {

ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
: _name(builder._name),
_workload_group(builder._workload_group),
_min_threads(builder._min_threads),
_max_threads(builder._max_threads),
_max_queue_size(builder._max_queue_size),
Expand Down Expand Up @@ -269,10 +286,35 @@ Status ThreadPool::init() {
return status;
}
}

_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
fmt::format("thread_pool_{}", _name),
{{"thread_pool_name", _name}, {"workload_group", _workload_group}});

INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

_metric_entity->register_hook("update", [this]() {
thread_pool_active_threads->set_value(num_active_threads());
thread_pool_queue_size->set_value(get_queue_size());
thread_pool_max_queue_size->set_value(get_max_queue_size());
thread_pool_max_threads->set_value(max_threads());
task_execution_time_ns_avg_in_last_1000_times->set_value(
_task_execution_time_ns_statistic.mean());
task_wait_worker_ns_avg_in_last_1000_times->set_value(
_task_wait_worker_time_ns_statistic.mean());
});

return Status::OK();
}

void ThreadPool::shutdown() {
DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
Expand Down Expand Up @@ -372,6 +414,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
if (capacity_remaining < 1) {
thread_pool_submit_failed->increment(1);
return Status::Error<SERVICE_UNAVAILABLE>(
"Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
_num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
Expand Down Expand Up @@ -527,12 +570,24 @@ void ThreadPool::dispatch_thread() {
continue;
}

MonotonicStopWatch task_execution_time_watch;
task_execution_time_watch.start();
// Get the next token and task to execute.
ThreadPoolToken* token = _queue.front();
_queue.pop_front();
DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
DCHECK(!token->_entries.empty());
Task task = std::move(token->_entries.front());
std::chrono::time_point<std::chrono::system_clock> current =
std::chrono::system_clock::now();
auto current_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(current.time_since_epoch())
.count();
auto submit_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
task.submit_time.time_since_epoch())
.count();

_task_wait_worker_time_ns_statistic.add(current_ns - submit_ns);
token->_entries.pop_front();
token->_active_threads++;
--_total_queued_tasks;
Expand All @@ -542,7 +597,6 @@ void ThreadPool::dispatch_thread() {

// Execute the task
task.runnable->run();

// Destruct the task while we do not hold the lock.
//
// The task's destructor may be expensive if it has a lot of bound
Expand All @@ -552,6 +606,8 @@ void ThreadPool::dispatch_thread() {
task.runnable.reset();
l.lock();

_task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time());

// Possible states:
// 1. The token was shut down while we ran its task. Transition to QUIESCED.
// 2. The token has no more queued tasks. Transition back to IDLE.
Expand Down
Loading

0 comments on commit 614c0c9

Please sign in to comment.