Skip to content

Commit

Permalink
DEV
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Dec 20, 2024
1 parent b252c0f commit 47474ba
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 43 deletions.
7 changes: 7 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ std::vector<TFrontendInfo> ExecEnv::get_frontends() {
return infos;
}

// Returns the DorisMetrics object held by this ExecEnv. If _doris_metrics is
// still empty, it is first populated from DorisMetrics::create().
// NOTE(review): this lazy initialization takes no lock in this function --
// confirm the first call cannot race with other callers.
doris::DorisMetrics* ExecEnv::doris_metrics() {
    if (!_doris_metrics) {
        _doris_metrics.reset(DorisMetrics::create());
    }
    return _doris_metrics.get();
}

void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos) {
std::lock_guard<std::mutex> lg(_frontends_lock);

Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MemoryPool;
}

namespace doris {
class DorisMetrics;
namespace vectorized {
class VDataStreamMgr;
class ScannerScheduler;
Expand Down Expand Up @@ -349,6 +350,8 @@ class ExecEnv {

bool check_auth_token(const std::string& auth_token);

DorisMetrics* doris_metrics();

private:
ExecEnv();

Expand Down Expand Up @@ -488,6 +491,8 @@ class ExecEnv {

orc::MemoryPool* _orc_memory_pool = nullptr;
arrow::MemoryPool* _arrow_memory_pool = nullptr;

std::unique_ptr<doris::DorisMetrics> _doris_metrics;
};

template <>
Expand Down
9 changes: 7 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);

Expand Down Expand Up @@ -204,6 +203,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
spill_path.path, spill_path.capacity_bytes,
spill_path.storage_medium));
}

if (_doris_metrics == nullptr) {
_doris_metrics = std::unique_ptr<DorisMetrics>(DorisMetrics::create());
}

init_doris_metrics(store_paths);
_store_paths = store_paths;
_tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
Expand Down Expand Up @@ -677,7 +681,6 @@ void ExecEnv::_register_metrics() {
}

// Invokes DEREGISTER_HOOK_METRIC for each hook metric named below.
// NOTE(review): presumably the counterpart of _register_metrics(); confirm the
// list here stays in sync with what that function registers.
void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
}
Expand Down Expand Up @@ -825,6 +828,8 @@ void ExecEnv::destroy() {
SAFE_DELETE(_process_profile);
SAFE_DELETE(_heap_profiler);

_doris_metrics.reset();

_s_tracking_memory = false;

LOG(INFO) << "Doris exec envorinment is destoried.";
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
if (_scan_task_sched == nullptr) {
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + wg_name,
cg_cpu_ctl_ptr);
cg_cpu_ctl_ptr, wg_name);
Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
Expand All @@ -507,7 +507,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name,
cg_cpu_ctl_ptr);
cg_cpu_ctl_ptr, wg_name);
Status ret = remote_scan_scheduler->start(remote_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
remote_scan_thread_queue_size);
Expand Down
6 changes: 0 additions & 6 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT);

const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
Expand Down Expand Up @@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed);
}

void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
Expand Down
19 changes: 9 additions & 10 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <vector>

#include "runtime/exec_env.h"
#include "util/jvm_metrics.h"
#include "util/metrics.h"
#include "util/system_metrics.h"
Expand All @@ -47,6 +48,14 @@ namespace doris {

class DorisMetrics {
public:
// Allocates a new DorisMetrics object and returns it to the caller, who takes
// ownership. The life cycle of DorisMetrics is managed by ExecEnv, which wraps
// the returned pointer in a std::unique_ptr.
//
// The pointer must NOT be cached in a function-local static: once the owning
// unique_ptr has been reset, a cached pointer would dangle and a later call
// would hand the freed object to a second owner (use-after-free followed by a
// double delete). Every call therefore returns a distinct, freshly allocated
// object.
static DorisMetrics* create() {
    return new DorisMetrics;
}

// Convenience accessor: returns whatever ExecEnv::doris_metrics() yields for
// the ExecEnv obtained from ExecEnv::GetInstance().
static DorisMetrics* instance() { return ExecEnv::GetInstance()->doris_metrics(); }

IntCounter* fragment_requests_total = nullptr;
IntCounter* fragment_request_duration_us = nullptr;
IntCounter* query_scan_bytes = nullptr;
Expand Down Expand Up @@ -197,7 +206,6 @@ class DorisMetrics {
UIntGauge* query_cache_sql_total_count = nullptr;
UIntGauge* query_cache_partition_total_count = nullptr;

UIntGauge* scanner_thread_pool_queue_size = nullptr;
UIntGauge* add_batch_task_queue_size = nullptr;
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
Expand Down Expand Up @@ -244,14 +252,6 @@ class DorisMetrics {
IntAtomicCounter* scanner_ctx_cnt = nullptr;
IntAtomicCounter* scanner_cnt = nullptr;
IntAtomicCounter* scanner_task_cnt = nullptr;
IntAtomicCounter* scanner_task_queued = nullptr;
IntAtomicCounter* scanner_task_submit_failed = nullptr;
IntAtomicCounter* scanner_task_running = nullptr;

// Returns the address of a function-local static DorisMetrics object, which is
// constructed the first time this function is called.
static DorisMetrics* instance() {
    static DorisMetrics singleton;
    return &singleton;
}

// not thread-safe, call before calling metrics
void initialize(
Expand All @@ -273,7 +273,6 @@ class DorisMetrics {
void _update_process_thread_num();
void _update_process_fd_num();

private:
static const std::string _s_registry_name;
static const std::string _s_hook_name;

Expand Down
80 changes: 80 additions & 0 deletions be/src/util/interval_histogram.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <algorithm>
#include <boost/circular_buffer.hpp>
#include <mutex>
#include <numeric>
#include <shared_mutex>
#include <vector>

namespace doris {

// Sliding-window statistics over the most recent samples.
//
// Samples are stored in a fixed-capacity boost::circular_buffer; once the buffer
// is full, adding a sample replaces the oldest one. Every member function takes
// the internal shared_mutex: add() exclusively, the read-only queries in shared
// mode.
//
// T is expected to be an arithmetic type (e.g. int64_t): it must be default
// constructible, comparable, and support + and /.
template <typename T>
class IntervalHistogramStat {
public:
    // N is the window capacity, i.e. how many of the most recent samples are kept.
    IntervalHistogramStat(size_t N) : window(N) {}

    // Records a sample. When the window is already full the oldest sample is
    // dropped; with a capacity of 0 nothing is stored.
    void add(T value) {
        std::unique_lock<std::shared_mutex> lock(mutex);

        // circular_buffer::push_back() evicts the front element itself when the
        // buffer is full and inserts nothing when the capacity is 0, so no explicit
        // pop_front() is needed (it would be invalid on a zero-capacity buffer,
        // which reports full() while being empty).
        window.push_back(value);
    }

    // Returns the arithmetic mean of the samples in the window, or T() when the
    // window is empty.
    T mean() const {
        std::shared_lock<std::shared_mutex> lock(mutex);
        if (window.empty()) {
            return T();
        }

        const T sum = std::accumulate(window.begin(), window.end(), T());
        // Divide in T. Dividing a signed T directly by the unsigned size() would
        // convert a negative sum to a huge unsigned value before the division.
        return sum / static_cast<T>(window.size());
    }

    // Returns the median of the samples in the window (the average of the two
    // middle samples when the count is even), or T() when the window is empty.
    T median() const {
        std::shared_lock<std::shared_mutex> lock(mutex);
        if (window.empty()) {
            return T();
        }

        // Sort a copy so the window keeps its insertion order.
        std::vector<T> sorted(window.begin(), window.end());
        std::sort(sorted.begin(), sorted.end());

        const size_t mid = sorted.size() / 2;
        return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
    }

    // Returns the largest sample in the window, or T() when the window is empty.
    T max() const {
        std::shared_lock<std::shared_mutex> lock(mutex);
        // max_element() returns end() for an empty range; dereferencing it is
        // undefined behaviour, so handle the empty window explicitly.
        if (window.empty()) {
            return T();
        }
        return *std::max_element(window.begin(), window.end());
    }

    // Returns the smallest sample in the window, or T() when the window is empty.
    T min() const {
        std::shared_lock<std::shared_mutex> lock(mutex);
        // Same empty-range guard as in max().
        if (window.empty()) {
            return T();
        }
        return *std::min_element(window.begin(), window.end());
    }

private:
    boost::circular_buffer<T> window;
    // mutable so that the const query functions can take a shared lock.
    mutable std::shared_mutex mutex;
};
} // namespace doris
59 changes: 57 additions & 2 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,25 @@
#include "gutil/port.h"
#include "gutil/strings/substitute.h"
#include "util/debug/sanitizer_scopes.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"

namespace doris {

// The names of these variables are used as the metric names in Prometheus.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_running_tasks, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_capacity, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_thread_number, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
MetricUnit::NANOSECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times,
MetricUnit::NANOSECONDS);

using namespace ErrorCode;

using std::string;
Expand All @@ -51,8 +66,9 @@ class FunctionRunnable : public Runnable {
std::function<void()> _func;
};

ThreadPoolBuilder::ThreadPoolBuilder(string name)
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
: _name(std::move(name)),
_workload_group(std::move(workload_group)),
_min_threads(0),
_max_threads(std::thread::hardware_concurrency()),
_max_queue_size(std::numeric_limits<int>::max()),
Expand Down Expand Up @@ -237,6 +253,7 @@ bool ThreadPoolToken::need_dispatch() {

ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
: _name(builder._name),
_workload_group(builder._workload_group),
_min_threads(builder._min_threads),
_max_threads(builder._max_threads),
_max_queue_size(builder._max_queue_size),
Expand Down Expand Up @@ -269,10 +286,35 @@ Status ThreadPool::init() {
return status;
}
}

_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
fmt::format("thread_pool_{}", _name),
{{"thread_pool_name", _name}, {"workload_group", _workload_group}});

INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_running_tasks);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_thread_number);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_capacity);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times);
INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);

_metric_entity->register_hook("update", [this]() {
thread_pool_running_tasks->set_value(num_active_threads());
thread_pool_queue_size->set_value(get_queue_size());
thread_pool_queue_capacity->set_value(get_max_queue_size());
thread_pool_max_thread_number->set_value(max_threads());
task_execution_time_ns_avg_in_last_1000_times->set_value(
_task_execution_time_ns_statistic.mean());
task_wait_worker_ns_avg_in_last_1000_times->set_value(
_task_wait_worker_time_ns_statistic.mean());
});

return Status::OK();
}

void ThreadPool::shutdown() {
DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
Expand Down Expand Up @@ -372,6 +414,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
if (capacity_remaining < 1) {
thread_pool_submit_failed->increment(1);
return Status::Error<SERVICE_UNAVAILABLE>(
"Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
_num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
Expand Down Expand Up @@ -527,12 +570,23 @@ void ThreadPool::dispatch_thread() {
continue;
}

MonotonicStopWatch task_execution_time_watch;
task_execution_time_watch.start();
// Get the next token and task to execute.
ThreadPoolToken* token = _queue.front();
_queue.pop_front();
DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
DCHECK(!token->_entries.empty());
Task task = std::move(token->_entries.front());
std::chrono::time_point<std::chrono::system_clock> current =
std::chrono::system_clock::now();
auto current_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(current.time_since_epoch())
.count();
auto submit_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
task.submit_time.time_since_epoch())
.count();
_task_wait_worker_time_ns_statistic.add(current_ns - submit_ns);
token->_entries.pop_front();
token->_active_threads++;
--_total_queued_tasks;
Expand All @@ -542,7 +596,6 @@ void ThreadPool::dispatch_thread() {

// Execute the task
task.runnable->run();

// Destruct the task while we do not hold the lock.
//
// The task's destructor may be expensive if it has a lot of bound
Expand All @@ -552,6 +605,8 @@ void ThreadPool::dispatch_thread() {
task.runnable.reset();
l.lock();

_task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time());

// Possible states:
// 1. The token was shut down while we ran its task. Transition to QUIESCED.
// 2. The token has no more queued tasks. Transition back to IDLE.
Expand Down
Loading

0 comments on commit 47474ba

Please sign in to comment.