diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 93e87cbce5d822..a036481d727789 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -559,11 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) { std::vector tmp_result_window_columns; _result_window_columns.swap(tmp_result_window_columns); - // Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator). - // We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed. - if (_shared_state && _shared_state->sink_deps.size() == 1) { - _shared_state->sink_deps.front()->set_always_ready(); - } return PipelineXLocalState::close(state); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 7887628b7fa476..3a55fdd9b8698e 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -567,7 +567,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } else if (!local_state._should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); - CHECK(_shared_hash_table_context->signaled); + // the instance which is not build hash table, it's should wait the signal of hash table build finished. + // but if it's running and signaled == false, maybe the source operator have closed caused by some short circuit, + if (!_shared_hash_table_context->signaled) { + return Status::Error("source have closed"); + } if (!_shared_hash_table_context->status.ok()) { return _shared_hash_table_context->status; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index eba380f4386aa3..ba3602a91cb1ef 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -511,6 +511,11 @@ Status PipelineXLocalState::close(RuntimeState* state) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); } _closed = true; + // Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator). + // We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed. + if (_shared_state && _shared_state->sink_deps.size() == 1) { + _shared_state->sink_deps.front()->set_always_ready(); + } return Status::OK(); }