Skip to content

Commit

Permalink
[fix][mem tracker] Fix MemTracker null pointer in vectorized (apache#…
Browse files Browse the repository at this point in the history
…8925)

Fix ThreadMemTrackerMgr::update_tracker null pointer and some details.

Issue Number: close apache#8920
  • Loading branch information
xinyiZzz authored Apr 12, 2022
1 parent f992247 commit 66d2f4e
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 9 deletions.
1 change: 1 addition & 0 deletions be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next."));

ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
while ((_num_rows_skipped < _offset)) {
_num_rows_skipped += output_batch->num_rows();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/data_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class DataStreamRecvr {
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
PlanNodeId dest_node_id() const { return _dest_node_id; }
const RowDescriptor& row_desc() const { return _row_desc; }
const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; }

void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) {
_sub_plan_query_statistics_recvr->insert(statistics, sender_id);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/fold_constant_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ TUniqueId FoldConstantExecutor::_dummy_id;

Status FoldConstantExecutor::fold_constant_expr(
const TFoldConstantParams& params, PConstantExprResult* response) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const auto& expr_map = params.expr_map;
auto expr_result_map = response->mutable_expr_result_map();

Expand All @@ -54,6 +53,7 @@ Status FoldConstantExecutor::fold_constant_expr(
if (UNLIKELY(!status.ok())) {
return status;
}
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);

for (const auto& m : expr_map) {
PExprResultMap pexpr_result_map;
Expand Down Expand Up @@ -108,9 +108,9 @@ Status FoldConstantExecutor::fold_constant_vexpr(
// init
Status status = _init(query_globals);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
return status;
}
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);

for (const auto& m : expr_map) {
PExprResultMap pexpr_result_map;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class ThreadContext {
// The func provided by pthread and std::thread doesn't help either.
//
// So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by
// Thread-scopedthread local + Class-scoped thread local.
// Thread-scoped thread local + Class-scoped thread local.
//
// This may look very trick, but it's the best way I can find.
//
Expand Down
13 changes: 8 additions & 5 deletions be/src/runtime/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class ThreadMemTrackerMgr {
_mem_trackers[0] = MemTracker::get_process_tracker();
_untracked_mems[0] = 0;
_tracker_id = 0;
_mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
start_thread_mem_tracker = true;
}
~ThreadMemTrackerMgr() {
Expand All @@ -75,7 +76,8 @@ class ThreadMemTrackerMgr {
void clear_untracked_mems() {
for (const auto& untracked_mem : _untracked_mems) {
if (untracked_mem.second != 0) {
DCHECK(_mem_trackers[untracked_mem.first]) << ", label: " << _mem_tracker_labels[untracked_mem.first];
DCHECK(_mem_trackers[untracked_mem.first])
<< ", label: " << _mem_tracker_labels[untracked_mem.first];
if (_mem_trackers[untracked_mem.first]) {
_mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
} else {
Expand Down Expand Up @@ -195,7 +197,7 @@ inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTrac
_untracked_mems[_tracker_id] += _untracked_mem;
_untracked_mem = 0;
std::swap(_tracker_id, _temp_tracker_id);
DCHECK(_mem_trackers[_tracker_id]);
DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
return _temp_tracker_id; // old tracker_id
}

Expand All @@ -204,7 +206,8 @@ inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) {
_untracked_mems[_tracker_id] += _untracked_mem;
_untracked_mem = 0;
_tracker_id = tracker_id;
DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end())
<< ", label: " << _mem_tracker_labels[_tracker_id];
DCHECK(_mem_trackers[_tracker_id]);
}
}
Expand All @@ -217,14 +220,14 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
// Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion.
// Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses.
start_thread_mem_tracker = false;
// When switching to the current tracker last time, the remaining untracked memory.
if (_untracked_mems[_tracker_id] != 0) {
_untracked_mem += _untracked_mems[_tracker_id];
_untracked_mems[_tracker_id] = 0;
}
// Avoid getting stuck in infinite loop if there is memory allocation in noncache_consume.
// For example: GC function when try_consume; mem_limit_exceeded.
noncache_consume();
start_thread_mem_tracker = true;
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/vexchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/runtime/vdata_stream_recvr.h"

Expand Down Expand Up @@ -48,6 +49,7 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status VExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
DCHECK_GT(_num_senders, 0);
_sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
_stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
Expand All @@ -63,6 +65,7 @@ Status VExchangeNode::prepare(RuntimeState* state) {
}
Status VExchangeNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));

if (_is_merging) {
Expand All @@ -80,6 +83,8 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e

Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
SCOPED_TIMER(runtime_profile()->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
auto status = _stream_recvr->get_next(block, eos);
if (block != nullptr) {
if (_num_rows_returned + block->rows() < _limit) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class VDataStreamRecvr {
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
PlanNodeId dest_node_id() const { return _dest_node_id; }
const RowDescriptor& row_desc() const { return _row_desc; }
const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; }

void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) {
_sub_plan_query_statistics_recvr->insert(statistics, sender_id);
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ Status VOlapTableSink::init(const TDataSink& sink) {
}

Status VOlapTableSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OlapTableSink::prepare(state));
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc,
_expr_mem_tracker));
return OlapTableSink::prepare(state);
return Status::OK();
}

Status VOlapTableSink::open(RuntimeState* state) {
Expand Down

0 comments on commit 66d2f4e

Please sign in to comment.