Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 13, 2023
1 parent 45ba1e8 commit 26f66bf
Show file tree
Hide file tree
Showing 18 changed files with 193 additions and 226 deletions.
1 change: 0 additions & 1 deletion be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
_use_proto(use_proto) {}

StreamLoadPipe::~StreamLoadPipe() {
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
while (!_buf_queue.empty()) {
_buf_queue.pop_front();
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ using SchemaSPtr = std::shared_ptr<const Schema>;
class Schema {
public:
Schema(TabletSchemaSPtr tablet_schema) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
size_t num_columns = tablet_schema->num_columns();
// ignore this column
if (tablet_schema->columns().back().name() == BeConsts::ROW_STORE_COL) {
Expand Down Expand Up @@ -86,7 +86,7 @@ class Schema {

// All the columns of one table may exist in the columns param, but col_ids is only a subset.
Schema(const std::vector<TabletColumn>& columns, const std::vector<ColumnId>& col_ids) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
size_t num_key_columns = 0;
_unique_ids.resize(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
Expand All @@ -109,7 +109,7 @@ class Schema {

// Only for UT
Schema(const std::vector<TabletColumn>& columns, size_t num_key_columns) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
std::vector<ColumnId> col_ids(columns.size());
_unique_ids.resize(columns.size());
for (uint32_t cid = 0; cid < columns.size(); ++cid) {
Expand All @@ -121,7 +121,7 @@ class Schema {
}

Schema(const std::vector<const Field*>& cols, size_t num_key_columns) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
std::vector<ColumnId> col_ids(cols.size());
_unique_ids.resize(cols.size());
for (uint32_t cid = 0; cid < cols.size(); ++cid) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ void TabletSchema::clear_columns() {
}

void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
_keys_type = schema.keys_type();
_num_columns = 0;
_num_variant_columns = 0;
Expand Down
11 changes: 2 additions & 9 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ void GetResultBatchCtx::on_failure(const Status& status) {
status.to_protobuf(result->mutable_status());
{
// call by result sink
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
done->Run();
}
delete this;
Expand All @@ -57,10 +56,7 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics
}
result->set_packet_seq(packet_seq);
result->set_eos(true);
{
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
done->Run();
}
{ done->Run(); }
delete this;
}

Expand All @@ -85,10 +81,7 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
result->set_eos(eos);
}
st.to_protobuf(result->mutable_status());
{
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
done->Run();
}
{ done->Run(); }
delete this;
}

Expand Down
48 changes: 24 additions & 24 deletions be/src/runtime/memory/jemalloc_hook.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ extern "C" {
// mem hook should avoid nesting new/malloc.

void* doris_malloc(size_t size) __THROW {
CONSUME_MEM_TRACKER(jenallocx(size, 0));
CONSUME_MEM_TRACKER_USE_BY_HOOK(jenallocx(size, 0));
void* ptr = jemalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(jenallocx(size, 0));
RELEASE_MEM_TRACKER_USE_BY_HOOK(jenallocx(size, 0));
}
return ptr;
}

void doris_free(void* p) __THROW {
RELEASE_MEM_TRACKER(jemalloc_usable_size(p));
RELEASE_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(p));
jefree(p);
}

Expand All @@ -62,10 +62,10 @@ void* doris_realloc(void* p, size_t size) __THROW {
int64_t old_size = jemalloc_usable_size(p);
#endif

CONSUME_MEM_TRACKER(jenallocx(size, 0) - old_size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(jenallocx(size, 0) - old_size);
void* ptr = jerealloc(p, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(jenallocx(size, 0) - old_size);
RELEASE_MEM_TRACKER_USE_BY_HOOK(jenallocx(size, 0) - old_size);
}
return ptr;
}
Expand All @@ -75,72 +75,72 @@ void* doris_calloc(size_t n, size_t size) __THROW {
return nullptr;
}

CONSUME_MEM_TRACKER(n * size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(n * size);
void* ptr = jecalloc(n, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(n * size);
RELEASE_MEM_TRACKER_USE_BY_HOOK(n * size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - n * size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(ptr) - n * size);
}
return ptr;
}

void doris_cfree(void* ptr) __THROW {
RELEASE_MEM_TRACKER(jemalloc_usable_size(ptr));
RELEASE_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(ptr));
jefree(ptr);
}

void* doris_memalign(size_t align, size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(size);
void* ptr = jealigned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_MEM_TRACKER_USE_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(ptr) - size);
}
return ptr;
}

void* doris_aligned_alloc(size_t align, size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(size);
void* ptr = jealigned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_MEM_TRACKER_USE_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(ptr) - size);
}
return ptr;
}

void* doris_valloc(size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(size);
void* ptr = jevalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_MEM_TRACKER_USE_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(ptr) - size);
}
return ptr;
}

void* doris_pvalloc(size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(size);
void* ptr = jevalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_MEM_TRACKER_USE_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(ptr) - size);
}
return ptr;
}

int doris_posix_memalign(void** r, size_t align, size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(size);
int ret = jeposix_memalign(r, align, size);
if (UNLIKELY(ret != 0)) {
RELEASE_MEM_TRACKER(size);
RELEASE_MEM_TRACKER_USE_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(*r) - size);
CONSUME_MEM_TRACKER_USE_BY_HOOK(jemalloc_usable_size(*r) - size);
}
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void MemTracker::bind_parent(MemTrackerLimiter* parent) {
if (parent) {
_parent_label = parent->label();
_parent_group_num = parent->group_num();
} else if (doris::is_thread_context_init()) {
} else if (is_thread_context_init()) {
_parent_label = thread_context()->thread_mem_tracker()->label();
_parent_group_num = thread_context()->thread_mem_tracker()->group_num();
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see "
"be.INFO.",
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker());
doris::is_thread_context_init()
? doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker()
: "");
} else if (_type == Type::SCHEMA_CHANGE) {
err_msg += fmt::format(
" can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to "
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class MemTrackerLimiter final : public MemTracker {

// Transfer 'bytes' of consumption from this tracker to 'dst'.
void transfer_to(int64_t size, MemTrackerLimiter* dst) {
if (label() == dst->label()) {
return;
}
cache_consume(-size);
dst->cache_consume(size);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/memory/tcmalloc_hook.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
// destructor to control the behavior of consume can lead to unexpected behavior,
// like this: if (LIKELY(doris::start_thread_mem_tracker)) {
void new_hook(const void* ptr, size_t size) {
CONSUME_MEM_TRACKER(tc_nallocx(size, 0));
CONSUME_MEM_TRACKER_USE_BY_HOOK(tc_nallocx(size, 0));
}

void delete_hook(const void* ptr) {
RELEASE_MEM_TRACKER(tc_malloc_size(const_cast<void*>(ptr)));
RELEASE_MEM_TRACKER_USE_BY_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
}

void init_hook() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {

inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
_untracked_mem += size;
if (!ExecEnv::ready()) {
if (!_init && !ExecEnv::ready()) {
return;
}
// When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
Expand Down
13 changes: 13 additions & 0 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,17 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
ThreadLocalHandle::release_thread_local();
}

AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook(
const std::shared_ptr<MemTracker>& mem_tracker)
: _mem_tracker(mem_tracker) {
ThreadLocalHandle::handle_thread_local();
DCHECK(mem_tracker != nullptr);
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
}

AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() {
thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
ThreadLocalHandle::release_thread_local();
}

} // namespace doris
Loading

0 comments on commit 26f66bf

Please sign in to comment.