From 083aeaf6506c77a9aeaff339b82babf914c6d574 Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Fri, 27 Dec 2024 14:41:08 +0800 Subject: [PATCH] [fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data --- be/src/pipeline/exec/cache_source_operator.cpp | 2 +- be/src/pipeline/exec/data_queue.cpp | 11 ----------- be/src/pipeline/exec/data_queue.h | 7 ++++--- be/src/pipeline/exec/union_source_operator.cpp | 2 +- 4 files changed, 6 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index cace8465fc2d46..b515aeb495751e 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -111,7 +111,7 @@ std::string CacheSourceLocalState::debug_string(int indentation_level) const { if (_shared_state) { fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", _shared_state->data_queue.is_all_finish(), - _shared_state->data_queue.remaining_has_data()); + _shared_state->data_queue.has_more_data()); } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 436a98e6b0369e..85354ece76af85 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -72,17 +72,6 @@ void DataQueue::push_free_block(std::unique_ptr block, int ch _free_blocks[child_idx].emplace_back(std::move(block)); } -//use sink to check can_write -bool DataQueue::has_enough_space_to_push() { - DCHECK(_cur_bytes_in_queue.size() == 1); - return _cur_bytes_in_queue[0].load() < MAX_BYTE_OF_QUEUE / 2; -} - -//use source to check can_read -bool DataQueue::has_data_or_finished(int child_idx) { - return remaining_has_data() || _is_finished[child_idx]; -} - //check which queue have data, and save the idx in _flag_queue_idx, //so next loop, will check the record idx + 1 first //maybe it's useful 
with many queue, others maybe always 0 diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index d8a22888d9bf1a..f2a849ee7dd699 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -52,10 +52,11 @@ class DataQueue { bool is_finish(int child_idx = 0); bool is_all_finish(); - bool has_enough_space_to_push(); - bool has_data_or_finished(int child_idx = 0); + // This function is not thread-safe; it should only be called from Operator::get_block() bool remaining_has_data(); + bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; } + int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; } int64_t max_size_of_queue() const { return _max_size_of_queue; } @@ -102,7 +103,7 @@ class DataQueue { //this only use to record the queue[0] for profile int64_t _max_bytes_in_queue = 0; int64_t _max_size_of_queue = 0; - static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10; + static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10; // data queue is multi sink one source std::shared_ptr _source_dependency = nullptr; diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 942135453b40cf..f43cd604b688ba 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -90,7 +90,7 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const { if (_shared_state) { fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", _shared_state->data_queue.is_all_finish(), - _shared_state->data_queue.remaining_has_data()); + _shared_state->data_queue.has_more_data()); } return fmt::to_string(debug_string_buffer); }