Skip to content

Commit

Permalink
fix local exchanger low mem mode
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Dec 11, 2024
1 parent f8af843 commit 57446b7
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
6 changes: 2 additions & 4 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,7 @@ struct LocalExchangeSharedState : public BasicSharedState {
}

virtual void set_low_memory_mode() {
_buffer_mem_limit =
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024);
_buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
}
};

Expand Down Expand Up @@ -896,8 +895,7 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
}

void set_low_memory_mode() override {
_buffer_mem_limit =
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024);
_buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
DCHECK(!_queues_mem_usage.empty());
_each_queue_limit =
std::max<int64_t>(64 * 1024, _buffer_mem_limit / _queues_mem_usage.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

if (state->get_query_ctx()->low_memory_mode()) {
local_state._shared_state->set_low_memory_mode();
local_state._exchanger->set_low_memory_mode();
}

RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state));
Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,16 @@ void ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) {
// do nothing
}
}

void ExchangerBase::set_low_memory_mode() {
_free_block_limit = 0;

vectorized::Block block;
while (_free_blocks.try_dequeue(block)) {
// do nothing
}
}

void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState& local_state) {
BlockWrapperSPtr next_block;
vectorized::Block block;
Expand Down
11 changes: 6 additions & 5 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class ExchangerBase {

virtual std::string data_queue_debug_string(int i) = 0;

void set_low_memory_mode();

protected:
friend struct LocalExchangeSharedState;
friend struct BlockWrapper;
Expand All @@ -83,7 +85,7 @@ class ExchangerBase {
const int _num_partitions;
const int _num_senders;
const int _num_sources;
const int _free_block_limit = 0;
int _free_block_limit = 0;
moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
};

Expand Down Expand Up @@ -181,10 +183,9 @@ struct BlockWrapper {
if (ref_count.fetch_sub(1) == 1) {
DCHECK_GT(allocated_bytes, 0);
shared_state->sub_total_mem_usage(allocated_bytes, channel_id);
if (shared_state->exchanger->_free_block_limit == 0 ||
shared_state->exchanger->_free_blocks.size_approx() <
shared_state->exchanger->_free_block_limit *
shared_state->exchanger->_num_sources) {
if (shared_state->exchanger->_free_blocks.size_approx() <
shared_state->exchanger->_free_block_limit *
shared_state->exchanger->_num_sources) {
data_block.clear_column_data();
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
}
Expand Down

0 comments on commit 57446b7

Please sign in to comment.