Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 5, 2024
1 parent d03f017 commit 39e64ed
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 46 deletions.
10 changes: 6 additions & 4 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type)
: LRUCacheValueBase(), _size(b), _capacity(b) {
if (use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(page_type));
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
_mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type);
} else {
_mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
}
Expand All @@ -40,7 +42,7 @@ template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
TAllocator::free(_data, _capacity);
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class PageBase : private TAllocator, public LRUCacheValueBase {
// Effective size, smaller than capacity, such as data page remove checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
};

using DataPage = PageBase<Allocator<false>>;
Expand Down
21 changes: 14 additions & 7 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,27 +452,34 @@ class SwitchThreadMemTrackerLimiter {
const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
}
}

explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) {
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() ==
query_thread_context.query_id); // workload group alse not change
DCHECK(query_thread_context.query_mem_tracker);
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
if (query_thread_context.query_mem_tracker !=
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
}
}

~SwitchThreadMemTrackerLimiter() {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
if (_old_mem_tracker != nullptr) {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
}
doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
}

private:
std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker;
std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker {nullptr};
};

class AddThreadMemTrackerConsumer {
Expand Down
8 changes: 7 additions & 1 deletion be/src/util/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ struct ByteBuffer : private Allocator<false> {
size_t capacity;

private:
ByteBuffer(size_t capacity_) : pos(0), limit(capacity_), capacity(capacity_) {
ByteBuffer(size_t capacity_)
: pos(0),
limit(capacity_),
capacity(capacity_),
mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_));
}

std::shared_ptr<MemTrackerLimiter> mem_tracker_;
};

} // namespace doris
33 changes: 2 additions & 31 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,43 +211,14 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory(
size_t size) {
// Usually, an object that inherits Allocator has the same TLS tracker for each alloc.
// If an object that inherits Allocator needs to be reused by multiple queries,
// it is necessary to switch the same tracker to TLS when calling alloc.
// However, in ORC Reader, ORC DataBuffer will be reused, but we cannot switch TLS tracker,
// so we update the Allocator tracker when the TLS tracker changes.
// note that the tracker in thread context when object that inherit Allocator is constructed may be
// no attach memory tracker in tls. usually the memory tracker is attached in tls only during the first alloc.
if (mem_tracker_ == nullptr ||
mem_tracker_->label() != doris::thread_context()->thread_mem_tracker()->label()) {
mem_tracker_ = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}
size_t size) const {
CONSUME_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory(
size_t size) const {
doris::ThreadContext* thread_context = doris::thread_context(true);
if ((thread_context && thread_context->thread_mem_tracker()->label() != "Orphan") ||
mem_tracker_ == nullptr) {
// If thread_context exist and the label of thread_mem_tracker not equal to `Orphan`,
// this means that in the scope of SCOPED_ATTACH_TASK,
// so thread_mem_tracker should be used to release memory.
// If mem_tracker_ is nullptr there is a scenario where an object that inherits Allocator
// has never called alloc, but free memory.
// in phmap, the memory alloced by an object may be transferred to another object and then free.
// in this case, thread context must attach a memory tracker other than Orphan,
// otherwise memory tracking will be wrong.
RELEASE_THREAD_MEM_TRACKER(size);
} else {
// if thread_context does not exist or the label of thread_mem_tracker is equal to
// `Orphan`, it usually happens during object destruction. This means that
// the scope of SCOPED_ATTACH_TASK has been left, so release memory using Allocator tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
RELEASE_THREAD_MEM_TRACKER(size);
}
RELEASE_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
Expand Down
11 changes: 8 additions & 3 deletions be/src/vec/common/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,14 @@ class Allocator {
// alloc will continue to execute, so the consume memtracker is forced.
void memory_check(size_t size) const;
// Increases consumption of this tracker by 'bytes'.
void consume_memory(size_t size);
// some special cases:
// 1. objects that inherit Allocator will not be shared by multiple queries.
// non-compliant: page cache, ORC ByteBuffer.
// 2. objects that inherit Allocator will only free memory allocated by themselves.
// non-compliant: phmap, the memory alloced by an object may be transferred to another object and then free.
// 3. the memory tracker in TLS is the same during the construction of objects that inherit Allocator
// and during subsequent memory allocation.
void consume_memory(size_t size) const;
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifndef NDEBUG
Expand Down Expand Up @@ -404,8 +411,6 @@ class Allocator {

static constexpr bool clear_memory = clear_memory_;

std::shared_ptr<doris::MemTrackerLimiter> mem_tracker_ {nullptr};

// Freshly mmapped pages are copy-on-write references to a global zero page.
// On the first write, a page fault occurs, and an actual writable page is
// allocated. If we are going to use this memory soon, such as when resizing
Expand Down

0 comments on commit 39e64ed

Please sign in to comment.