diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5ae89db55a45ac..265c4e0047b875 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -445,14 +445,14 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
             auto cur_task_id = _total_tasks++;
             task_runtime_state->set_task_id(cur_task_id);
             task_runtime_state->set_task_num(pipeline->num_tasks());
-            auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
+            auto task = std::make_shared<PipelineTask>(pipeline, cur_task_id,
                                                        task_runtime_state.get(), this,
                                                        pipeline_id_to_profile[pip_idx].get(),
                                                        get_local_exchange_state(pipeline), i);
             pipeline->incr_created_tasks(i, task.get());
             task_runtime_state->set_task(task.get());
             pipeline_id_to_task.insert({pipeline->id(), task.get()});
-            _tasks[i].emplace_back(std::move(task));
+            _tasks[i].emplace_back(task);
         }
     }
 
@@ -1679,7 +1679,7 @@ Status PipelineFragmentContext::submit() {
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         for (auto& t : task) {
-            st = scheduler->schedule_task(t.get());
+            st = scheduler->schedule_task(t);
             if (!st) {
                 cancel(Status::InternalError("submit context to executor fail"));
                 std::lock_guard<std::mutex> l(_task_mutex);
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 1674afa886d520..e186bd15a098d3 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -226,7 +226,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
     OperatorPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
-    std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
+    std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
 
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
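With `_tasks` holding `std::shared_ptr<PipelineTask>`, the fragment context is no longer the sole owner: a copy handed to the scheduler keeps the task alive even after the fragment releases its tasks. A minimal sketch of that idea (the `Task` type and names are hypothetical, not the Doris API):

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-in for PipelineTask, used only to illustrate
// the unique_ptr -> shared_ptr ownership change.
struct Task {};

int main() {
    std::vector<std::shared_ptr<Task>> tasks; // owner #1: the fragment context
    auto task = std::make_shared<Task>();
    tasks.emplace_back(task);                 // what _tasks[i].emplace_back(task) does

    std::shared_ptr<Task> queued = task;      // owner #2: a runnable queue
    tasks.clear();                            // fragment drops its reference...
    // ...but the queued copy keeps the task alive until the worker drops it.
    return queued ? 0 : 1;
}
```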
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 9ddc114eb5fb9c..42b6fc948e9561 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -394,7 +394,8 @@ Status PipelineTask::execute(bool* eos) {
         }
     }
 
-    RETURN_IF_ERROR(get_task_queue()->push_back(this, query_context()->is_cancelled()));
+    RETURN_IF_ERROR(
+            get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled()));
     return Status::OK();
 }
 
@@ -554,10 +555,40 @@ std::string PipelineTask::debug_string() {
 
 void PipelineTask::wake_up() {
     // call by dependency
-    static_cast<void>(get_task_queue()->push_back(this, query_context()->is_cancelled()));
+    static_cast<void>(
+            get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled()));
+}
+
+void PipelineTask::clear_blocking_state(bool cancel) {
+    _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
+    // We use a lock to assure all dependencies are not deconstructed here.
+    std::unique_lock<std::mutex> lc(_dependency_lock);
+    if (!_finalized) {
+        if (cancel) {
+            DCHECK(query_context()->is_cancelled());
+            wake_up();
+        }
+
+        _execution_dep->set_always_ready();
+        for (auto* dep : _filter_dependencies) {
+            dep->set_always_ready();
+        }
+        for (auto& deps : _read_dependencies) {
+            for (auto* dep : deps) {
+                dep->set_always_ready();
+            }
+        }
+        for (auto* dep : _write_dependencies) {
+            dep->set_always_ready();
+        }
+        for (auto* dep : _finish_dependencies) {
+            dep->set_always_ready();
+        }
+    }
 }
 
 QueryContext* PipelineTask::query_context() {
     return _fragment_context->get_query_ctx();
 }
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 1c0c11952df582..01797e077c30c9 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -45,7 +45,7 @@ class MultiCoreTaskQueue;
 class PriorityTaskQueue;
 class Dependency;
 
-class PipelineTask {
+class PipelineTask final : public std::enable_shared_from_this<PipelineTask> {
 public:
     PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
                  PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
@@ -137,28 +137,7 @@
 
     void set_wake_up_early() { _wake_up_early = true; }
 
-    void clear_blocking_state() {
-        _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
-        // We use a lock to assure all dependencies are not deconstructed here.
-        std::unique_lock<std::mutex> lc(_dependency_lock);
-        if (!_finalized) {
-            _execution_dep->set_always_ready();
-            for (auto* dep : _filter_dependencies) {
-                dep->set_always_ready();
-            }
-            for (auto& deps : _read_dependencies) {
-                for (auto* dep : deps) {
-                    dep->set_always_ready();
-                }
-            }
-            for (auto* dep : _write_dependencies) {
-                dep->set_always_ready();
-            }
-            for (auto* dep : _finish_dependencies) {
-                dep->set_always_ready();
-            }
-        }
-    }
+    void clear_blocking_state(bool cancel = false);
 
     void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
     MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
@@ -187,9 +166,9 @@
      */
     static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes);
 
-    bool put_in_runnable_queue() {
+    bool put_in_runnable_queue(bool eager) {
         bool expected = false;
-        if (!_in_queue.compare_exchange_strong(expected, true)) {
+        if (!_in_eager_queue.compare_exchange_strong(expected, eager)) {
             return false;
         }
         _schedule_time++;
@@ -200,10 +179,10 @@
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
 
     bool is_running() { return _running.load(); }
-    void set_running(bool running) {
+    void set_running(bool running, bool eager = false) {
         _running = running;
-        if (!running) {
-            _in_queue = false;
+        if (running && eager) {
+            _in_eager_queue = false;
         }
     }
 
@@ -328,7 +307,7 @@
     std::mutex _dependency_lock;
 
     std::atomic<bool> _running = false;
-    std::atomic<bool> _in_queue = false;
+    std::atomic<bool> _in_eager_queue = false;
     std::atomic<bool> _eos = false;
     std::atomic<bool> _wake_up_early = false;
 };
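The `std::enable_shared_from_this` base is what lets `execute()` and `wake_up()` re-enqueue the task as an owning `shared_from_this()` instead of a raw `this`. One caveat of the pattern, sketched below with hypothetical names: `shared_from_this()` is only valid once the object is already owned by a `shared_ptr`, which `_build_pipeline_tasks` now guarantees via `make_shared`:

```cpp
#include <memory>

// Hypothetical reduction of the PipelineTask change: re-enqueue hands the
// queue an owning reference rather than a raw `this` pointer.
struct Task : std::enable_shared_from_this<Task> {
    std::shared_ptr<Task> for_queue() { return shared_from_this(); }
};

int main() {
    // Must already be shared-owned; calling shared_from_this() on a
    // stack-allocated Task would throw std::bad_weak_ptr.
    auto task = std::make_shared<Task>();
    auto queued = task->for_queue();
    return queued.use_count() == 2 ? 0 : 1; // creator + queue both own it
}
```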
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 197609168c698f..e7fe33d1ce13c1 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -29,7 +29,7 @@ namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 
-PipelineTask* SubTaskQueue::try_take(bool is_steal) {
+std::shared_ptr<PipelineTask> SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
@@ -54,7 +54,7 @@ void PriorityTaskQueue::close() {
     _wait_task.notify_all();
 }
 
-PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+std::shared_ptr<PipelineTask> PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -90,13 +90,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
     return SUB_QUEUE_LEVEL - 1;
 }
 
-PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
+std::shared_ptr<PipelineTask> PriorityTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return _try_take_unprotected(is_steal);
 }
 
-PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
+std::shared_ptr<PipelineTask> PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = _try_take_unprotected(false);
     if (task) {
@@ -111,7 +111,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     }
 }
 
-Status PriorityTaskQueue::push(PipelineTask* task) {
+Status PriorityTaskQueue::push(std::shared_ptr<PipelineTask>& task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
@@ -149,8 +149,8 @@ void MultiCoreTaskQueue::close() {
                   [](auto& prio_task_queue) { prio_task_queue.close(); });
 }
 
-PipelineTask* MultiCoreTaskQueue::take(int core_id) {
-    PipelineTask* task = nullptr;
+std::shared_ptr<PipelineTask> MultiCoreTaskQueue::take(int core_id) {
+    std::shared_ptr<PipelineTask> task = nullptr;
     while (!_closed) {
         DCHECK(_prio_task_queues.size() > core_id)
                 << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
@@ -178,7 +178,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
+std::shared_ptr<PipelineTask> MultiCoreTaskQueue::_steal_take(int core_id) {
     DCHECK(core_id < _core_size);
     int next_id = core_id;
     for (int i = 1; i < _core_size; ++i) {
@@ -196,7 +196,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
     return nullptr;
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, bool eager) {
+Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task, bool eager) {
     int core_id = task->get_previous_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
     }
     return push_back(task, eager ? _eager_queue_idx : core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
+Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask>& task, int core_id) {
     DCHECK(core_id < _core_size || task->query_context()->is_cancelled())
             << " core_id: " << core_id << " _core_size: " << _core_size
             << " task: " << task->debug_string();
-    if (!task->put_in_runnable_queue()) {
-        DCHECK(task->query_context()->is_cancelled()) << task->debug_string();
+    auto eager = core_id == _eager_queue_idx;
+    if (!task->put_in_runnable_queue(eager)) {
+        DCHECK(task->query_context()->is_cancelled() && !eager)
+                << " eager: " << eager << " task: " << task->debug_string();
         return Status::OK();
     }
     return _prio_task_queues[core_id].push(task);
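The admission logic above derives `eager` from the target queue index and passes it to `put_in_runnable_queue`, whose compare-and-swap only succeeds while the flag is still false. The effect, sketched below with hypothetical names: an eager enqueue marks the task and dedupes further eager enqueues until a worker claims it, while normal enqueues store `false` back and stay unaffected:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical reduction of PipelineTask::put_in_runnable_queue(bool eager).
struct AdmissionGuard {
    std::atomic<bool> in_eager_queue = false;

    bool try_admit(bool eager) {
        bool expected = false;
        // Succeeds only while the flag is false; on success the flag
        // becomes `eager`, so only eager enqueues arm the dedup.
        return in_eager_queue.compare_exchange_strong(expected, eager);
    }
};

int main() {
    AdmissionGuard g;
    assert(g.try_admit(false));  // normal enqueue: admitted, flag stays false
    assert(g.try_admit(false));  // normal enqueues are never deduplicated
    assert(g.try_admit(true));   // eager enqueue: admitted, flag set
    assert(!g.try_admit(true));  // duplicate eager enqueue rejected
    g.in_eager_queue = false;    // what set_running(true, /*eager=*/true) does
    assert(g.try_admit(true));   // admitted again once a worker claims it
    return 0;
}
```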
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 96365af0063269..da7b0b6284ef09 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -38,9 +38,9 @@ class SubTaskQueue {
     friend class PriorityTaskQueue;
 
 public:
-    void push_back(PipelineTask* task) { _queue.emplace(task); }
+    void push_back(std::shared_ptr<PipelineTask>& task) { _queue.emplace(task); }
 
-    PipelineTask* try_take(bool is_steal);
+    std::shared_ptr<PipelineTask> try_take(bool is_steal);
 
     void set_level_factor(double level_factor) { _level_factor = level_factor; }
 
@@ -58,7 +58,7 @@ class SubTaskQueue {
     bool empty() { return _queue.empty(); }
 
 private:
-    std::queue<PipelineTask*> _queue;
+    std::queue<std::shared_ptr<PipelineTask>> _queue;
     // depends on LEVEL_QUEUE_TIME_FACTOR
     double _level_factor = 1;
 
@@ -72,18 +72,18 @@ class PriorityTaskQueue {
 
     void close();
 
-    PipelineTask* try_take(bool is_steal);
+    std::shared_ptr<PipelineTask> try_take(bool is_steal);
 
-    PipelineTask* take(uint32_t timeout_ms = 0);
+    std::shared_ptr<PipelineTask> take(uint32_t timeout_ms = 0);
 
-    Status push(PipelineTask* task);
+    Status push(std::shared_ptr<PipelineTask>& task);
 
     void inc_sub_queue_runtime(int level, uint64_t runtime) {
         _sub_queues[level].inc_runtime(runtime);
     }
 
 private:
-    PipelineTask* _try_take_unprotected(bool is_steal);
+    std::shared_ptr<PipelineTask> _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
     static constexpr size_t SUB_QUEUE_LEVEL = 6;
     SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -112,19 +112,20 @@ class MultiCoreTaskQueue {
     void close();
 
     // Get the task by core id.
-    PipelineTask* take(int core_id);
+    std::shared_ptr<PipelineTask> take(int core_id);
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
-    Status push_back(PipelineTask* task, bool eager = false);
+    Status push_back(std::shared_ptr<PipelineTask> task, bool eager = false);
 
-    Status push_back(PipelineTask* task, int core_id);
+    Status push_back(std::shared_ptr<PipelineTask>& task, int core_id);
 
     void update_statistics(PipelineTask* task, int64_t time_spent);
 
     int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }
+    int eager_queue_idx() const { return _eager_queue_idx; }
 
 private:
-    PipelineTask* _steal_take(int core_id);
+    std::shared_ptr<PipelineTask> _steal_take(int core_id);
 
     std::vector<PriorityTaskQueue> _prio_task_queues;
     std::atomic<uint32_t> _next_core = 0;
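For context, `_steal_take` (whose signature changes above) scans the other cores' queues round-robin when a worker's own queue is empty, and the `shared_ptr` now moves out of the victim queue intact. A rough single-threaded sketch of that scan with hypothetical plain deques; the real `PriorityTaskQueue` is mutex-guarded and multi-level:

```cpp
#include <deque>
#include <memory>
#include <vector>

struct Task {};
using TaskPtr = std::shared_ptr<Task>;

// Hypothetical round-robin steal in the spirit of MultiCoreTaskQueue::_steal_take.
TaskPtr steal_take(int core_id, std::vector<std::deque<TaskPtr>>& queues) {
    const int cores = static_cast<int>(queues.size());
    int next_id = core_id;
    for (int i = 1; i < cores; ++i) {
        next_id = (next_id + 1) % cores;   // visit every queue but our own
        if (!queues[next_id].empty()) {
            TaskPtr task = std::move(queues[next_id].front());
            queues[next_id].pop_front();
            return task;                   // ownership transfers to the thief
        }
    }
    return nullptr;                        // nothing to steal
}

int main() {
    std::vector<std::deque<TaskPtr>> queues(4);
    queues[2].push_back(std::make_shared<Task>());
    return steal_take(0, queues) ? 0 : 1;
}
```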
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 6420e453a5bd9b..e01e382ecc46ec 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -66,7 +66,7 @@ Status TaskScheduler::start() {
     return Status::OK();
 }
 
-Status TaskScheduler::schedule_task(PipelineTask* task) {
+Status TaskScheduler::schedule_task(std::shared_ptr<PipelineTask>& task) {
     return _task_queue.push_back(task);
 }
 
@@ -76,10 +76,6 @@ void _close_task(PipelineTask* task, Status exec_status) {
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
     SCOPED_ATTACH_TASK(task->runtime_state());
-    if (task->is_finalized()) {
-        task->set_running(false);
-        return;
-    }
     // close_a_pipeline may delete fragment context and will core in some defer
     // code, because the defer code will access fragment context it self.
     auto lock_for_context = task->fragment_context()->shared_from_this();
@@ -99,7 +95,7 @@ void TaskScheduler::_do_work(int index) {
     while (_markers[index]) {
-        auto* task = _task_queue.take(index);
+        auto task = _task_queue.take(index);
         if (!task) {
             continue;
         }
        if (task->is_running()) {
@@ -107,8 +103,13 @@
             static_cast<void>(_task_queue.push_back(task, index));
             continue;
         }
+        auto eager = index == _task_queue.eager_queue_idx();
         task->log_detail_if_need();
-        task->set_running(true);
+        task->set_running(true, eager);
+        if (task->is_finalized()) {
+            task->set_running(false);
+            continue;
+        }
         task->set_task_queue(&_task_queue);
         auto* fragment_ctx = task->fragment_context();
         bool canceled = fragment_ctx->is_canceled();
@@ -122,7 +123,7 @@
             // If pipeline is canceled, it will report after pipeline closed, and will propagate
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
-            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            _close_task(task.get(), fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }
 
@@ -165,7 +166,7 @@
             LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
                                         print_id(task->query_context()->query_id()),
                                         status.to_string());
-            _close_task(task, status);
+            _close_task(task.get(), status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -178,7 +179,7 @@
             task->set_running(false);
         } else {
             Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-            _close_task(task, exec_status);
+            _close_task(task.get(), exec_status);
         }
         continue;
     }
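Note how the `is_finalized()` early-out moves from `_close_task` into the worker loop: it now runs after `set_running(true, eager)` has claimed the task, so a task that was already finalized elsewhere is simply dropped rather than closed a second time. A condensed sketch of that control flow, with hypothetical names:

```cpp
#include <memory>

// Hypothetical skeleton of the claim/skip logic in TaskScheduler::_do_work.
struct Task {
    bool running = false;
    bool finalized = false;
};

// One worker iteration, after a task has been taken from the queue.
void handle(const std::shared_ptr<Task>& task) {
    if (!task) return;            // take() timed out: loop again
    if (task->running) return;    // still on another worker: re-push in real code
    task->running = true;         // claim the task (set_running(true, eager))
    if (task->finalized) {        // claimed too late: already closed elsewhere
        task->running = false;
        return;                   // drop instead of closing twice
    }
    // ... prepare, execute, and close on eos/error/cancel ...
}

int main() {
    auto task = std::make_shared<Task>();
    task->finalized = true;
    handle(task);                 // exercises the new early-out
    return task->running ? 1 : 0;
}
```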
query_id: {} reason: {}", print_id(task->query_context()->query_id()), status.to_string()); - _close_task(task, status); + _close_task(task.get(), status); continue; } fragment_ctx->trigger_report_if_necessary(); @@ -178,7 +179,7 @@ void TaskScheduler::_do_work(int index) { task->set_running(false); } else { Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); - _close_task(task, exec_status); + _close_task(task.get(), exec_status); } continue; } diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 3c1b08063dfa61..33a15149a44aa1 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -51,7 +51,7 @@ class TaskScheduler { ~TaskScheduler(); - Status schedule_task(PipelineTask* task); + Status schedule_task(std::shared_ptr& task); Status start();