From 7919f0702e01cb42dabe7faf0ebfed086319fda1 Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Mon, 15 Jul 2024 19:43:51 +0800 Subject: [PATCH] [Bug](join) fix broadcast join running when hash table build not finished (#37792) PR https://github.com/apache/doris/pull/37643/files tried to fix a join bug in which an instance that does not build the hash table starts running early, without waiting for the build-finished signal. However, this may be normal behavior, because all sinks should be allowed to run once the source operator has closed, so here we return an EOF status directly when signaled == false. --- be/src/pipeline/exec/analytic_source_operator.cpp | 5 ----- be/src/pipeline/exec/hashjoin_build_sink.cpp | 6 +++++- be/src/pipeline/exec/operator.cpp | 5 +++++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 93e87cbce5d822..a036481d727789 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -559,11 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) { std::vector tmp_result_window_columns; _result_window_columns.swap(tmp_result_window_columns); - // Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator). - // We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed. 
- if (_shared_state && _shared_state->sink_deps.size() == 1) { - _shared_state->sink_deps.front()->set_always_ready(); - } return PipelineXLocalState::close(state); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 7887628b7fa476..3a55fdd9b8698e 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -567,7 +567,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } else if (!local_state._should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); - CHECK(_shared_hash_table_context->signaled); + // the instance which is not build hash table, it's should wait the signal of hash table build finished. + // but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit, + if (!_shared_hash_table_context->signaled) { + return Status::Error("source have closed"); + } if (!_shared_hash_table_context->status.ok()) { return _shared_hash_table_context->status; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index eba380f4386aa3..ba3602a91cb1ef 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -511,6 +511,11 @@ Status PipelineXLocalState::close(RuntimeState* state) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); } _closed = true; + // Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator). + // We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed. + if (_shared_state && _shared_state->sink_deps.size() == 1) { + _shared_state->sink_deps.front()->set_always_ready(); + } return Status::OK(); }