Skip to content

Commit

Permalink
NED
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Dec 24, 2024
1 parent 2bfab8f commit d7e5f63
Show file tree
Hide file tree
Showing 16 changed files with 388 additions and 61 deletions.
3 changes: 0 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,6 @@ class ExecEnv {
Status _init_mem_env();
Status _check_deploy_mode();

void _register_metrics();
void _deregister_metrics();

inline static std::atomic_bool _s_ready {false};
inline static std::atomic_bool _s_tracking_memory {false};
std::vector<StorePath> _store_paths;
Expand Down
17 changes: 0 additions & 17 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);

Expand Down Expand Up @@ -336,7 +335,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
RETURN_IF_ERROR(_wal_manager->init());
_heartbeat_flags = new HeartbeatFlags();
_register_metrics();

_tablet_schema_cache =
TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);
Expand Down Expand Up @@ -668,20 +666,6 @@ Status ExecEnv::_check_deploy_mode() {
return Status::OK();
}

void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
[this]() { return _send_batch_thread_pool->num_threads(); });

REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
[this]() { return _send_batch_thread_pool->get_queue_size(); });
}

void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
}

// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method.
// We need to stop all threads before releasing resource.
void ExecEnv::destroy() {
Expand Down Expand Up @@ -736,7 +720,6 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_s3_file_system_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);

_deregister_metrics();
SAFE_DELETE(_load_channel_mgr);

SAFE_DELETE(_spill_stream_mgr);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
if (_scan_task_sched == nullptr) {
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + wg_name,
cg_cpu_ctl_ptr);
cg_cpu_ctl_ptr, wg_name);
Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
Expand All @@ -507,7 +507,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name,
cg_cpu_ctl_ptr);
cg_cpu_ctl_ptr, wg_name);
Status ret = remote_scan_scheduler->start(remote_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
remote_scan_thread_queue_size);
Expand Down
6 changes: 0 additions & 6 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT);

const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
Expand Down Expand Up @@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed);
}

void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
Expand Down
4 changes: 0 additions & 4 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,7 @@ class DorisMetrics {
UIntGauge* query_cache_sql_total_count = nullptr;
UIntGauge* query_cache_partition_total_count = nullptr;

UIntGauge* scanner_thread_pool_queue_size = nullptr;
UIntGauge* add_batch_task_queue_size = nullptr;
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
UIntGauge* fragment_thread_pool_queue_size = nullptr;
UIntGauge* fragment_thread_pool_num_active_threads = nullptr;

Expand Down Expand Up @@ -273,7 +270,6 @@ class DorisMetrics {
void _update_process_thread_num();
void _update_process_fd_num();

private:
static const std::string _s_registry_name;
static const std::string _s_hook_name;

Expand Down
80 changes: 80 additions & 0 deletions be/src/util/interval_histogram.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "util/interval_histogram.h"

#include <algorithm>
#include <mutex>
#include <numeric>
#include <vector>

#include "gutil/integral_types.h"

namespace doris {

template <typename T>
IntervalHistogramStat<T>::IntervalHistogramStat(size_t N) : window(N) {}

template <typename T>
void IntervalHistogramStat<T>::add(T value) {
std::unique_lock<std::shared_mutex> lock(mutex);
if (window.full()) {
window.pop_front();
}
window.push_back(value);
}

template <typename T>
T IntervalHistogramStat<T>::mean() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}
T sum = std::accumulate(window.begin(), window.end(), T());
return sum / window.size();
}

template <typename T>
T IntervalHistogramStat<T>::median() {
std::shared_lock<std::shared_mutex> lock(mutex);
if (window.empty()) {
return T();
}

std::vector<T> sorted(window.begin(), window.end());
std::sort(sorted.begin(), sorted.end());

size_t mid = sorted.size() / 2;
return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
}

template <typename T>
T IntervalHistogramStat<T>::max() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::max_element(window.begin(), window.end());
}

template <typename T>
T IntervalHistogramStat<T>::min() {
std::shared_lock<std::shared_mutex> lock(mutex);
return *std::min_element(window.begin(), window.end());
}

template class doris::IntervalHistogramStat<int64>;
template class doris::IntervalHistogramStat<int32>;

} // namespace doris
46 changes: 46 additions & 0 deletions be/src/util/interval_histogram.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <boost/circular_buffer.hpp>
#include <shared_mutex>

namespace doris {

// A thread-safe interval histogram stat class.
// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide
// statistics like mean, median, max, min.

template <typename T>
class IntervalHistogramStat {
public:
explicit IntervalHistogramStat(size_t N);

void add(T value);

T mean();
T median();
T max();
T min();

private:
boost::circular_buffer<T> window;
mutable std::shared_mutex mutex;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/util/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const {
std::string HistogramMetric::to_prometheus(const std::string& display_name,
const Labels& entity_labels,
const Labels& metric_labels) const {
// TODO: Use std::string concate for better performance.
std::stringstream ss;
for (const auto& percentile : _s_output_percentiles) {
auto quantile_lable = Labels({{"quantile", percentile.first}});
Expand Down
Loading

0 comments on commit d7e5f63

Please sign in to comment.