diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index e539f4a440ab0c..9895a0138947be 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -604,12 +604,12 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t capacity, LRUCa INT_GAUGE_METRIC_REGISTER(_entity, cache_capacity); INT_GAUGE_METRIC_REGISTER(_entity, cache_usage); INT_GAUGE_METRIC_REGISTER(_entity, cache_element_count); - INT_DOUBLE_METRIC_REGISTER(_entity, cache_usage_ratio); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, cache_lookup_count); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, cache_hit_count); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, cache_stampede_count); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, cache_miss_count); - INT_DOUBLE_METRIC_REGISTER(_entity, cache_hit_ratio); + DOUBLE_GAUGE_METRIC_REGISTER(_entity, cache_usage_ratio); + INT_COUNTER_METRIC_REGISTER(_entity, cache_lookup_count); + INT_COUNTER_METRIC_REGISTER(_entity, cache_hit_count); + INT_COUNTER_METRIC_REGISTER(_entity, cache_stampede_count); + INT_COUNTER_METRIC_REGISTER(_entity, cache_miss_count); + DOUBLE_GAUGE_METRIC_REGISTER(_entity, cache_hit_ratio); _hit_count_bvar.reset(new bvar::Adder("doris_cache", _name)); _hit_count_per_second.reset(new bvar::PerSecond>( diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index 303a4cf2065ef9..4a4b6ddd0054f3 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -447,10 +447,10 @@ class ShardedLRUCache : public Cache { IntGauge* cache_usage = nullptr; IntGauge* cache_element_count = nullptr; DoubleGauge* cache_usage_ratio = nullptr; - IntAtomicCounter* cache_lookup_count = nullptr; - IntAtomicCounter* cache_hit_count = nullptr; - IntAtomicCounter* cache_miss_count = nullptr; - IntAtomicCounter* cache_stampede_count = nullptr; + IntCounter* cache_lookup_count = nullptr; + IntCounter* cache_hit_count = nullptr; + IntCounter* cache_miss_count = nullptr; + IntCounter* cache_stampede_count = nullptr; DoubleGauge* cache_hit_ratio = nullptr; // bvars std::unique_ptr> _hit_count_bvar; diff --git a/be/src/runtime/workload_group/workload_group_metrics.cpp b/be/src/runtime/workload_group/workload_group_metrics.cpp index 18ff7aa2f4f185..0f7322b7feb448 100644 --- a/be/src/runtime/workload_group/workload_group_metrics.cpp +++ b/be/src/runtime/workload_group/workload_group_metrics.cpp @@ -36,32 +36,31 @@ WorkloadGroupMetrics::WorkloadGroupMetrics(WorkloadGroup* wg) { _cpu_time_metric = std::make_unique( doris::MetricType::COUNTER, doris::MetricUnit::SECONDS, "workload_group_cpu_time_sec"); - _cpu_time_counter = - (IntAtomicCounter*)(_entity->register_metric(_cpu_time_metric.get())); + _cpu_time_counter = (IntCounter*)(_entity->register_metric(_cpu_time_metric.get())); _mem_used_bytes_metric = std::make_unique( doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_mem_used_bytes"); - _mem_used_bytes_counter = (IntAtomicCounter*)(_entity->register_metric( - _mem_used_bytes_metric.get())); + _mem_used_bytes_counter = + (IntCounter*)(_entity->register_metric(_mem_used_bytes_metric.get())); _local_scan_bytes_metric = std::make_unique( doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_local_scan_bytes"); - _local_scan_bytes_counter = (IntAtomicCounter*)(_entity->register_metric( - _local_scan_bytes_metric.get())); + _local_scan_bytes_counter = + (IntCounter*)(_entity->register_metric(_local_scan_bytes_metric.get())); _remote_scan_bytes_metric = std::make_unique( doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_remote_scan_bytes"); - _remote_scan_bytes_counter = (IntAtomicCounter*)(_entity->register_metric( - _remote_scan_bytes_metric.get())); + _remote_scan_bytes_counter = + (IntCounter*)(_entity->register_metric(_remote_scan_bytes_metric.get())); for (const auto& [key, io_throttle] : wg->_scan_io_throttle_map) { std::unique_ptr metric = std::make_unique( doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_local_scan_bytes_" + io_throttle->metric_name()); _local_scan_bytes_counter_map[key] = - (IntAtomicCounter*)(_entity->register_metric(metric.get())); + (IntCounter*)(_entity->register_metric(metric.get())); _local_scan_bytes_metric_map[key] = std::move(metric); } } diff --git a/be/src/runtime/workload_group/workload_group_metrics.h b/be/src/runtime/workload_group/workload_group_metrics.h index e68715df249dee..c761638d115439 100644 --- a/be/src/runtime/workload_group/workload_group_metrics.h +++ b/be/src/runtime/workload_group/workload_group_metrics.h @@ -28,7 +28,7 @@ class WorkloadGroup; template class AtomicCounter; -using IntAtomicCounter = AtomicCounter; +using IntCounter = AtomicCounter; class MetricEntity; struct MetricPrototype; @@ -65,11 +65,11 @@ class WorkloadGroupMetrics { // _local_disk_io_metric is every disk's IO std::map> _local_scan_bytes_metric_map; - IntAtomicCounter* _cpu_time_counter {nullptr}; // used for metric - IntAtomicCounter* _mem_used_bytes_counter {nullptr}; // used for metric - IntAtomicCounter* _local_scan_bytes_counter {nullptr}; // used for metric - IntAtomicCounter* _remote_scan_bytes_counter {nullptr}; // used for metric - std::map _local_scan_bytes_counter_map; // used for metric + IntCounter* _cpu_time_counter {nullptr}; // used for metric + IntCounter* _mem_used_bytes_counter {nullptr}; // used for metric + IntCounter* _local_scan_bytes_counter {nullptr}; // used for metric + IntCounter* _remote_scan_bytes_counter {nullptr}; // used for metric + std::map _local_scan_bytes_counter_map; // used for metric std::atomic _cpu_time_nanos {0}; std::atomic _last_cpu_time_nanos {0}; diff --git a/be/src/util/core_local.cpp b/be/src/util/core_local.cpp deleted file mode 100644 index 1c4b1dd04715b4..00000000000000 --- a/be/src/util/core_local.cpp +++ /dev/null @@ -1,129 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/core_local.h" - -#include -#include -#include -#include - -#include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" -#include "util/spinlock.h" -#include "util/sse_util.hpp" - -namespace doris { - -constexpr int BLOCK_SIZE = 4096; -struct alignas(CACHE_LINE_SIZE) CoreDataBlock { - void* at(size_t offset) { return data + offset; } - char data[BLOCK_SIZE]; - - static void* operator new(size_t nbytes) { - void* p = nullptr; - if (posix_memalign(&p, alignof(CoreDataBlock), nbytes) == 0) { - return p; - } - throw std::bad_alloc(); - } - - static void operator delete(void* p) { free(p); } -}; - -template -class CoreDataAllocatorImpl : public CoreDataAllocator { -public: - virtual ~CoreDataAllocatorImpl(); - void* get_or_create(size_t id) override { - size_t block_id = id / ELEMENTS_PER_BLOCK; - { - std::lock_guard l(_lock); - if (block_id >= _blocks.size()) { - _blocks.resize(block_id + 1); - } - } - CoreDataBlock* block = _blocks[block_id]; - if (block == nullptr) { - std::lock_guard l(_lock); - block = _blocks[block_id]; - if (block == nullptr) { - block = new CoreDataBlock(); - _blocks[block_id] = block; - } - } - size_t offset = (id % ELEMENTS_PER_BLOCK) * ELEMENT_BYTES; - return block->at(offset); - } - -private: - static constexpr int ELEMENTS_PER_BLOCK = BLOCK_SIZE / ELEMENT_BYTES; - SpinLock _lock; // lock to protect the modification of _blocks - std::vector _blocks; -}; - -template -CoreDataAllocatorImpl::~CoreDataAllocatorImpl() { - for (auto block : _blocks) { - delete block; - } -} - -CoreDataAllocatorFactory* CoreDataAllocatorFactory::instance() { - static CoreDataAllocatorFactory _s_instance; - return &_s_instance; -} - -CoreDataAllocator* CoreDataAllocatorFactory::get_allocator(size_t cpu_idx, size_t data_bytes) { - std::lock_guard l(_lock); - auto pair = std::make_pair(cpu_idx, data_bytes); - auto it = _allocators.find(pair); - if (it != std::end(_allocators)) { - return it->second; - } - CoreDataAllocator* allocator = nullptr; - switch (data_bytes) { - case 1: - allocator = new CoreDataAllocatorImpl<1>(); - break; - case 2: - allocator = new CoreDataAllocatorImpl<2>(); - break; - case 3: - case 4: - allocator = new CoreDataAllocatorImpl<4>(); - break; - case 5: - case 6: - case 7: - case 8: - allocator = new CoreDataAllocatorImpl<8>(); - break; - default: - DCHECK(false) << "don't support core local value for this size, size=" << data_bytes; - } - _allocators.emplace(pair, allocator); - return allocator; -} - -CoreDataAllocatorFactory::~CoreDataAllocatorFactory() { - for (auto& it : _allocators) { - delete it.second; - } -} - -} // namespace doris diff --git a/be/src/util/core_local.h b/be/src/util/core_local.h deleted file mode 100644 index 1610ae5a0bb046..00000000000000 --- a/be/src/util/core_local.h +++ /dev/null @@ -1,162 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "common/compiler_util.h" // IWYU pragma: keep - -namespace doris { - -class CoreDataAllocator { -public: - virtual ~CoreDataAllocator() {} - virtual void* get_or_create(size_t id) = 0; -}; - -class CoreDataAllocatorFactory { -public: - CoreDataAllocatorFactory() {} - ~CoreDataAllocatorFactory(); - CoreDataAllocator* get_allocator(size_t cpu_id, size_t data_bytes); - static CoreDataAllocatorFactory* instance(); - -private: - DISALLOW_COPY_AND_ASSIGN(CoreDataAllocatorFactory); - -private: - std::mutex _lock; - std::map, CoreDataAllocator*> _allocators; -}; - -template -class CoreLocalValueController { -public: - CoreLocalValueController() { - int num_cpus = static_cast(std::thread::hardware_concurrency()); - _size = 8; - while (_size < num_cpus) { - _size <<= 1; - } - _allocators.resize(_size, nullptr); - for (int i = 0; i < _size; ++i) { - _allocators[i] = CoreDataAllocatorFactory::instance()->get_allocator(i, sizeof(T)); - } - } - - ~CoreLocalValueController() {} - - int get_id() { - std::lock_guard l(_lock); - int id = 0; - if (_free_ids.empty()) { - id = _next_id++; - } else { - id = _free_ids.back(); - _free_ids.pop_back(); - } - return id; - } - void reclaim_id(int id) { - std::lock_guard l(_lock); - _free_ids.push_back(id); - } - size_t size() const { return _size; } - CoreDataAllocator* allocator(int i) const { return _allocators[i]; } - - static CoreLocalValueController* instance() { - static CoreLocalValueController _s_instance; - return &_s_instance; - } - -private: - DISALLOW_COPY_AND_ASSIGN(CoreLocalValueController); - -private: - std::mutex _lock; - int _next_id = 0; - std::deque _free_ids; - std::vector _allocators; - size_t _size; -}; - -template -class CoreLocalValue { -public: - CoreLocalValue(const T init_value = T()) { - CoreLocalValueController* controller = CoreLocalValueController::instance(); - _id = controller->get_id(); - _size = controller->size(); - _values.resize(_size, nullptr); - for (int i = 0; i < _size; ++i) { - void* ptr = controller->allocator(i)->get_or_create(_id); - _values[i] = new (ptr) T(init_value); - } - } - - ~CoreLocalValue() { - for (int i = 0; i < _size; ++i) { - _values[i]->~T(); - } - CoreLocalValueController::instance()->reclaim_id(_id); - } - - size_t size() const { return _size; } - T* access() const { -#ifdef __APPLE__ - size_t cpu_id = 0; -#else - size_t cpu_id = sched_getcpu(); -#endif - if (cpu_id >= _size) { - cpu_id &= _size - 1; - } - return access_at_core(cpu_id); - } - T* access_at_core(size_t core_idx) const { return _values[core_idx]; } - - inline void reset() { - for (int i = 0; i < _size; ++i) { - _values[i]->~T(); - } - _values.clear(); - _values.resize(_size, nullptr); - CoreLocalValueController* controller = CoreLocalValueController::instance(); - for (int i = 0; i < _size; ++i) { - void* ptr = controller->allocator(i)->get_or_create(_id); - _values[i] = new (ptr) T(); - } - } - -private: - int _id = -1; - size_t _size = 0; - std::vector _values; -}; - -} // namespace doris diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index e9d4f31e5ca137..e77ee1c36b6b89 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -311,17 +311,17 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_GAUGE_METRIC_REGISTER(_server_metric_entity, broker_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_total); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_cache); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_remote); - - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running); - INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_cache); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_remote); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed); } void DorisMetrics::initialize(bool init_system_metrics, const std::set& disk_devices, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 31b907eec9ed6c..d089758c21c93f 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -236,17 +236,17 @@ class DorisMetrics { UIntGauge* group_local_scan_thread_pool_queue_size = nullptr; UIntGauge* group_local_scan_thread_pool_thread_num = nullptr; - IntAtomicCounter* num_io_bytes_read_total = nullptr; - IntAtomicCounter* num_io_bytes_read_from_cache = nullptr; - IntAtomicCounter* num_io_bytes_read_from_remote = nullptr; - - IntAtomicCounter* query_ctx_cnt = nullptr; - IntAtomicCounter* scanner_ctx_cnt = nullptr; - IntAtomicCounter* scanner_cnt = nullptr; - IntAtomicCounter* scanner_task_cnt = nullptr; - IntAtomicCounter* scanner_task_queued = nullptr; - IntAtomicCounter* scanner_task_submit_failed = nullptr; - IntAtomicCounter* scanner_task_running = nullptr; + IntCounter* num_io_bytes_read_total = nullptr; + IntCounter* num_io_bytes_read_from_cache = nullptr; + IntCounter* num_io_bytes_read_from_remote = nullptr; + + IntCounter* query_ctx_cnt = nullptr; + IntCounter* scanner_ctx_cnt = nullptr; + IntCounter* scanner_cnt = nullptr; + IntCounter* scanner_task_cnt = nullptr; + IntCounter* scanner_task_queued = nullptr; + IntCounter* scanner_task_submit_failed = nullptr; + IntCounter* scanner_task_running = nullptr; static DorisMetrics* instance() { static DorisMetrics instance; diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index ac7e69a4ef8ab4..cb49884fefb60b 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -19,21 +19,17 @@ #include #include -#include -#include #include #include #include #include #include -#include #include #include #include #include -#include "util/core_local.h" #include "util/histogram.h" namespace doris { @@ -67,8 +63,8 @@ using Labels = std::unordered_map; class Metric { public: - Metric() {} - virtual ~Metric() {} + Metric() = default; + virtual ~Metric() = default; virtual std::string to_string() const = 0; virtual std::string to_prometheus(const std::string& display_name, const Labels& entity_labels, const Labels& metric_labels) const; @@ -83,7 +79,7 @@ template class AtomicMetric : public Metric { public: AtomicMetric() : _value(T()) {} - virtual ~AtomicMetric() {} + virtual ~AtomicMetric() = default; std::string to_string() const override { return std::to_string(value()); } @@ -101,81 +97,10 @@ class AtomicMetric : public Metric { std::atomic _value; }; -template -class LockSimpleMetric : public Metric { -public: - LockSimpleMetric() : _value(T()) {} - virtual ~LockSimpleMetric() {} - - std::string to_string() const override { return std::to_string(value()); } - - T value() const { - std::lock_guard l(_lock); - return _value; - } - - void increment(const T& delta) { - std::lock_guard l(this->_lock); - _value += delta; - } - - void set_value(const T& value) { - std::lock_guard l(this->_lock); - _value = value; - } - - rj::Value to_json_value(rj::Document::AllocatorType& allocator) const override { - return rj::Value(value()); - } - -protected: - // We use std::mutex instead of std::atomic is because atomic don't support - // double's fetch_add - // TODO(zc): If this is atomic is bottleneck, we change to thread local. - // performance: on Intel(R) Xeon(R) CPU E5-2450 int64_t - // original type: 2ns/op - // single thread std::mutex: 26ns/op - // multiple thread(8) std::mutex: 2500ns/op - mutable std::mutex _lock; - T _value; -}; - -template -class CoreLocalCounter : public Metric { -public: - CoreLocalCounter() {} - virtual ~CoreLocalCounter() {} - - std::string to_string() const override { - std::stringstream ss; - ss << value(); - return ss.str(); - } - - T value() const { - T sum = 0; - for (int i = 0; i < _value.size(); ++i) { - sum += *_value.access_at_core(i); - } - return sum; - } - - void increment(const T& delta) { __sync_fetch_and_add(_value.access(), delta); } - - void reset() { _value.reset(); } - - rj::Value to_json_value(rj::Document::AllocatorType& allocator) const override { - return rj::Value(value()); - } - -protected: - CoreLocalValue _value; -}; - class HistogramMetric : public Metric { public: - HistogramMetric() {} - virtual ~HistogramMetric() {} + HistogramMetric() = default; + virtual ~HistogramMetric() = default; HistogramMetric(const HistogramMetric&) = delete; HistogramMetric& operator=(const HistogramMetric&) = delete; @@ -208,41 +133,25 @@ class HistogramMetric : public Metric { template class AtomicCounter : public AtomicMetric { public: - AtomicCounter() {} - virtual ~AtomicCounter() {} + AtomicCounter() = default; + virtual ~AtomicCounter() = default; }; template class AtomicGauge : public AtomicMetric { public: AtomicGauge() : AtomicMetric() {} - virtual ~AtomicGauge() {} -}; - -template -class LockCounter : public LockSimpleMetric { -public: - LockCounter() : LockSimpleMetric() {} - virtual ~LockCounter() {} -}; - -// This can only used for trival type -template -class LockGauge : public LockSimpleMetric { -public: - LockGauge() : LockSimpleMetric() {} - virtual ~LockGauge() {} + virtual ~AtomicGauge() = default; }; -using IntCounter = CoreLocalCounter; -using IntAtomicCounter = AtomicCounter; -using UIntCounter = CoreLocalCounter; -using DoubleCounter = LockCounter; +using IntCounter = AtomicCounter; +using UIntCounter = AtomicCounter; +using DoubleCounter = AtomicCounter; using IntGauge = AtomicGauge; using UIntGauge = AtomicGauge; -using DoubleGauge = LockGauge; - +using DoubleGauge = AtomicGauge; using Labels = std::unordered_map; + struct MetricPrototype { public: MetricPrototype(MetricType type_, MetricUnit unit_, std::string name_, @@ -302,15 +211,12 @@ struct MetricPrototype { #define INT_GAUGE_METRIC_REGISTER(entity, metric) \ metric = (IntGauge*)(entity->register_metric(&METRIC_##metric)) -#define INT_DOUBLE_METRIC_REGISTER(entity, metric) \ +#define DOUBLE_GAUGE_METRIC_REGISTER(entity, metric) \ metric = (DoubleGauge*)(entity->register_metric(&METRIC_##metric)) #define INT_UGAUGE_METRIC_REGISTER(entity, metric) \ metric = (UIntGauge*)(entity->register_metric(&METRIC_##metric)) -#define INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, metric) \ - metric = (IntAtomicCounter*)(entity->register_metric(&METRIC_##metric)) - #define HISTOGRAM_METRIC_REGISTER(entity, metric) \ metric = (HistogramMetric*)(entity->register_metric(&METRIC_##metric)) @@ -338,8 +244,8 @@ enum class MetricEntityType { kServer, kTablet }; class MetricEntity { public: - MetricEntity(MetricEntityType type, const std::string& name, const Labels& labels) - : _type(type), _name(name), _labels(labels) {} + MetricEntity(MetricEntityType type, std::string name, Labels labels) + : _type(type), _name(std::move(name)), _labels(std::move(labels)) {} ~MetricEntity() { for (auto& metric : _metrics) { delete metric.second; @@ -401,7 +307,7 @@ using EntityMetricsByType = class MetricRegistry { public: - MetricRegistry(const std::string& name) : _name(name) {} + MetricRegistry(std::string name) : _name(std::move(name)) {} ~MetricRegistry(); std::shared_ptr register_entity( diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index fc2cdcc9262b31..ecbb4d580360c4 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -44,12 +44,12 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(avail_cpu_num, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(host_cpu_num, MetricUnit::NOUNIT); struct CpuNumberMetrics { CpuNumberMetrics(MetricEntity* ent) : entity(ent) { - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, host_cpu_num); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, avail_cpu_num); + INT_COUNTER_METRIC_REGISTER(entity, host_cpu_num); + INT_COUNTER_METRIC_REGISTER(entity, avail_cpu_num); } - IntAtomicCounter* host_cpu_num {nullptr}; - IntAtomicCounter* avail_cpu_num {nullptr}; + IntCounter* host_cpu_num {nullptr}; + IntCounter* avail_cpu_num {nullptr}; MetricEntity* entity = nullptr; }; @@ -70,16 +70,16 @@ DEFINE_CPU_COUNTER_METRIC(guest_nice); // /proc/stat: http://www.linuxhowtos.org/System/procstat.htm struct CpuMetrics { CpuMetrics(MetricEntity* ent) : entity(ent) { - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_user); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_nice); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_system); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_idle); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_iowait); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_irq); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_soft_irq); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_steal); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_guest); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_guest_nice); + INT_COUNTER_METRIC_REGISTER(entity, cpu_user); + INT_COUNTER_METRIC_REGISTER(entity, cpu_nice); + INT_COUNTER_METRIC_REGISTER(entity, cpu_system); + INT_COUNTER_METRIC_REGISTER(entity, cpu_idle); + INT_COUNTER_METRIC_REGISTER(entity, cpu_iowait); + INT_COUNTER_METRIC_REGISTER(entity, cpu_irq); + INT_COUNTER_METRIC_REGISTER(entity, cpu_soft_irq); + INT_COUNTER_METRIC_REGISTER(entity, cpu_steal); + INT_COUNTER_METRIC_REGISTER(entity, cpu_guest); + INT_COUNTER_METRIC_REGISTER(entity, cpu_guest_nice); metrics[0] = cpu_user; metrics[1] = cpu_nice; @@ -96,18 +96,18 @@ struct CpuMetrics { static constexpr int cpu_num_metrics = 10; MetricEntity* entity = nullptr; - IntAtomicCounter* cpu_user; - IntAtomicCounter* cpu_nice; - IntAtomicCounter* cpu_system; - IntAtomicCounter* cpu_idle; - IntAtomicCounter* cpu_iowait; - IntAtomicCounter* cpu_irq; - IntAtomicCounter* cpu_soft_irq; - IntAtomicCounter* cpu_steal; - IntAtomicCounter* cpu_guest; - IntAtomicCounter* cpu_guest_nice; - - IntAtomicCounter* metrics[cpu_num_metrics]; + IntCounter* cpu_user; + IntCounter* cpu_nice; + IntCounter* cpu_system; + IntCounter* cpu_idle; + IntCounter* cpu_iowait; + IntCounter* cpu_irq; + IntCounter* cpu_soft_irq; + IntCounter* cpu_steal; + IntCounter* cpu_guest; + IntCounter* cpu_guest_nice; + + IntCounter* metrics[cpu_num_metrics]; }; #define DEFINE_MEMORY_GAUGE_METRIC(metric, unit) \ @@ -216,25 +216,25 @@ DEFINE_DISK_COUNTER_METRIC(io_time_weigthed, MetricUnit::MILLISECONDS); struct DiskMetrics { DiskMetrics(MetricEntity* ent) : entity(ent) { - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_reads_completed); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_bytes_read); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_read_time_ms); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_writes_completed); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_bytes_written); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_write_time_ms); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_io_time_ms); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_io_time_weigthed); + INT_COUNTER_METRIC_REGISTER(entity, disk_reads_completed); + INT_COUNTER_METRIC_REGISTER(entity, disk_bytes_read); + INT_COUNTER_METRIC_REGISTER(entity, disk_read_time_ms); + INT_COUNTER_METRIC_REGISTER(entity, disk_writes_completed); + INT_COUNTER_METRIC_REGISTER(entity, disk_bytes_written); + INT_COUNTER_METRIC_REGISTER(entity, disk_write_time_ms); + INT_COUNTER_METRIC_REGISTER(entity, disk_io_time_ms); + INT_COUNTER_METRIC_REGISTER(entity, disk_io_time_weigthed); } MetricEntity* entity = nullptr; - IntAtomicCounter* disk_reads_completed; - IntAtomicCounter* disk_bytes_read; - IntAtomicCounter* disk_read_time_ms; - IntAtomicCounter* disk_writes_completed; - IntAtomicCounter* disk_bytes_written; - IntAtomicCounter* disk_write_time_ms; - IntAtomicCounter* disk_io_time_ms; - IntAtomicCounter* disk_io_time_weigthed; + IntCounter* disk_reads_completed; + IntCounter* disk_bytes_read; + IntCounter* disk_read_time_ms; + IntCounter* disk_writes_completed; + IntCounter* disk_bytes_written; + IntCounter* disk_write_time_ms; + IntCounter* disk_io_time_ms; + IntCounter* disk_io_time_weigthed; }; #define DEFINE_NETWORK_COUNTER_METRIC(metric, unit) \ @@ -246,17 +246,17 @@ DEFINE_NETWORK_COUNTER_METRIC(send_packets, MetricUnit::PACKETS); struct NetworkMetrics { NetworkMetrics(MetricEntity* ent) : entity(ent) { - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_receive_bytes); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_receive_packets); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_send_bytes); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_send_packets); + INT_COUNTER_METRIC_REGISTER(entity, network_receive_bytes); + INT_COUNTER_METRIC_REGISTER(entity, network_receive_packets); + INT_COUNTER_METRIC_REGISTER(entity, network_send_bytes); + INT_COUNTER_METRIC_REGISTER(entity, network_send_packets); } MetricEntity* entity = nullptr; - IntAtomicCounter* network_receive_bytes; - IntAtomicCounter* network_receive_packets; - IntAtomicCounter* network_send_bytes; - IntAtomicCounter* network_send_packets; + IntCounter* network_receive_bytes; + IntCounter* network_receive_packets; + IntCounter* network_send_bytes; + IntCounter* network_send_packets; }; #define DEFINE_SNMP_COUNTER_METRIC(metric, unit, desc) \ @@ -270,17 +270,17 @@ DEFINE_SNMP_COUNTER_METRIC(tcp_out_segs, MetricUnit::NOUNIT, "All send TCP packe // metrics read from /proc/net/snmp struct SnmpMetrics { SnmpMetrics(MetricEntity* ent) : entity(ent) { - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_errs); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_retrans_segs); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_segs); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_out_segs); + INT_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_errs); + INT_COUNTER_METRIC_REGISTER(entity, snmp_tcp_retrans_segs); + INT_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_segs); + INT_COUNTER_METRIC_REGISTER(entity, snmp_tcp_out_segs); } MetricEntity* entity = nullptr; - IntAtomicCounter* snmp_tcp_in_errs; - IntAtomicCounter* snmp_tcp_retrans_segs; - IntAtomicCounter* snmp_tcp_in_segs; - IntAtomicCounter* snmp_tcp_out_segs; + IntCounter* snmp_tcp_in_errs; + IntCounter* snmp_tcp_retrans_segs; + IntCounter* snmp_tcp_in_segs; + IntCounter* snmp_tcp_out_segs; }; #define DEFINE_FD_COUNTER_METRIC(metric, unit) \ @@ -308,9 +308,9 @@ DEFINE_LOAD_AVERAGE_DOUBLE_METRIC(15_minutes); struct LoadAverageMetrics { LoadAverageMetrics(MetricEntity* ent) : entity(ent) { - INT_DOUBLE_METRIC_REGISTER(entity, load_average_1_minutes); - INT_DOUBLE_METRIC_REGISTER(entity, load_average_5_minutes); - INT_DOUBLE_METRIC_REGISTER(entity, load_average_15_minutes); + DOUBLE_GAUGE_METRIC_REGISTER(entity, load_average_1_minutes); + DOUBLE_GAUGE_METRIC_REGISTER(entity, load_average_5_minutes); + DOUBLE_GAUGE_METRIC_REGISTER(entity, load_average_15_minutes); } MetricEntity* entity = nullptr; @@ -329,18 +329,18 @@ DEFINE_PROC_STAT_COUNTER_METRIC(procs_blocked); struct ProcMetrics { ProcMetrics(MetricEntity* ent) : entity(ent) { - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, proc_interrupt); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, proc_ctxt_switch); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, proc_procs_running); - INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, proc_procs_blocked); + INT_COUNTER_METRIC_REGISTER(entity, proc_interrupt); + INT_COUNTER_METRIC_REGISTER(entity, proc_ctxt_switch); + INT_COUNTER_METRIC_REGISTER(entity, proc_procs_running); + INT_COUNTER_METRIC_REGISTER(entity, proc_procs_blocked); } MetricEntity* entity = nullptr; - IntAtomicCounter* proc_interrupt; - IntAtomicCounter* proc_ctxt_switch; - IntAtomicCounter* proc_procs_running; - IntAtomicCounter* proc_procs_blocked; + IntCounter* proc_interrupt; + IntCounter* proc_ctxt_switch; + IntCounter* proc_procs_running; + IntCounter* proc_procs_blocked; }; DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(max_disk_io_util_percent, MetricUnit::PERCENT); diff --git a/be/test/util/core_local_test.cpp b/be/test/util/core_local_test.cpp deleted file mode 100644 index ed87015b189e1c..00000000000000 --- a/be/test/util/core_local_test.cpp +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/core_local.h" - -#include -#include -#include -#include - -#include -#include - -#include "common/logging.h" -#include "gtest/gtest_pred_impl.h" -#include "testutil/test_util.h" -#include "util/stopwatch.hpp" - -namespace doris { - -// Fixture for testing class Decompressor -class CoreLocalTest : public ::testing::Test { -protected: - CoreLocalTest() {} - ~CoreLocalTest() {} -}; - -void updater(int64_t loop, CoreLocalValue* value, int64_t* used_ns) { - usleep(100); - MonotonicStopWatch stopwatch; - stopwatch.start(); - for (int i = 0; i < loop; ++i) { - __sync_fetch_and_add(value->access(), 1); - } - *used_ns = stopwatch.elapsed_time(); -} - -TEST_F(CoreLocalTest, CoreLocalValue) { - int64_t loop = LOOP_LESS_OR_MORE(1000, 1000000L); - CoreLocalValue value; - std::vector used_ns; - used_ns.resize(8); - std::vector workers; - for (int i = 0; i < 8; ++i) { - workers.emplace_back(updater, loop, &value, &used_ns[i]); - } - int64_t sum_ns = 0; - for (int i = 0; i < 8; ++i) { - workers[i].join(); - sum_ns += used_ns[i]; - } - int64_t sum = 0; - for (int i = 0; i < value.size(); ++i) { - sum += __sync_fetch_and_add(value.access_at_core(i), 0); - } - EXPECT_EQ(8 * loop, sum); - LOG(INFO) << "time:" << sum_ns / sum << "ns/op"; -} - -TEST_F(CoreLocalTest, CoreDataAllocator) { - CoreDataAllocatorFactory factory; - auto allocator1 = factory.get_allocator(1, 8); - auto ptr = allocator1->get_or_create(0); - EXPECT_TRUE(ptr != nullptr); - { - auto ptr2 = allocator1->get_or_create(0); - EXPECT_TRUE(ptr == ptr2); - } - { - auto ptr2 = allocator1->get_or_create(4096); - EXPECT_TRUE(ptr2 != nullptr); - } - { - auto allocator2 = factory.get_allocator(2, 8); - EXPECT_TRUE(allocator2 != allocator1); - } -} - -TEST_F(CoreLocalTest, CoreLocalValueController) { - CoreLocalValueController controller; - auto id = controller.get_id(); - EXPECT_EQ(0, id); - controller.reclaim_id(id); - id = controller.get_id(); - EXPECT_EQ(0, id); - id = controller.get_id(); - EXPECT_EQ(1, id); -} - -TEST_F(CoreLocalTest, CoreLocalValueNormal) { - CoreLocalValue value; - for (int i = 0; i < value.size(); ++i) { - EXPECT_EQ(0, *value.access_at_core(i)); - *value.access_at_core(i) += 1; - } - for (int i = 0; i < value.size(); ++i) { - EXPECT_EQ(1, *value.access_at_core(i)); - } - for (int i = 0; i < 10000; ++i) { - *value.access() += 1; - } - int64_t sum = 0; - for (int i = 0; i < value.size(); ++i) { - sum += *value.access_at_core(i); - } - EXPECT_EQ(10000 + value.size(), sum); -} -} // namespace doris diff --git a/be/test/util/doris_metrics_test.cpp b/be/test/util/doris_metrics_test.cpp index dcba57cb7e9ff2..6e9969b1210345 100644 --- a/be/test/util/doris_metrics_test.cpp +++ b/be/test/util/doris_metrics_test.cpp @@ -34,14 +34,14 @@ TEST_F(DorisMetricsTest, Normal) { auto server_entity = DorisMetrics::instance()->server_entity(); // check metric { - DorisMetrics::instance()->fragment_requests_total->reset(); + DorisMetrics::instance()->fragment_requests_total->set_value(0); DorisMetrics::instance()->fragment_requests_total->increment(12); auto metric = server_entity->get_metric("fragment_requests_total"); EXPECT_TRUE(metric != nullptr); EXPECT_STREQ("12", metric->to_string().c_str()); } { - DorisMetrics::instance()->fragment_request_duration_us->reset(); + DorisMetrics::instance()->fragment_request_duration_us->set_value(0); DorisMetrics::instance()->fragment_request_duration_us->increment(101); auto metric = server_entity->get_metric("fragment_request_duration_us"); EXPECT_TRUE(metric != nullptr); @@ -92,7 +92,7 @@ TEST_F(DorisMetricsTest, Normal) { } // engine request { - DorisMetrics::instance()->create_tablet_requests_total->reset(); + DorisMetrics::instance()->create_tablet_requests_total->set_value(0); DorisMetrics::instance()->create_tablet_requests_total->increment(15); auto metric = server_entity->get_metric("create_tablet_requests_total", "engine_requests_total"); @@ -100,7 +100,7 @@ TEST_F(DorisMetricsTest, Normal) { EXPECT_STREQ("15", metric->to_string().c_str()); } { - DorisMetrics::instance()->drop_tablet_requests_total->reset(); + DorisMetrics::instance()->drop_tablet_requests_total->set_value(0); DorisMetrics::instance()->drop_tablet_requests_total->increment(16); auto metric = server_entity->get_metric("drop_tablet_requests_total", "engine_requests_total"); @@ -129,7 +129,7 @@ TEST_F(DorisMetricsTest, Normal) { EXPECT_STREQ("20", metric->to_string().c_str()); } { - DorisMetrics::instance()->storage_migrate_requests_total->reset(); + DorisMetrics::instance()->storage_migrate_requests_total->set_value(0); DorisMetrics::instance()->storage_migrate_requests_total->increment(21); auto metric = server_entity->get_metric("storage_migrate_requests_total", "engine_requests_total"); diff --git a/be/test/util/metrics_test.cpp b/be/test/util/metrics_test.cpp index 305d17c47ca06f..1703b5b42bd7b4 100644 --- a/be/test/util/metrics_test.cpp +++ b/be/test/util/metrics_test.cpp @@ -46,7 +46,7 @@ TEST_F(MetricsTest, Counter) { EXPECT_STREQ("100", counter.to_string().c_str()); } { - IntAtomicCounter counter; + IntCounter counter; EXPECT_EQ(0, counter.value()); counter.increment(100); EXPECT_EQ(100, counter.value()); @@ -99,7 +99,7 @@ TEST_F(MetricsTest, CounterPerf) { } // IntAtomicCounter { - IntAtomicCounter counter; + IntCounter counter; MonotonicStopWatch watch; watch.start(); for (int i = 0; i < kLoopCount; ++i) { @@ -141,11 +141,11 @@ TEST_F(MetricsTest, CounterPerf) { } // multi-thread for IntAtomicCounter { - IntAtomicCounter mt_counter; + IntCounter mt_counter; std::vector updaters; std::atomic used_time(0); for (int i = 0; i < 8; ++i) { - updaters.emplace_back(&mt_updater, kThreadLoopCount, &mt_counter, + updaters.emplace_back(&mt_updater, kThreadLoopCount, &mt_counter, &used_time); } for (int i = 0; i < 8; ++i) {