diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index b6cd342d3988be..93d22850dfcbb1 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -111,6 +111,9 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { } Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } auto p = _parent->cast(); Defer defer {[&]() { if (_should_build_hash_table) { @@ -132,8 +135,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) { - return Status::OK(); + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { + return Base::close(state, exec_status); } auto* block = _shared_state->build_block.get(); uint64_t hash_table_size = block ? block->rows() : 0; @@ -151,7 +154,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); - return Status::OK(); + return Base::close(state, exec_status); } bool HashJoinBuildSinkLocalState::build_unique() const { @@ -535,6 +538,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + local_state._eos = eos; if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 04d8f0dc736f27..eca7d608437b06 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -356,6 +356,7 @@ class PipelineXSinkLocalStateBase { // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed = false; + std::atomic _eos = false; //NOTICE: now add a faker profile, because sometimes the profile record is useless //so we want remove some counters and timers, eg: in join node, if it's broadcast_join //and shared hash table, some counter/timer about build hash table is useless, diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 259d7580877493..6b3a74c83df97c 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -259,7 +259,6 @@ class PartitionedAggSinkLocalState std::unique_ptr _runtime_state; - bool _eos = false; std::shared_ptr _finish_dependency; // temp structures during spilling diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index e74b5d2a41401a..2c820d9fa09daf 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -54,7 +54,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState _finish_dependency; }; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 19c37f3649bcc7..c5da36a7c4d286 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -110,11 +110,14 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu } RETURN_IF_ERROR(Base::close(state, exec_status)); if (exec_status.ok()) { - DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0) - << " state: { cancel = " << state->is_cancelled() << ", " - << state->cancel_reason().to_string() << "} query ctx: { cancel = " - << state->get_query_ctx()->is_cancelled() << ", " - << state->get_query_ctx()->exec_status().to_string() << "}"; + DCHECK(_release_count || _exchanger == nullptr || + _exchanger->_running_source_operators == 0) + << "Do not finish correctly! " << debug_string(0) + << " state: { cancel = " << state->is_cancelled() << ", " + << state->cancel_reason().to_string() + << "} query ctx: { cancel = " << state->get_query_ctx()->is_cancelled() << ", " + << state->get_query_ctx()->exec_status().to_string() + << "} Exchanger: " << (void*)_exchanger; } return Status::OK(); } diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 74e15d7cc93ea1..cef02d6374b9dc 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -22,6 +22,7 @@ #include #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" namespace doris::pipeline { @@ -65,4 +66,12 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) { return Status::OK(); } -} // namespace doris::pipeline \ No newline at end of file +void Pipeline::make_all_runnable() { + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(true); + } + } +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index dfeb53ae006116..8a20ccb631cc47 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -47,6 +47,7 @@ class Pipeline : public std::enable_shared_from_this { std::weak_ptr context) : _pipeline_id(pipeline_id), _num_tasks(num_tasks) { _init_profile(); + _tasks.resize(_num_tasks, nullptr); } // Add operators for pipelineX @@ -104,14 +105,24 @@ class Pipeline : public std::enable_shared_from_this { void set_children(std::shared_ptr child) { _children.push_back(child); } void set_children(std::vector> children) { _children = children; } - void incr_created_tasks() { _num_tasks_created++; } + void incr_created_tasks(int i, PipelineTask* task) { + _num_tasks_created++; + _num_tasks_running++; + DCHECK_LT(i, _tasks.size()); + _tasks[i] = task; + } + + void make_all_runnable(); + void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; + _tasks.resize(_num_tasks, nullptr); for (auto& op : _operators) { op->set_parallel_tasks(_num_tasks); } } int num_tasks() const { return _num_tasks; } + bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; } std::string debug_string() { fmt::memory_buffer debug_string_buffer; @@ -158,6 +169,10 @@ class Pipeline : public std::enable_shared_from_this { int _num_tasks = 1; // How many tasks are already created? std::atomic _num_tasks_created = 0; + // How many tasks are already created and not finished? + std::atomic _num_tasks_running = 0; + // Tasks in this pipeline. + std::vector _tasks; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e99d8a17262e2e..54c49321389db0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -146,6 +146,8 @@ PipelineFragmentContext::~PipelineFragmentContext() { runtime_state.reset(); } } + _dag.clear(); + _pip_id_to_pipeline.clear(); _pipelines.clear(); _sink.reset(); _root_op.reset(); @@ -365,6 +367,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag _task_runtime_states.resize(_pipelines.size()); for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); + _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get(); } auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size()); @@ -469,6 +472,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag task_runtime_state.get(), this, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), i); + pipeline->incr_created_tasks(i, task.get()); task_runtime_state->set_task(task.get()); pipeline_id_to_task.insert({pipeline->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); @@ -562,7 +566,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag } } _pipeline_parent_map.clear(); - _dag.clear(); _op_id_to_le_state.clear(); return Status::OK(); @@ -1752,7 +1755,16 @@ void PipelineFragmentContext::_close_fragment_instance() { std::dynamic_pointer_cast(shared_from_this())); } -void PipelineFragmentContext::close_a_pipeline() { +void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) { + // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here + DCHECK(_pip_id_to_pipeline.contains(pipeline_id)); + if (_pip_id_to_pipeline[pipeline_id]->close_task()) { + if (_dag.contains(pipeline_id)) { + for (auto dep : _dag[pipeline_id]) { + _pip_id_to_pipeline[dep]->make_all_runnable(); + } + } + } std::lock_guard l(_task_mutex); ++_closed_tasks; if (_closed_tasks == _total_tasks) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index dcfcc2016199db..bcef1271b6025a 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -100,7 +100,7 @@ class PipelineFragmentContext : public TaskExecutionContext { [[nodiscard]] int get_fragment_id() const { return _fragment_id; } - void close_a_pipeline(); + void close_a_pipeline(PipelineId pipeline_id); Status send_report(bool); @@ -291,6 +291,7 @@ class PipelineFragmentContext : public TaskExecutionContext { std::map, std::shared_ptr>> _op_id_to_le_state; + std::map _pip_id_to_pipeline; // UniqueId -> runtime mgr std::map> _runtime_filter_mgr_map; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4f362ac5042e8f..35d09f4850930b 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -71,7 +71,6 @@ PipelineTask::PipelineTask( if (shared_state) { _sink_shared_state = shared_state; } - pipeline->incr_created_tasks(); } Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, @@ -279,7 +278,7 @@ Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); - _eos = _sink->is_finished(_state) || _eos; + _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream; *eos = _eos; if (_eos) { // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index dd2ead4b5dcc91..36362e15813238 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -135,7 +135,8 @@ class PipelineTask { int task_id() const { return _index; }; bool is_finalized() const { return _finalized; } - void clear_blocking_state() { + void clear_blocking_state(bool wake_up_by_downstream = false) { + _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); @@ -231,6 +232,8 @@ class PipelineTask { } } + PipelineId pipeline_id() const { return _pipeline->id(); } + private: friend class RuntimeFilterDependency; bool _is_blocked(); @@ -306,11 +309,12 @@ class PipelineTask { Dependency* _execution_dep = nullptr; - std::atomic _finalized {false}; + std::atomic _finalized = false; std::mutex _dependency_lock; - std::atomic _running {false}; - std::atomic _eos {false}; + std::atomic _running = false; + std::atomic _eos = false; + std::atomic _wake_up_by_downstream = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 8be30773ee11f1..475d3a8065f8b4 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -94,7 +94,7 @@ void _close_task(PipelineTask* task, Status exec_status) { } task->finalize(); task->set_running(false); - task->fragment_context()->close_a_pipeline(); + task->fragment_context()->close_a_pipeline(task->pipeline_id()); } void TaskScheduler::_do_work(size_t index) { diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 01fcf851321fc1..689c937455e8e8 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -150,7 +150,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( } *local_merge_filters = &iter->second; DCHECK(!iter->second.filters.empty()); - DCHECK_GT(iter->second.merge_time, 0); return Status::OK(); }