Skip to content

Commit

Permalink
NED
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Dec 17, 2024
1 parent c966991 commit 5928144
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 33 deletions.
2 changes: 0 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);

Expand Down Expand Up @@ -677,7 +676,6 @@ void ExecEnv::_register_metrics() {
}

void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
}
Expand Down
6 changes: 0 additions & 6 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT);

const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
Expand Down Expand Up @@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed);
}

void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
Expand Down
4 changes: 0 additions & 4 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ class DorisMetrics {
UIntGauge* query_cache_sql_total_count = nullptr;
UIntGauge* query_cache_partition_total_count = nullptr;

UIntGauge* scanner_thread_pool_queue_size = nullptr;
UIntGauge* add_batch_task_queue_size = nullptr;
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
Expand Down Expand Up @@ -243,9 +242,6 @@ class DorisMetrics {
IntAtomicCounter* scanner_ctx_cnt = nullptr;
IntAtomicCounter* scanner_cnt = nullptr;
IntAtomicCounter* scanner_task_cnt = nullptr;
IntAtomicCounter* scanner_task_queued = nullptr;
IntAtomicCounter* scanner_task_submit_failed = nullptr;
IntAtomicCounter* scanner_task_running = nullptr;

static DorisMetrics* instance() {
static DorisMetrics instance;
Expand Down
80 changes: 80 additions & 0 deletions be/src/util/interval_histogram.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <algorithm>
#include <boost/circular_buffer.hpp>
#include <mutex>
#include <numeric>
#include <shared_mutex>
#include <vector>

namespace doris {

template <typename T>
class IntervalHistogramStat {
public:
IntervalHistogramStat(size_t N) : window(N) {}

void add(T value) {
std::unique_lock<std::shared_mutex> lock(mutex);

if (window.full()) {
window.pop_front();
}
window.push_back(value);
}

T mean() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}

T sum = std::accumulate(window.begin(), window.end(), T());
return sum / window.size();
}

T median() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}

std::vector<T> sorted(window.begin(), window.end());
std::sort(sorted.begin(), sorted.end());

size_t mid = sorted.size() / 2;
return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
}

T max() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::max_element(window.begin(), window.end());
}

T min() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::min_element(window.begin(), window.end());
}

private:
boost::circular_buffer<T> window;
std::shared_mutex mutex;
};
} // namespace doris
59 changes: 57 additions & 2 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,25 @@
#include "gutil/port.h"
#include "gutil/strings/substitute.h"
#include "util/debug/sanitizer_scopes.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"

namespace doris {

// The name of these varialbs will be useds as metric name in prometheus.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_running_tasks, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_capacity, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_thread_number, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
MetricUnit::NANOSECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times,
MetricUnit::NANOSECONDS);

using namespace ErrorCode;

using std::string;
Expand All @@ -51,8 +66,9 @@ class FunctionRunnable : public Runnable {
std::function<void()> _func;
};

ThreadPoolBuilder::ThreadPoolBuilder(string name)
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
: _name(std::move(name)),
_workload_group(std::move(workload_group)),
_min_threads(0),
_max_threads(std::thread::hardware_concurrency()),
_max_queue_size(std::numeric_limits<int>::max()),
Expand Down Expand Up @@ -237,6 +253,7 @@ bool ThreadPoolToken::need_dispatch() {

ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
: _name(builder._name),
_workload_group(builder._workload_group),
_min_threads(builder._min_threads),
_max_threads(builder._max_threads),
_max_queue_size(builder._max_queue_size),
Expand Down Expand Up @@ -269,10 +286,35 @@ Status ThreadPool::init() {
return status;
}
}

_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
fmt::format("thread_pool_{}", _name),
{{"thread_pool_name", _name}, {"workload_group", "system"}});

INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_running_tasks);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_thread_number);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_capacity);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

_metric_entity->register_hook("update", [this]() {
thread_pool_running_tasks->set_value(num_active_threads());
thread_pool_queue_size->set_value(get_queue_size());
thread_pool_queue_capacity->set_value(get_max_queue_size());
thread_pool_max_thread_number->set_value(max_threads());
task_execution_time_ns_avg_in_last_1000_times->set_value(
_task_execution_time_ns_statistic.mean());
task_wait_worker_ns_avg_in_last_1000_times->set_value(
_task_wait_worker_time_ns_statistic.mean());
});

return Status::OK();
}

void ThreadPool::shutdown() {
DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
Expand Down Expand Up @@ -372,6 +414,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
if (capacity_remaining < 1) {
thread_pool_submit_failed->increment(1);
return Status::Error<SERVICE_UNAVAILABLE>(
"Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
_num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
Expand Down Expand Up @@ -527,12 +570,23 @@ void ThreadPool::dispatch_thread() {
continue;
}

MonotonicStopWatch task_execution_time_watch;
task_execution_time_watch.start();
// Get the next token and task to execute.
ThreadPoolToken* token = _queue.front();
_queue.pop_front();
DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
DCHECK(!token->_entries.empty());
Task task = std::move(token->_entries.front());
std::chrono::time_point<std::chrono::system_clock> current =
std::chrono::system_clock::now();
auto current_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(current.time_since_epoch())
.count();
auto submit_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
task.submit_time.time_since_epoch())
.count();
_task_wait_worker_time_ns_statistic.add(current_ns - submit_ns);
token->_entries.pop_front();
token->_active_threads++;
--_total_queued_tasks;
Expand All @@ -542,7 +596,6 @@ void ThreadPool::dispatch_thread() {

// Execute the task
task.runnable->run();

// Destruct the task while we do not hold the lock.
//
// The task's destructor may be expensive if it has a lot of bound
Expand All @@ -552,6 +605,8 @@ void ThreadPool::dispatch_thread() {
task.runnable.reset();
l.lock();

_task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time());

// Possible states:
// 1. The token was shut down while we ran its task. Transition to QUIESCED.
// 2. The token has no more queued tasks. Transition back to IDLE.
Expand Down
32 changes: 31 additions & 1 deletion be/src/util/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

#include "agent/cgroup_cpu_ctl.h"
#include "common/status.h"
#include "util/interval_histogram.h"
#include "util/metrics.h"
#include "util/work_thread_pool.hpp"

namespace doris {
Expand Down Expand Up @@ -100,7 +102,7 @@ class Runnable {
//
class ThreadPoolBuilder {
public:
explicit ThreadPoolBuilder(std::string name);
explicit ThreadPoolBuilder(std::string name, std::string workload_group = "");

// Note: We violate the style guide by returning mutable references here
// in order to provide traditional Builder pattern conveniences.
Expand Down Expand Up @@ -130,6 +132,7 @@ class ThreadPoolBuilder {
private:
friend class ThreadPool;
const std::string _name;
const std::string _workload_group;
int _min_threads;
int _max_threads;
int _max_queue_size;
Expand Down Expand Up @@ -227,36 +230,49 @@ class ThreadPool {
// Return the number of threads currently running (or in the process of starting up)
// for this thread pool.
int num_threads() const {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
return _num_threads + _num_threads_pending_start;
}

int max_threads() const {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
return _max_threads;
}

int min_threads() const {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
return _min_threads;
}

int num_threads_pending_start() const {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
return _num_threads_pending_start;
}

int num_active_threads() const {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
return _active_threads;
}

int get_queue_size() const {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
return _total_queued_tasks;
}

int get_max_queue_size() const {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
return _max_queue_size;
}

std::vector<int> debug_info() {
// TODO: Use RW lock or atomic for performance.
std::lock_guard<std::mutex> l(_lock);
std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads,
_max_threads};
Expand Down Expand Up @@ -300,6 +316,7 @@ class ThreadPool {
void release_token(ThreadPoolToken* t);

const std::string _name;
const std::string _workload_group;
int _min_threads;
int _max_threads;
const int _max_queue_size;
Expand Down Expand Up @@ -387,6 +404,19 @@ class ThreadPool {

ThreadPool(const ThreadPool&) = delete;
void operator=(const ThreadPool&) = delete;

std::shared_ptr<MetricEntity> _metric_entity;
IntGauge* thread_pool_running_tasks = nullptr;
IntGauge* thread_pool_queue_size = nullptr;
IntGauge* thread_pool_queue_capacity = nullptr;
IntGauge* thread_pool_max_thread_number = nullptr;
IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr;
IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr;

IntervalHistogramStat<int64_t> _task_execution_time_ns_statistic {1000};
IntervalHistogramStat<int64_t> _task_wait_worker_time_ns_statistic {1000};

IntAtomicCounter* thread_pool_submit_failed = nullptr;
};

// Entry point for token-based task submission and blocking for a particular
Expand Down
10 changes: 0 additions & 10 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,

scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_running->increment(1);
Defer metrics_defer(
[&] { DorisMetrics::instance()->scanner_task_running->increment(-1); });

auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
Expand All @@ -171,11 +166,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
auto sumbit_task = [&]() {
SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
auto work_func = [scanner_ref = scan_task, ctx]() {
DorisMetrics::instance()->scanner_task_queued->increment(-1);
DorisMetrics::instance()->scanner_task_running->increment(1);
Defer metrics_defer(
[&] { DorisMetrics::instance()->scanner_task_running->increment(-1); });

auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
Expand Down
Loading

0 comments on commit 5928144

Please sign in to comment.