[improve](pipelineX) improve partition node dependency logical
zhangstar333 committed Dec 14, 2023
1 parent e158753 commit 898be25
Showing 4 changed files with 16 additions and 38 deletions.
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -151,7 +151,11 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
 
         COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition));
         // so all data from the child has been fully sunk
-        ((PartitionSortSourceDependency*)local_state._shared_state->source_dep)->set_always_ready();
+        {
+            std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
+            local_state._shared_state->sink_eos = true;
+            local_state._dependency->set_ready_to_read();
+        }
     }
 
     return Status::OK();
12 changes: 9 additions & 3 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -49,11 +49,17 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     if (local_state._shared_state->blocks_buffer.empty() == false) {
         local_state._shared_state->blocks_buffer.front().swap(*output_block);
         local_state._shared_state->blocks_buffer.pop();
-        // if the buffer has no data, block reading and wait for the signal again
+        // if the buffer has no data and the sink has not reached eos, block reading and wait for the signal again
        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
                local_state._conjuncts, output_block, output_block->columns()));
-        if (local_state._shared_state->blocks_buffer.empty()) {
-            local_state._dependency->block();
+        if (local_state._shared_state->blocks_buffer.empty() &&
+            local_state._shared_state->sink_eos == false) {
+            // Re-check under the mutex: the sink may be setting eos while we decide to block().
+            // Holding sink_eos_lock avoids calling block() after the sink has set eos and ready.
+            std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
+            if (local_state._shared_state->sink_eos == false) {
+                local_state._dependency->block();
+            }
         }
         return Status::OK();
     }
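The comments in the hunk above describe a classic lost-wakeup race: the source reads sink_eos as false, the sink then sets eos and marks the dependency ready, and the source's subsequent block() hides that ready signal for good. The commit closes the window by re-checking sink_eos under sink_eos_lock before blocking. Below is a minimal, self-contained C++ sketch of that double-checked pattern, not Doris code: Dependency and SharedState are simplified stand-ins, and block()/set_ready() are modeled as non-waiting flag flips, roughly matching how the pipeline scheduler polls dependency readiness.

// Sketch of the sink/source handshake this commit adopts (hypothetical names).
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

struct Dependency {
    std::atomic<bool> ready{false};
    void block() { ready.store(false); }    // park: scheduler will skip the source task
    void set_ready() { ready.store(true); } // wake: scheduler will re-queue it
};

struct SharedState {
    bool sink_eos = false;    // mirrors PartitionSortNodeSharedState::sink_eos
    std::mutex sink_eos_lock; // mirrors PartitionSortNodeSharedState::sink_eos_lock
};

int main() {
    SharedState state;
    Dependency dep;

    // Sink side: after the last input block, publish eos and mark the
    // source dependency ready, both under sink_eos_lock.
    std::thread sink([&] {
        std::unique_lock<std::mutex> lc(state.sink_eos_lock);
        state.sink_eos = true;
        dep.set_ready();
    });

    // Source side: the buffer just drained, so decide whether to block.
    // The unlocked fast-path read mirrors the original code; the decisive
    // check happens under the lock. Without the second check, the sink
    // could set eos and ready between our first read and block(), and
    // block() would then erase the ready signal forever.
    std::thread source([&] {
        if (!state.sink_eos) { // fast path
            std::unique_lock<std::mutex> lc(state.sink_eos_lock);
            if (!state.sink_eos) { // serialized re-check
                dep.block();
            }
        }
    });

    sink.join();
    source.join();
    // Whichever thread wins the lock, eos implies the dependency ends up
    // ready: either the source saw sink_eos and never blocked, or it
    // blocked first and the sink's set_ready() ran afterwards.
    assert(dep.ready.load());
}

The same reasoning explains why the sink takes the lock too: setting sink_eos and calling set_ready_to_read() as one atomic step guarantees the pair is ordered entirely before or entirely after the source's re-check-and-block.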
34 changes: 0 additions & 34 deletions be/src/pipeline/exec/partition_sort_source_operator.h
@@ -55,40 +55,6 @@ class PartitionSortSourceDependency final : public Dependency {
     PartitionSortSourceDependency(int id, int node_id, QueryContext* query_ctx)
             : Dependency(id, node_id, "PartitionSortSourceDependency", query_ctx) {}
     ~PartitionSortSourceDependency() override = default;
-
-    void block() override {
-        if (_always_ready) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_always_done_lock);
-        if (_always_ready) {
-            return;
-        }
-        Dependency::block();
-    }
-
-    void set_always_ready() {
-        if (_always_ready) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_always_done_lock);
-        if (_always_ready) {
-            return;
-        }
-        _always_ready = true;
-        set_ready();
-    }
-
-    std::string debug_string(int indentation_level = 0) override {
-        fmt::memory_buffer debug_string_buffer;
-        fmt::format_to(debug_string_buffer, "{}, _always_ready = {}",
-                       Dependency::debug_string(indentation_level), _always_ready);
-        return fmt::to_string(debug_string_buffer);
-    }
-
-private:
-    bool _always_ready {false};
-    std::mutex _always_done_lock;
 };
 
 class PartitionSortSourceOperatorX;
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_x/dependency.h
@@ -477,6 +477,8 @@ struct PartitionSortNodeSharedState : public BasicSharedState {
     std::mutex buffer_mutex;
     std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
     std::unique_ptr<vectorized::SortCursorCmp> previous_row;
+    bool sink_eos = false;
+    std::mutex sink_eos_lock;
 };
 
 class AsyncWriterDependency final : public Dependency {
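Taken together, the four files move the end-of-input signal out of a bespoke dependency subclass and into the operator's shared state: the sink publishes sink_eos under sink_eos_lock and calls set_ready_to_read(), the source re-checks sink_eos under the same lock before blocking, and the hand-rolled _always_ready/_always_done_lock machinery in PartitionSortSourceDependency becomes unnecessary and is deleted.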
