Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Apr 17, 2024
1 parent bb1bbb2 commit a3179ce
Show file tree
Hide file tree
Showing 13 changed files with 171 additions and 15 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");

DEFINE_mString(get_stack_trace_tool, "libunwind");
DEFINE_mString(dwarf_location_info_mode, "FAST");
DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "false");

// the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer
DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,7 @@ DECLARE_mString(kerberos_krb5_conf_path);

// Values include `none`, `glog`, `boost`, `glibc`, `libunwind`
DECLARE_mString(get_stack_trace_tool);
DECLARE_mBool(enable_address_sanitizers_with_stack_trace);

// DISABLED: Don't resolve location info.
// FAST: Perform CU lookup using .debug_aranges (might be incomplete).
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ class ExecEnv {
std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
return _segcompaction_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
return _point_query_executor_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> rowid_storage_reader_tracker() {
return _rowid_storage_reader_tracker;
}
Expand Down Expand Up @@ -348,6 +351,7 @@ class ExecEnv {
std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;

// TODO, looking forward to more accurate tracking.
std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ void ExecEnv::init_mem_tracker() {
std::make_shared<MemTracker>("IOBufBlockMemory", _details_mem_tracker_set.get());
_segcompaction_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction");
_point_query_executor_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor");
_rowid_storage_reader_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader");
_subcolumns_tree_tracker =
Expand Down
98 changes: 93 additions & 5 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#ifdef USE_JEMALLOC
#include "jemalloc/jemalloc.h"
#else
#include <malloc.h>
#endif
#include "util/stack_util.h"

namespace doris {

Expand Down Expand Up @@ -99,7 +105,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLi
MemTrackerLimiter::~MemTrackerLimiter() {
consume(_untracked_mem);
static std::string mem_tracker_inaccurate_msg =
", mem tracker not equal to 0 when mem tracker destruct, this usually means that "
"mem tracker not equal to 0 when mem tracker destruct, this usually means that "
"memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
"1. For query and load, memory leaks may have occurred, it is expected that the query "
Expand All @@ -115,19 +121,101 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_consumption->current_value() != 0) {
// TODO, expect mem tracker equal to 0 at the task end.
if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) {
LOG(INFO) << "mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value()
<< mem_tracker_inaccurate_msg;
std::string err_msg =
fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.",
label(), _consumption->current_value(), _consumption->peak_value(),
mem_tracker_inaccurate_msg);
#ifdef NDEBUG
LOG(INFO) << err_msg;
#else
LOG(FATAL) << err_msg << print_address_sanitizers();
#endif
}
if (ExecEnv::tracking_memory()) {
ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value());
}
_consumption->set(0);
#ifdef DEBUG
} else if (!_address_sanitizers.empty()) {
LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
#endif
}
g_memtrackerlimiter_cnt << -1;
}

#ifdef DEBUG
static size_t allocator_malloc_usable_size(void* ptr) {
#ifdef USE_JEMALLOC
return jemalloc_usable_size(ptr);
#else
return 0 // malloc_usable_size(ptr);
#endif
}

void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", usable_size "
<< ", old buf: " << it->first << ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace()
<< ", old stack_trace: " << it->second.stack_trace;
}

// if alignment not equal to 0, maybe usable_size > size.
AddressSanitizer as = {
size, allocator_malloc_usable_size(buf),
doris::config::enable_address_sanitizers_with_stack_trace ? get_stack_trace() : ""};
_address_sanitizers.emplace(buf, as);
}
}

void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
if (it->second.size != size ||
it->second.usable_size != allocator_malloc_usable_size(buf)) {
LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
"label: "
<< _label << ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value()
<< ", buf: " << buf << ", size: " << size << ", usable_size "
<< allocator_malloc_usable_size(buf) << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", old usable_size: " << it->second.usable_size
<< ", new stack_trace: " << get_stack_trace()
<< ", old stack_trace: " << it->second.stack_trace;
}
_address_sanitizers.erase(buf);
} else {
LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", stack_trace: " << get_stack_trace();
}
}
}

std::string MemTrackerLimiter::print_address_sanitizers() {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
std::string detail = "[Address Sanitizer]:";
for (const auto& it : _address_sanitizers) {
detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size,
it.second.stack_trace);
}
return detail;
}
#endif

MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
Snapshot snapshot;
snapshot.type = type_string(_type);
Expand Down
17 changes: 17 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ class MemTrackerLimiter final : public MemTracker {
// Log the memory usage when memory limit is exceeded.
std::string tracker_limit_exceeded_str();

#ifdef DEBUG
void add_address_sanitizers(void* buf, size_t size);
void remove_address_sanitizers(void* buf, size_t size);
std::string print_address_sanitizers();
#endif

std::string debug_string() override {
std::stringstream msg;
msg << "limit: " << _limit << "; "
Expand Down Expand Up @@ -274,6 +280,17 @@ class MemTrackerLimiter final : public MemTracker {
// Avoid frequent printing.
bool _enable_print_log_usage = false;
static std::atomic<bool> _enable_print_log_process_usage;

#ifdef DEBUG
struct AddressSanitizer {
size_t size;
size_t usable_size;
std::string stack_trace;
};

std::mutex _address_sanitizers_mtx;
std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
#endif
};

inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ class AddThreadMemTrackerConsumerByHook {
// must call create_thread_local_if_not_exits() before use thread_context().
#define CONSUME_THREAD_MEM_TRACKER(size) \
do { \
if (doris::use_mem_hook || size == 0) { \
if (size == 0 || doris::use_mem_hook) { \
break; \
} \
if (doris::pthread_context_ptr_init) { \
Expand Down
11 changes: 5 additions & 6 deletions be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/key_util.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
Expand Down Expand Up @@ -165,7 +166,8 @@ void RowCache::erase(const RowCacheKey& key) {
}

PointQueryExecutor::~PointQueryExecutor() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->point_query_executor_mem_tracker());
_tablet.reset();
_reusable.reset();
_result_block.reset();
Expand All @@ -179,10 +181,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
// using cache
__int128_t uuid =
static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low();
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY,
fmt::format("PointQueryExecutor:{}#{}", uuid, request->tablet_id()));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
auto cache_handle = LookupConnectionCache::instance()->get(uuid);
_binary_row_format = request->is_binary_row();
if (cache_handle != nullptr) {
Expand Down Expand Up @@ -230,7 +229,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
}

Status PointQueryExecutor::lookup_up() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
RETURN_IF_ERROR(_lookup_row_key());
RETURN_IF_ERROR(_lookup_row_data());
RETURN_IF_ERROR(_output_data());
Expand Down
1 change: 0 additions & 1 deletion be/src/service/point_query_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ class PointQueryExecutor {
std::vector<RowReadContext> _row_read_ctxs;
std::shared_ptr<Reusable> _reusable;
std::unique_ptr<vectorized::Block> _result_block;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
Metrics _profile_metrics;
bool _binary_row_format = false;
// snapshot read version
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "gutil/endian.h"
#include "gutil/strings/substitute.h"
#include "orc/OrcFile.hh"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/defer_op.h"
#include "util/faststring.h"
Expand Down Expand Up @@ -767,6 +768,7 @@ class ZstdBlockCompression : public BlockCompressionCodec {
return &s_instance;
}
~ZstdBlockCompression() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context);
for (auto ctx : _ctx_c_pool) {
_delete_compression_ctx(ctx);
}
Expand All @@ -786,6 +788,7 @@ class ZstdBlockCompression : public BlockCompressionCodec {
// https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
_query_thread_context.init();
CContext* context;
RETURN_IF_ERROR(_acquire_compression_ctx(&context));
bool compress_failed = false;
Expand Down Expand Up @@ -864,6 +867,7 @@ class ZstdBlockCompression : public BlockCompressionCodec {
}

Status decompress(const Slice& input, Slice* output) override {
_query_thread_context.init();
DContext* context;
bool decompress_failed = false;
RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
Expand Down Expand Up @@ -960,6 +964,8 @@ class ZstdBlockCompression : public BlockCompressionCodec {

mutable std::mutex _ctx_d_mutex;
mutable std::vector<DContext*> _ctx_d_pool;

QueryThreadContext _query_thread_context;
};

class GzipBlockCompression : public ZlibBlockCompression {
Expand Down
14 changes: 14 additions & 0 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,20 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
}

#ifdef DEBUG
template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(void* buf,
size_t size) const {
doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizers(
void* buf, size_t size) const {
doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size);
}
#endif

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, size_t alignment) {
return alloc_impl(size, alignment);
Expand Down
22 changes: 22 additions & 0 deletions be/src/vec/common/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,18 @@ class Allocator {
void consume_memory(size_t size) const;
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifdef DEBUG
void add_address_sanitizers(void* buf, size_t size) const;
void remove_address_sanitizers(void* buf, size_t size) const;
#endif

void* alloc(size_t size, size_t alignment = 0);
void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0);

/// Allocate memory range.
void* alloc_impl(size_t size, size_t alignment = 0) {
memory_check(size);
// consume memory in tracker before alloc, similar to early declaration.
consume_memory(size);
void* buf;

Expand Down Expand Up @@ -125,6 +130,9 @@ class Allocator {
release_memory(size);
throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size));
}
#ifdef DEBUG
add_address_sanitizers(buf, size);
#endif
} else {
buf = nullptr;
int res = posix_memalign(&buf, alignment, size);
Expand All @@ -134,6 +142,9 @@ class Allocator {
throw_bad_alloc(
fmt::format("Cannot allocate memory (posix_memalign) {}.", size));
}
#ifdef DEBUG
add_address_sanitizers(buf, size);
#endif

if constexpr (clear_memory) memset(buf, 0, size);
}
Expand All @@ -148,6 +159,9 @@ class Allocator {
throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size));
}
} else {
#ifdef DEBUG
remove_address_sanitizers(buf, size);
#endif
::free(buf);
}
release_memory(size);
Expand Down Expand Up @@ -176,6 +190,10 @@ class Allocator {
throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size,
new_size));
}
#ifdef DEBUG
remove_address_sanitizers(buf, old_size); // buf addr = new_buf addr
add_address_sanitizers(new_buf, new_size);
#endif

buf = new_buf;
if constexpr (clear_memory)
Expand Down Expand Up @@ -205,6 +223,10 @@ class Allocator {
// Big allocs that requires a copy.
void* new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
#ifdef DEBUG
add_address_sanitizers(new_buf, new_size);
remove_address_sanitizers(buf, old_size);
#endif
free(buf, old_size);
buf = new_buf;
}
Expand Down
Loading

0 comments on commit a3179ce

Please sign in to comment.