diff --git a/be/src/http/action/pipeline_task_action.h b/be/src/http/action/pipeline_task_action.h index 488a1148a533d6..00c1c062cad8ef 100644 --- a/be/src/http/action/pipeline_task_action.h +++ b/be/src/http/action/pipeline_task_action.h @@ -23,7 +23,6 @@ namespace doris { class HttpRequest; -// Get BE health state from http API. class PipelineTaskAction : public HttpHandler { public: PipelineTaskAction() = default; diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 48189c8eb4a296..566845639cbb2a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -70,8 +70,6 @@ class ExchangeSinkQueueDependency final : public WriteDependency { ExchangeSinkQueueDependency(int id, int node_id) : WriteDependency(id, node_id, "ResultQueueDependency") {} ~ExchangeSinkQueueDependency() override = default; - - void* shared_state() override { return nullptr; } }; class BroadcastDependency final : public WriteDependency { @@ -95,19 +93,6 @@ class BroadcastDependency final : public WriteDependency { } } - void* shared_state() override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); - return nullptr; - } - - void set_ready_for_write() override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); - } - - void block_writing() override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); - } - int available_blocks() const { return _available_block; } private: @@ -138,7 +123,6 @@ class LocalExchangeChannelDependency final : public WriteDependency { LocalExchangeChannelDependency(int id, int node_id) : WriteDependency(id, node_id, "LocalExchangeChannelDependency") {} ~LocalExchangeChannelDependency() override = default; - void* shared_state() override { return nullptr; } // TODO(gabriel): blocked by memory }; diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 5d754747bee743..c00319c1e9d150 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -56,15 +56,13 @@ struct ExchangeDataDependency final : public Dependency { ExchangeDataDependency(int id, int node_id, vectorized::VDataStreamRecvr::SenderQueue* sender_queue) : Dependency(id, node_id, "DataDependency"), _always_done(false) {} - void* shared_state() override { return nullptr; } void set_always_done() { - _always_done = true; - if (_ready_for_read) { + if (_always_done) { return; } - _read_dependency_watcher.stop(); - _ready_for_read = true; + _always_done = true; + Dependency::set_ready_for_read(); } void block_reading() override { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index a1815ca511854e..9771ae43e8e9aa 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -52,8 +52,6 @@ class SharedHashTableDependency final : public WriteDependency { SharedHashTableDependency(int id, int node_id) : WriteDependency(id, node_id, "SharedHashTableDependency") {} ~SharedHashTableDependency() override = default; - - void* shared_state() override { return nullptr; } }; class HashJoinBuildSinkLocalState final diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 53e0292f68f7dd..9bda54e79b6907 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -49,8 +49,6 @@ class ResultSinkDependency final : public WriteDependency { ResultSinkDependency(int id, int node_id) : WriteDependency(id, node_id, "ResultSinkDependency") {} ~ResultSinkDependency() override = default; - - void* shared_state() override { return nullptr; } }; class ResultSinkLocalState final : public PipelineXSinkLocalState<> { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index f058225580daad..f4dd2c45d51315 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -62,8 +62,6 @@ class ScanDependency final : public Dependency { ScanDependency(int id, int node_id) : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) {} - void* shared_state() override { return nullptr; } - // TODO(gabriel): [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 && @@ -73,6 +71,8 @@ class ScanDependency final : public Dependency { return Dependency::read_blocked_by(task); } + bool push_to_blocking_queue() override { return true; } + void block_reading() override { if (_eos) { return; diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 52bd8aa3cf715f..4c92c37e48f40d 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -165,6 +165,7 @@ Status SetSinkLocalState::init(RuntimeState* state, LocalSinkState _build_timer = ADD_TIMER(_profile, "BuildTime"); Parent& parent = _parent->cast(); + _dependency->set_cur_child_id(parent._cur_child_id); _child_exprs.resize(parent._child_exprs.size()); for (size_t i = 0; i < _child_exprs.size(); i++) { RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i])); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 947f341880235d..a302dc7c34d8b1 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -133,7 +133,7 @@ class PipelineTask { _wait_worker_watcher.start(); } void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } - PipelineTaskState get_state() { return _cur_state; } + PipelineTaskState get_state() const { return _cur_state; } void set_state(PipelineTaskState state); virtual bool is_pending_finish() { @@ -154,6 +154,7 @@ class PipelineTask { } virtual bool source_can_read() { return _source->can_read() || _pipeline->_always_can_read; } + virtual bool push_blocked_task_to_queue() const { return true; } virtual bool runtime_filters_are_ready_or_timeout() { return _source->runtime_filters_are_ready_or_timeout(); diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 2e43007dee7296..f21a48dc5fc562 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -29,10 +29,8 @@ namespace doris::pipeline { void Dependency::add_block_task(PipelineXTask* task) { - // TODO(gabriel): support read dependency - if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == task) { - return; - } + DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task) + << "Duplicate task: " << task->debug_string(); _blocked_task.push_back(task); } @@ -74,6 +72,17 @@ void Dependency::set_ready_for_read() { _ready_for_read = true; local_block_task.swap(_blocked_task); } + for (auto* task : local_block_task) { + task->try_wake_up(this); + } +} + +void SetDependency::set_ready_for_read() { + if (_child_idx == 0) { + WriteDependency::set_ready_for_read(); + } else { + _set_state->probe_finished_children_dependency[0]->set_ready_for_read(); + } } void WriteDependency::set_ready_for_write() { @@ -84,7 +93,7 @@ void WriteDependency::set_ready_for_write() { std::vector local_block_task {}; { - std::unique_lock lc(_task_lock); + std::unique_lock lc(_write_task_lock); if (_ready_for_write) { return; } @@ -133,7 +142,7 @@ Dependency* Dependency::read_blocked_by(PipelineXTask* task) { std::unique_lock lc(_task_lock); auto ready_for_read = _ready_for_read.load(); - if (!ready_for_read && task) { + if (!ready_for_read && !push_to_blocking_queue() && task) { add_block_task(task); } return ready_for_read ? nullptr : this; @@ -162,7 +171,7 @@ FinishDependency* FinishDependency::finish_blocked_by(PipelineXTask* task) { } WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) { - std::unique_lock lc(_task_lock); + std::unique_lock lc(_write_task_lock); const auto ready_for_write = _ready_for_write.load(); if (!ready_for_write && task) { add_write_block_task(task); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index f6a37766525f46..937aa01bd868b1 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -57,9 +57,10 @@ class Dependency : public std::enable_shared_from_this { [[nodiscard]] int id() const { return _id; } [[nodiscard]] virtual std::string name() const { return _name; } - virtual void* shared_state() = 0; + virtual void* shared_state() { return nullptr; } virtual std::string debug_string(int indentation_level = 0); virtual bool is_write_dependency() { return false; } + virtual bool push_to_blocking_queue() { return false; } // Start the watcher. We use it to count how long this dependency block the current pipeline task. void start_read_watcher() { @@ -145,6 +146,7 @@ class WriteDependency : public Dependency { protected: friend class Dependency; std::atomic _ready_for_write {true}; + std::mutex _write_task_lock; MonotonicStopWatch _write_dependency_watcher; private: @@ -172,7 +174,6 @@ class FinishDependency final : public Dependency { void set_ready_to_finish(); - void* shared_state() override { return nullptr; } std::string debug_string(int indentation_level = 0) override; void add_block_task(PipelineXTask* task) override; @@ -222,7 +223,6 @@ class RuntimeFilterDependency final : public Dependency { RuntimeFilterDependency(int id, int node_id, std::string name) : Dependency(id, node_id, name) {} RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task); - void* shared_state() override { return nullptr; } void add_filters(IRuntimeFilter* runtime_filter); void sub_filters(); void set_blocked_by_rf(std::shared_ptr blocked_by_rf) { @@ -255,8 +255,6 @@ class AndDependency final : public WriteDependency { return fmt::to_string(debug_string_buffer); } - void* shared_state() override { return nullptr; } - std::string debug_string(int indentation_level = 0) override; [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { @@ -284,7 +282,6 @@ struct FakeDependency final : public WriteDependency { public: FakeDependency(int id, int node_id) : WriteDependency(id, node_id, "FakeDependency") {} using SharedState = FakeSharedState; - void* shared_state() override { return nullptr; } [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { return nullptr; } [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override { return nullptr; @@ -465,6 +462,7 @@ class UnionDependency final : public WriteDependency { } return this; } + bool push_to_blocking_queue() override { return true; } void block_reading() override {} void block_writing() override {} @@ -667,7 +665,6 @@ class AsyncWriterDependency final : public WriteDependency { AsyncWriterDependency(int id, int node_id) : WriteDependency(id, node_id, "AsyncWriterDependency") {} ~AsyncWriterDependency() override = default; - void* shared_state() override { return nullptr; } }; class SetDependency; @@ -769,30 +766,10 @@ class SetDependency final : public WriteDependency { void set_shared_state(std::shared_ptr set_state) { _set_state = set_state; } - // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready. - [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { - if (config::enable_fuzzy_mode && !_set_state->ready_for_read && - _should_log(_read_dependency_watcher.elapsed_time())) { - LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " - << id() << " " << _node_id << " block tasks: " << _blocked_task.size(); - } - std::unique_lock lc(_task_lock); - if (!_set_state->ready_for_read && task) { - add_block_task(task); - } - return _set_state->ready_for_read ? nullptr : this; - } - - // Notify downstream pipeline tasks this dependency is ready. - void set_ready_for_read() override { - if (_set_state->ready_for_read) { - return; - } - _read_dependency_watcher.stop(); - _set_state->ready_for_read = true; - } + void set_ready_for_read() override; void set_cur_child_id(int id) { + _child_idx = id; _set_state->probe_finished_children_dependency[id] = this; if (id != 0) { block_writing(); @@ -801,6 +778,7 @@ class SetDependency final : public WriteDependency { private: std::shared_ptr _set_state; + int _child_idx {0}; }; using PartitionedBlock = std::pair, diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 7295f38a124b5b..ade718c1498efc 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -186,7 +186,7 @@ Status PipelineXTask::_open() { _blocked_dep = _filter_dependency->filter_blocked_by(this); if (_blocked_dep) { set_state(PipelineTaskState::BLOCKED_FOR_RF); - set_use_blocking_queue(false); + set_use_blocking_queue(); RETURN_IF_ERROR(st); } else if (i == 1) { CHECK(false) << debug_string(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 04c5ddc1974646..f920bf219b0c9c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -127,7 +127,7 @@ class PipelineXTask : public PipelineTask { OperatorXs operatorXs() { return _operators; } - bool push_blocked_task_to_queue() { + bool push_blocked_task_to_queue() const override { /** * Push task into blocking queue if: * 1. `_use_blocking_queue` is true. @@ -135,15 +135,19 @@ class PipelineXTask : public PipelineTask { */ return _use_blocking_queue || get_state() == PipelineTaskState::BLOCKED_FOR_DEPENDENCY; } - void set_use_blocking_queue(bool use_blocking_queue) { - _use_blocking_queue = use_blocking_queue; + void set_use_blocking_queue() { + if (_blocked_dep->push_to_blocking_queue()) { + _use_blocking_queue = true; + return; + } + _use_blocking_queue = false; } private: Dependency* _write_blocked_dependency() { _blocked_dep = _write_dependencies->write_blocked_by(this); if (_blocked_dep != nullptr) { - set_use_blocking_queue(false); + set_use_blocking_queue(); static_cast(_blocked_dep)->start_write_watcher(); return _blocked_dep; } @@ -154,7 +158,7 @@ class PipelineXTask : public PipelineTask { for (auto* fin_dep : _finish_dependencies) { _blocked_dep = fin_dep->finish_blocked_by(this); if (_blocked_dep != nullptr) { - set_use_blocking_queue(false); + set_use_blocking_queue(); static_cast(_blocked_dep)->start_finish_watcher(); return _blocked_dep; } @@ -166,8 +170,7 @@ class PipelineXTask : public PipelineTask { for (auto* op_dep : _read_dependencies) { _blocked_dep = op_dep->read_blocked_by(this); if (_blocked_dep != nullptr) { - // TODO(gabriel): - set_use_blocking_queue(true); + set_use_blocking_queue(); _blocked_dep->start_read_watcher(); return _blocked_dep; } diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 9ce2711c27ab37..8885a0601a1888 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -77,7 +77,7 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) { return Status::InternalError("BlockedTaskScheduler shutdown"); } std::unique_lock lock(_task_mutex); - if (task->is_pipelineX() && !static_cast(task)->push_blocked_task_to_queue()) { + if (!static_cast(task)->push_blocked_task_to_queue()) { // put this task into current dependency's blocking queue and wait for event notification // instead of using a separate BlockedTaskScheduler. return Status::OK(); @@ -142,6 +142,11 @@ void BlockedTaskScheduler::_schedule() { } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) { if (task->source_can_read()) { _make_task_run(local_blocked_tasks, iter); + } else if (!task->push_blocked_task_to_queue()) { + // TODO(gabriel): This condition means this task is in blocking queue now and we should + // remove it because this new dependency should not be put into blocking queue. We + // will delete this strange behavior after ScanDependency and UnionDependency done. + local_blocked_tasks.erase(iter++); } else { iter++; }