From 277b3cf4379c68521b8173478010dd25be028002 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 16 Mar 2024 09:04:03 +0800 Subject: [PATCH] [pipelineX](exchange) Make exchange buffer size configurable (#32201) --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 ++++-- be/src/pipeline/exec/exchange_sink_buffer.h | 1 - be/src/pipeline/exec/scan_operator.cpp | 1 + 5 files changed, 7 insertions(+), 3 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index cd31926f197e3b..ceb747de8136e1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -268,6 +268,7 @@ DEFINE_mInt32(doris_max_scan_key_num, "48"); DEFINE_mInt32(max_pushdown_conditions_per_column, "1024"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); +DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64"); DEFINE_mInt64(column_dictionary_key_ratio_threshold, "0"); DEFINE_mInt64(column_dictionary_key_size_threshold, "0"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c6000fed446ccc..0c7fdaf5f9c0d9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -310,6 +310,7 @@ DECLARE_mInt32(doris_max_scan_key_num); DECLARE_mInt32(max_pushdown_conditions_per_column); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); +DECLARE_mInt32(exchg_buffer_queue_capacity_factor); DECLARE_mInt64(column_dictionary_key_ratio_threshold); DECLARE_mInt64(column_dictionary_key_size_threshold); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index ed7f18bfcb7c37..2b97551d8fb5c8 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -111,7 +111,8 @@ void ExchangeSinkBuffer::close() { template bool ExchangeSinkBuffer::can_write() const { - size_t max_package_size = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size(); + size_t max_package_size = + config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size(); size_t total_package_size = 0; for (auto& [_, q] : _instance_to_package_queue) { total_package_size += q.size(); @@ -168,7 +169,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { std::queue, std::list>>(); _instance_to_broadcast_package_queue[low_id] = std::queue, std::list>>(); - _queue_capacity = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size(); + _queue_capacity = + config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size(); PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 0afa59bf731726..8c0375499c3f86 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -270,7 +270,6 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { int64_t get_sum_rpc_time(); std::atomic _total_queue_size = 0; - static constexpr int QUEUE_CAPACITY_FACTOR = 64; std::shared_ptr _queue_dependency; std::shared_ptr _finish_dependency; std::atomic _should_stop {false}; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index c10e7777bdb649..8870ba619cbcf2 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1413,6 +1413,7 @@ Status ScanLocalState::close(RuntimeState* state) { if (_scanner_ctx) { _scanner_ctx->stop_scanners(state); } + std::list> {}.swap(_scanners); COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());