From a3179ceb629e883248d5283272e63bee6e84c237 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 17 Apr 2024 15:18:07 +0800 Subject: [PATCH] 1 --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/runtime/exec_env.h | 4 + be/src/runtime/exec_env_init.cpp | 2 + be/src/runtime/memory/mem_tracker_limiter.cpp | 98 ++++++++++++++++++- be/src/runtime/memory/mem_tracker_limiter.h | 17 ++++ be/src/runtime/thread_context.h | 2 +- be/src/service/point_query_executor.cpp | 11 +-- be/src/service/point_query_executor.h | 1 - be/src/util/block_compression.cpp | 6 ++ be/src/vec/common/allocator.cpp | 14 +++ be/src/vec/common/allocator.h | 22 +++++ be/src/vec/common/pod_array_fwd.h | 7 +- 13 files changed, 171 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 603b1739062f19..8b304a0e7825db 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1059,6 +1059,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf"); DEFINE_mString(get_stack_trace_tool, "libunwind"); DEFINE_mString(dwarf_location_info_mode, "FAST"); +DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "false"); // the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 2aca6a31fbb918..de4c148ee8115c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1110,6 +1110,7 @@ DECLARE_mString(kerberos_krb5_conf_path); // Values include `none`, `glog`, `boost`, `glibc`, `libunwind` DECLARE_mString(get_stack_trace_tool); +DECLARE_mBool(enable_address_sanitizers_with_stack_trace); // DISABLED: Don't resolve location info. // FAST: Perform CU lookup using .debug_aranges (might be incomplete). 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 24a7705b52b956..5d03bca2bb980c 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -178,6 +178,9 @@ class ExecEnv { std::shared_ptr segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } + std::shared_ptr point_query_executor_mem_tracker() { + return _point_query_executor_mem_tracker; /* tracker created in ExecEnv::init_mem_tracker() with Type::GLOBAL and label "PointQueryExecutor" */ + } std::shared_ptr rowid_storage_reader_tracker() { return _rowid_storage_reader_tracker; } @@ -348,6 +351,7 @@ class ExecEnv { std::shared_ptr _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; + std::shared_ptr _point_query_executor_mem_tracker; /* single shared tracker used by PointQueryExecutor init/lookup_up/destructor */ // TODO, looking forward to more accurate tracking. std::shared_ptr _rowid_storage_reader_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 37cd51d1ca41dc..d90a92038d110b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -538,6 +538,8 @@ void ExecEnv::init_mem_tracker() { std::make_shared("IOBufBlockMemory", _details_mem_tracker_set.get()); _segcompaction_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); + _point_query_executor_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); /* GLOBAL type, so the QUERY-only address-sanitizer bookkeeping does not apply to it */ _rowid_storage_reader_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); _subcolumns_tree_tracker = diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 2218bed6959ddc..d3f6a488e9d812 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -38,6 +38,12 @@ #include "util/perf_counters.h" #include "util/pretty_printer.h" #include "util/runtime_profile.h" +#ifdef USE_JEMALLOC +#include "jemalloc/jemalloc.h" +#else +#include +#endif 
+#include "util/stack_util.h" namespace doris { @@ -99,7 +105,7 @@ std::shared_ptr MemTrackerLimiter::create_shared(MemTrackerLi MemTrackerLimiter::~MemTrackerLimiter() { consume(_untracked_mem); static std::string mem_tracker_inaccurate_msg = - ", mem tracker not equal to 0 when mem tracker destruct, this usually means that " + "mem tracker not equal to 0 when mem tracker destruct, this usually means that " "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " "1. For query and load, memory leaks may have occurred, it is expected that the query " @@ -115,19 +121,101 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_consumption->current_value() != 0) { // TODO, expect mem tracker equal to 0 at the task end. if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) { - LOG(INFO) << "mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << mem_tracker_inaccurate_msg; + std::string err_msg = + fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", + label(), _consumption->current_value(), _consumption->peak_value(), + mem_tracker_inaccurate_msg); +#ifdef DEBUG + LOG(FATAL) << err_msg << print_address_sanitizers(); /* print_address_sanitizers() is only declared under DEBUG, so the guard must be DEBUG too */ +#else + LOG(INFO) << err_msg; +#endif } if (ExecEnv::tracking_memory()) { ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value()); } _consumption->set(0); +#ifdef DEBUG + } else if (!_address_sanitizers.empty()) { + LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. 
" << ", mem tracker label: " << _label + << ", peak consumption: " << _consumption->peak_value() + << print_address_sanitizers(); +#endif } g_memtrackerlimiter_cnt << -1; } +#ifdef DEBUG +static size_t allocator_malloc_usable_size(void* ptr) { /* usable size reported by jemalloc, or 0 when not built with USE_JEMALLOC */ +#ifdef USE_JEMALLOC + return jemalloc_usable_size(ptr); +#else + return 0; // malloc_usable_size(ptr); +#endif +} + +void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { /* records buf for QUERY-type trackers; aborts via LOG(FATAL) if buf is already recorded */ + if (_type == Type::QUERY) { + std::lock_guard l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", usable_size " + << allocator_malloc_usable_size(buf) << ", old buf: " << it->first << ", old size: " << it->second.size + << ", new stack_trace: " << get_stack_trace() + << ", old stack_trace: " << it->second.stack_trace; + } + + // if alignment not equal to 0, maybe usable_size > size. + AddressSanitizer as = { + size, allocator_malloc_usable_size(buf), + doris::config::enable_address_sanitizers_with_stack_trace ? 
get_stack_trace() : ""}; + _address_sanitizers.emplace(buf, as); + } +} + +void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { /* drops buf for QUERY-type trackers; aborts via LOG(FATAL) if buf is unknown or its size/usable size changed */ + if (_type == Type::QUERY) { + std::lock_guard l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + if (it->second.size != size || + it->second.usable_size != allocator_malloc_usable_size(buf)) { + LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " + "label: " + << _label << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() + << ", buf: " << buf << ", size: " << size << ", usable_size " + << allocator_malloc_usable_size(buf) << ", old buf: " << it->first + << ", old size: " << it->second.size + << ", old usable_size: " << it->second.usable_size + << ", new stack_trace: " << get_stack_trace() + << ", old stack_trace: " << it->second.stack_trace; + } + _address_sanitizers.erase(it); /* reuse the iterator from find() instead of a second lookup */ + } else { + LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", stack_trace: " << get_stack_trace(); + } + } +} + +std::string MemTrackerLimiter::print_address_sanitizers() { /* returns a report with one line per recorded buffer: address, size, stack trace */ + std::lock_guard l(_address_sanitizers_mtx); + std::string detail = "[Address Sanitizer]:"; + for (const auto& it : _address_sanitizers) { + detail += fmt::format("\n {}, size {}, stack trace: {}", it.first, it.second.size, + it.second.stack_trace); + } + return detail; +} +#endif + MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; snapshot.type = type_string(_type); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index fcb319a8d770f9..ad18abe48eb42e 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ 
b/be/src/runtime/memory/mem_tracker_limiter.h @@ -232,6 +232,12 @@ class MemTrackerLimiter final : public MemTracker { // Log the memory usage when memory limit is exceeded. std::string tracker_limit_exceeded_str(); +#ifdef DEBUG + void add_address_sanitizers(void* buf, size_t size); /* QUERY-type trackers only: record buf with its size; LOG(FATAL) if buf is already recorded */ + void remove_address_sanitizers(void* buf, size_t size); /* QUERY-type trackers only: drop buf; LOG(FATAL) if buf is unknown or its size/usable size differ from the recorded values */ + std::string print_address_sanitizers(); /* returns one line per recorded buffer: address, size, stack trace */ +#endif + std::string debug_string() override { std::stringstream msg; msg << "limit: " << _limit << "; " @@ -274,6 +280,17 @@ class MemTrackerLimiter final : public MemTracker { // Avoid frequent printing. bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; + +#ifdef DEBUG + struct AddressSanitizer { /* bookkeeping entry for one recorded buffer */ + size_t size; /* size passed to add_address_sanitizers() */ + size_t usable_size; /* allocator_malloc_usable_size(buf) at record time (0 without USE_JEMALLOC) */ + std::string stack_trace; /* get_stack_trace() when config enable_address_sanitizers_with_stack_trace is true, else empty */ + }; + + std::mutex _address_sanitizers_mtx; /* held by add/remove/print_address_sanitizers while they touch the map */ + std::unordered_map _address_sanitizers; /* keyed by buffer address */ +#endif }; inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 09721fb5d048af..ca5c3a95c18afa 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -447,7 +447,7 @@ class AddThreadMemTrackerConsumerByHook { // must call create_thread_local_if_not_exits() before use thread_context(). 
#define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ - if (doris::use_mem_hook || size == 0) { \ + if (size == 0 || doris::use_mem_hook) { \ break; \ } \ if (doris::pthread_context_ptr_init) { \ diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index fb4b8130dd74ef..2d6c3d9665665d 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -40,6 +40,7 @@ #include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/key_util.h" #include "util/runtime_profile.h" #include "util/thrift_util.h" @@ -165,7 +166,8 @@ void RowCache::erase(const RowCacheKey& key) { } PointQueryExecutor::~PointQueryExecutor() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->point_query_executor_mem_tracker()); _tablet.reset(); _reusable.reset(); _result_block.reset(); @@ -179,10 +181,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, // using cache __int128_t uuid = static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); - _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::QUERY, - fmt::format("PointQueryExecutor:{}#{}", uuid, request->tablet_id())); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); auto cache_handle = LookupConnectionCache::instance()->get(uuid); _binary_row_format = request->is_binary_row(); if (cache_handle != nullptr) { @@ -230,7 +229,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, } Status PointQueryExecutor::lookup_up() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); RETURN_IF_ERROR(_lookup_row_key()); 
RETURN_IF_ERROR(_lookup_row_data()); RETURN_IF_ERROR(_output_data()); diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 953e253f6196e3..e761a32a8f6323 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -316,7 +316,6 @@ class PointQueryExecutor { std::vector _row_read_ctxs; std::shared_ptr _reusable; std::unique_ptr _result_block; - std::shared_ptr _mem_tracker; Metrics _profile_metrics; bool _binary_row_format = false; // snapshot read version diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index b5e800a7d028d7..cce0500593888d 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -48,6 +48,7 @@ #include "gutil/endian.h" #include "gutil/strings/substitute.h" #include "orc/OrcFile.hh" +#include "runtime/thread_context.h" #include "util/bit_util.h" #include "util/defer_op.h" #include "util/faststring.h" @@ -767,6 +768,7 @@ class ZstdBlockCompression : public BlockCompressionCodec { return &s_instance; } ~ZstdBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context); for (auto ctx : _ctx_c_pool) { _delete_compression_ctx(ctx); } @@ -786,6 +788,7 @@ class ZstdBlockCompression : public BlockCompressionCodec { // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { + _query_thread_context.init(); CContext* context; RETURN_IF_ERROR(_acquire_compression_ctx(&context)); bool compress_failed = false; @@ -864,6 +867,7 @@ class ZstdBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { + _query_thread_context.init(); DContext* context; bool decompress_failed = false; RETURN_IF_ERROR(_acquire_decompression_ctx(&context)); @@ -960,6 +964,8 @@ class ZstdBlockCompression : public BlockCompressionCodec { mutable 
std::mutex _ctx_d_mutex; mutable std::vector _ctx_d_pool; + + QueryThreadContext _query_thread_context; }; class GzipBlockCompression : public ZlibBlockCompression { diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index a72165f936ca0d..f012e15299e47e 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -192,6 +192,20 @@ void Allocator::throw_bad_alloc( throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } +#ifdef DEBUG +template +void Allocator::add_address_sanitizers(void* buf, + size_t size) const { + doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size); /* delegates to the tracker returned by thread_context()->thread_mem_tracker(); presumably the MemTrackerLimiter attached to this thread - confirm */ +} + +template +void Allocator::remove_address_sanitizers( + void* buf, size_t size) const { + doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size); /* same delegation as add_address_sanitizers; NOTE(review): assumes the same tracker is attached at free time as at alloc time - confirm */ +} +#endif + template void* Allocator::alloc(size_t size, size_t alignment) { return alloc_impl(size, alignment); diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index f805ab445a6ac6..60559e4993163e 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -90,6 +90,10 @@ class Allocator { void consume_memory(size_t size) const; void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; +#ifdef DEBUG + void add_address_sanitizers(void* buf, size_t size) const; /* DEBUG-only: register buf with the thread's mem tracker */ + void remove_address_sanitizers(void* buf, size_t size) const; /* DEBUG-only: unregister buf from the thread's mem tracker */ +#endif void* alloc(size_t size, size_t alignment = 0); void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); @@ -97,6 +101,7 @@ class Allocator { /// Allocate memory range. void* alloc_impl(size_t size, size_t alignment = 0) { memory_check(size); + // consume memory in tracker before alloc, similar to early declaration. 
consume_memory(size); void* buf; @@ -125,6 +130,9 @@ class Allocator { release_memory(size); throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); } +#ifdef DEBUG + add_address_sanitizers(buf, size); +#endif } else { buf = nullptr; int res = posix_memalign(&buf, alignment, size); @@ -134,6 +142,9 @@ class Allocator { throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } +#ifdef DEBUG + add_address_sanitizers(buf, size); +#endif if constexpr (clear_memory) memset(buf, 0, size); } @@ -148,6 +159,9 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } } else { +#ifdef DEBUG + remove_address_sanitizers(buf, size); +#endif ::free(buf); } release_memory(size); @@ -176,6 +190,10 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); } +#ifdef DEBUG + remove_address_sanitizers(buf, old_size); // buf addr = new_buf addr + add_address_sanitizers(new_buf, new_size); +#endif buf = new_buf; if constexpr (clear_memory) @@ -205,6 +223,10 @@ class Allocator { // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment); memcpy(new_buf, buf, std::min(old_size, new_size)); + // No explicit add/remove_address_sanitizers here: alloc() already registers new_buf + // and free() already unregisters buf on the malloc/posix_memalign paths, so repeating + // them would hit the "repeat add" / "not exist" LOG(FATAL) checks. Buffers on the + // mmap/munmap paths are not registered by alloc()/free() at all. free(buf, old_size); buf = new_buf; } diff --git a/be/src/vec/common/pod_array_fwd.h b/be/src/vec/common/pod_array_fwd.h index ff00b312575a63..e1a428eda9dafb 100644 --- a/be/src/vec/common/pod_array_fwd.h +++ b/be/src/vec/common/pod_array_fwd.h @@ -36,9 +36,12 @@ template class PODArray; -/** For columns. Padding is enough to read and write xmm-register at the address of the last element. */ +/** For columns. Padding is enough to read and write xmm-register at the address of the last element. + * TODO, pad_right is temporarily changed from 15 to 16, will waste 1 bytes, + * can rollback after fix wrong reinterpret_cast column and PODArray swap. 
+ */ template > -using PaddedPODArray = PODArray; +using PaddedPODArray = PODArray; /** A helper for declaring PODArray that uses inline memory. * The initial size is set to use all the inline bytes, since using less would