Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data #46094

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/cache_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ std::string CacheSourceLocalState::debug_string(int indentation_level) const {
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.remaining_has_data());
_shared_state->data_queue.has_more_data());
}
return fmt::to_string(debug_string_buffer);
}
Expand Down
11 changes: 0 additions & 11 deletions be/src/pipeline/exec/data_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,6 @@ void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int ch
_free_blocks[child_idx].emplace_back(std::move(block));
}

// Used by the sink side to decide whether it can write.
// Expects exactly one entry in _cur_bytes_in_queue (checked via DCHECK) and
// reports whether that entry is still below half of MAX_BYTE_OF_QUEUE.
bool DataQueue::has_enough_space_to_push() {
    DCHECK(_cur_bytes_in_queue.size() == 1);
    const auto bytes_queued = _cur_bytes_in_queue[0].load();
    constexpr auto half_capacity = MAX_BYTE_OF_QUEUE / 2;
    return bytes_queued < half_capacity;
}

// Used by the source side to decide whether it can read.
// Returns true when remaining_has_data() reports data; otherwise returns the
// _is_finished entry for child_idx.
bool DataQueue::has_data_or_finished(int child_idx) {
    if (remaining_has_data()) {
        return true;
    }
    return static_cast<bool>(_is_finished[child_idx]);
}

// Check which queue has data and save its index in _flag_queue_idx,
// so that the next loop checks the recorded index + 1 first.
// This may be useful with many queues; otherwise the index may always be 0.
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ class DataQueue {
bool is_finish(int child_idx = 0);
bool is_all_finish();

bool has_enough_space_to_push();
bool has_data_or_finished(int child_idx = 0);
// This function is not thread-safe; it should be called in Operator::get_block().
bool remaining_has_data();

// Reads _cur_blocks_total_nums via load() and reports whether it is positive.
bool has_more_data() const {
    const auto pending_blocks = _cur_blocks_total_nums.load();
    return pending_blocks > 0;
}

// Returns the stored _max_bytes_in_queue value.
int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
// Returns the stored _max_size_of_queue value.
int64_t max_size_of_queue() const { return _max_size_of_queue; }

Expand Down Expand Up @@ -102,7 +103,7 @@ class DataQueue {
// This is only used to record queue[0] for the profile.
int64_t _max_bytes_in_queue = 0;
int64_t _max_size_of_queue = 0;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;

// data queue is multi sink one source
std::shared_ptr<Dependency> _source_dependency = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const {
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.remaining_has_data());
_shared_state->data_queue.has_more_data());
}
return fmt::to_string(debug_string_buffer);
}
Expand Down
Loading