Skip to content

Commit

Permalink
[Bug](join) fix broadcast join running when hash table build not fini…
Browse files Browse the repository at this point in the history
…shed (apache#37792)

in pr https://github.com/apache/doris/pull/37643/files
wants fix the bug of join which not build hash table, but running early
and not wait the finished signal.
But this may be a normal phenomenon, as it should allows all sinks to
run when the source operator have closed,
so here return eof status directly when signal == false.
  • Loading branch information
zhangstar333 authored Jul 15, 2024
1 parent 4a72ddc commit 7919f07
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,11 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) {

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
// Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed.
if (_shared_state && _shared_state->sink_deps.size() == 1) {
_shared_state->sink_deps.front()->set_always_ready();
}
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Expand Down
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
} else if (!local_state._should_build_hash_table) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
CHECK(_shared_hash_table_context->signaled);
// the instance which is not build hash table, it's should wait the signal of hash table build finished.
// but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit,
if (!_shared_hash_table_context->signaled) {
return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
}

if (!_shared_hash_table_context->status.ok()) {
return _shared_hash_table_context->status;
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
_closed = true;
// Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed.
if (_shared_state && _shared_state->sink_deps.size() == 1) {
_shared_state->sink_deps.front()->set_always_ready();
}
return Status::OK();
}

Expand Down

0 comments on commit 7919f07

Please sign in to comment.