diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 2e0ffaf9a37bb3..fcd7d014c6fe6b 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -92,7 +92,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block DCHECK(!_block_queue.empty()); auto [next_block, block_byte_size] = std::move(_block_queue.front()); - _recvr->_blocks_memory_usage->add(-block_byte_size); + _recvr->update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); if (!_pending_closures.empty()) { @@ -168,7 +168,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } - _recvr->_blocks_memory_usage->add(block_byte_size); + _recvr->update_blocks_memory_usage(block_byte_size); _data_arrival_cv.notify_one(); } @@ -208,7 +208,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { _block_queue.emplace_back(std::move(nblock), block_mem_size); _data_arrival_cv.notify_one(); - if (_recvr->exceeds_limit(block_mem_size)) { + // Careful: Accessing members of _recvr that are allocated by Object pool + // should be done before the following logic, because the _lock will be released + // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, _recvr may + // have been closed and resouces in _recvr are released. + _recvr->update_blocks_memory_usage(block_mem_size); + if (_recvr->exceeds_limit(0)) { // yiguolei // It is too tricky here, if the running thread is bthread then the tid may be wrong. std::thread::id tid = std::this_thread::get_id(); @@ -223,7 +228,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { iter->second->wait(l); } - _recvr->_blocks_memory_usage->add(block_mem_size); } void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 03bf6f9db28bc5..0059c8ddf0e017 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -102,14 +102,21 @@ class VDataStreamRecvr { void close(); + // Careful: stream sender will call this function for a local receiver, + // accessing members of receiver that are allocated by Object pool + // in this function is not safe. bool exceeds_limit(int batch_size) { - return _blocks_memory_usage->current_value() + batch_size > + return _blocks_memory_usage_current_value + batch_size > config::exchg_node_buffer_size_bytes; } bool is_closed() const { return _is_closed; } private: + void update_blocks_memory_usage(int64_t size) { + _blocks_memory_usage->add(size); + _blocks_memory_usage_current_value = _blocks_memory_usage->current_value(); + } class SenderQueue; class PipSenderQueue; @@ -154,6 +161,7 @@ class VDataStreamRecvr { RuntimeProfile::Counter* _decompress_bytes; RuntimeProfile::Counter* _memory_usage_counter; RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage; + std::atomic _blocks_memory_usage_current_value = 0; RuntimeProfile::Counter* _peak_memory_usage_counter; // Number of rows received @@ -266,7 +274,7 @@ class VDataStreamRecvr::PipSenderQueue : public SenderQueue { } _block_queue.emplace_back(std::move(nblock), block_mem_size); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); - _recvr->_blocks_memory_usage->add(block_mem_size); + _recvr->update_blocks_memory_usage(block_mem_size); _data_arrival_cv.notify_one(); } }