Skip to content

Commit

Permalink
[fix](local exchange) fix bug of accessing released counter of local …
Browse files Browse the repository at this point in the history
…data stream receiver
  • Loading branch information
jacktengg committed Sep 11, 2023
1 parent 78a48df commit d98a4d6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
12 changes: 8 additions & 4 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block

DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
_recvr->_blocks_memory_usage->add(-block_byte_size);
_recvr->update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();

if (!_pending_closures.empty()) {
Expand Down Expand Up @@ -168,7 +168,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
_recvr->_blocks_memory_usage->add(block_byte_size);
_recvr->update_blocks_memory_usage(block_byte_size);
_data_arrival_cv.notify_one();
}

Expand Down Expand Up @@ -208,7 +208,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_data_arrival_cv.notify_one();

if (_recvr->exceeds_limit(block_mem_size)) {
// Careful: Accessing members of _recvr that are allocated by Object pool
// should be done before the following logic, because the _lock will be released
// by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, _recvr may
// have been closed and resouces in _recvr are released.
_recvr->update_blocks_memory_usage(block_mem_size);
if (_recvr->exceeds_limit(0)) {
// yiguolei
// It is too tricky here, if the running thread is bthread then the tid may be wrong.
std::thread::id tid = std::this_thread::get_id();
Expand All @@ -223,7 +228,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
iter->second->wait(l);
}

_recvr->_blocks_memory_usage->add(block_mem_size);
}

void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
Expand Down
12 changes: 10 additions & 2 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,21 @@ class VDataStreamRecvr {

void close();

// Careful: stream sender will call this function for a local receiver,
// accessing members of receiver that are allocated by Object pool
// in this function is not safe.
bool exceeds_limit(int batch_size) {
return _blocks_memory_usage->current_value() + batch_size >
return _blocks_memory_usage_current_value + batch_size >
config::exchg_node_buffer_size_bytes;
}

bool is_closed() const { return _is_closed; }

private:
void update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
_blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
}
class SenderQueue;
class PipSenderQueue;

Expand Down Expand Up @@ -154,6 +161,7 @@ class VDataStreamRecvr {
RuntimeProfile::Counter* _decompress_bytes;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
RuntimeProfile::Counter* _peak_memory_usage_counter;

// Number of rows received
Expand Down Expand Up @@ -266,7 +274,7 @@ class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->_blocks_memory_usage->add(block_mem_size);
_recvr->update_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
}
}
Expand Down

0 comments on commit d98a4d6

Please sign in to comment.