Skip to content

Commit

Permalink
[fix](DataQueue) Fix thread conflict issue caused by concurrent calls…
Browse files Browse the repository at this point in the history
… to DataQueue::remaining_has_data
  • Loading branch information
mrhhsg committed Dec 27, 2024
1 parent d46495e commit 083aeaf
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 16 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/cache_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ std::string CacheSourceLocalState::debug_string(int indentation_level) const {
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.remaining_has_data());
_shared_state->data_queue.has_more_data());
}
return fmt::to_string(debug_string_buffer);
}
Expand Down
11 changes: 0 additions & 11 deletions be/src/pipeline/exec/data_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,6 @@ void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int ch
_free_blocks[child_idx].emplace_back(std::move(block));
}

//use sink to check can_write
bool DataQueue::has_enough_space_to_push() {
DCHECK(_cur_bytes_in_queue.size() == 1);
return _cur_bytes_in_queue[0].load() < MAX_BYTE_OF_QUEUE / 2;
}

//use source to check can_read
bool DataQueue::has_data_or_finished(int child_idx) {
return remaining_has_data() || _is_finished[child_idx];
}

//check which queue have data, and save the idx in _flag_queue_idx,
//so next loop, will check the record idx + 1 first
//maybe it's useful with many queue, others maybe always 0
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ class DataQueue {
bool is_finish(int child_idx = 0);
bool is_all_finish();

bool has_enough_space_to_push();
bool has_data_or_finished(int child_idx = 0);
// This function is not thread safe, should be called in Operator::get_block()
bool remaining_has_data();

bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; }

int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
int64_t max_size_of_queue() const { return _max_size_of_queue; }

Expand Down Expand Up @@ -102,7 +103,7 @@ class DataQueue {
//this only use to record the queue[0] for profile
int64_t _max_bytes_in_queue = 0;
int64_t _max_size_of_queue = 0;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;

// data queue is multi sink one source
std::shared_ptr<Dependency> _source_dependency = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const {
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.remaining_has_data());
_shared_state->data_queue.has_more_data());
}
return fmt::to_string(debug_string_buffer);
}
Expand Down

0 comments on commit 083aeaf

Please sign in to comment.