diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5ae89db55a45ac..a08ec185cd6ded 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -445,14 +445,14 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
             auto cur_task_id = _total_tasks++;
             task_runtime_state->set_task_id(cur_task_id);
             task_runtime_state->set_task_num(pipeline->num_tasks());
-            auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
-                                                       task_runtime_state.get(), this,
-                                                       pipeline_id_to_profile[pip_idx].get(),
-                                                       get_local_exchange_state(pipeline), i);
+            auto task = std::shared_ptr<PipelineTask>(
+                    new PipelineTask(pipeline, cur_task_id, task_runtime_state.get(), this,
+                                     pipeline_id_to_profile[pip_idx].get(),
+                                     get_local_exchange_state(pipeline), i));
             pipeline->incr_created_tasks(i, task.get());
             task_runtime_state->set_task(task.get());
             pipeline_id_to_task.insert({pipeline->id(), task.get()});
-            _tasks[i].emplace_back(std::move(task));
+            _tasks[i].emplace_back(task);
         }
     }
 
@@ -1679,7 +1679,7 @@ Status PipelineFragmentContext::submit() {
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         for (auto& t : task) {
-            st = scheduler->schedule_task(t.get());
+            st = scheduler->schedule_task(t);
             if (!st) {
                 cancel(Status::InternalError("submit context to executor fail"));
                 std::lock_guard<std::mutex> l(_task_mutex);
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 1674afa886d520..e186bd15a098d3 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -226,7 +226,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
     OperatorPtr _root_op = nullptr;
 
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
-    std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
+    std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
 
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
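
The two hunks above are the heart of the change: `_tasks` switches from `std::unique_ptr<PipelineTask>` to shared ownership, because a task handed to the scheduler can still sit in a run queue (or be re-queued by a dependency wake-up) after the fragment context starts tearing down its own vector. A minimal sketch of why two owners make that safe, using hypothetical `Task`/queue stand-ins rather than the Doris classes:

    #include <memory>
    #include <queue>
    #include <vector>

    struct Task {
        bool cancelled = false;
    };

    int main() {
        // Two owners: the "fragment" that created the task and the run
        // queue that will execute it later. With unique_ptr, destroying
        // the fragment's vector would leave the queue with a dangling raw
        // pointer; with shared_ptr, the task lives until both sides drop it.
        std::queue<std::shared_ptr<Task>> run_queue;
        {
            std::vector<std::shared_ptr<Task>> fragment_tasks;
            fragment_tasks.push_back(std::make_shared<Task>());
            run_queue.push(fragment_tasks.back()); // second owner
        } // fragment's ownership ends here
        run_queue.front()->cancelled = true;       // still safe
        return 0;
    }
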
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 5ed725010ec364..120035ee690c5f 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -394,7 +394,7 @@ Status PipelineTask::execute(bool* eos) {
         }
     }
 
-    RETURN_IF_ERROR(get_task_queue()->push_back(this));
+    RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
     return Status::OK();
 }
 
@@ -554,10 +554,34 @@ std::string PipelineTask::debug_string() {
 
 void PipelineTask::wake_up() {
     // call by dependency
-    static_cast<void>(get_task_queue()->push_back(this));
+    static_cast<void>(get_task_queue()->push_back(shared_from_this()));
+}
+
+void PipelineTask::clear_blocking_state() {
+    _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
+    // We use a lock to assure all dependencies are not deconstructed here.
+    std::unique_lock<std::mutex> lc(_dependency_lock);
+    if (!_finalized) {
+        _execution_dep->set_always_ready();
+        for (auto* dep : _filter_dependencies) {
+            dep->set_always_ready();
+        }
+        for (auto& deps : _read_dependencies) {
+            for (auto* dep : deps) {
+                dep->set_always_ready();
+            }
+        }
+        for (auto* dep : _write_dependencies) {
+            dep->set_always_ready();
+        }
+        for (auto* dep : _finish_dependencies) {
+            dep->set_always_ready();
+        }
+    }
 }
 
 QueryContext* PipelineTask::query_context() {
     return _fragment_context->get_query_ctx();
 }
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 1a31e5954f479c..abc2f54b016d88 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -45,7 +45,7 @@ class MultiCoreTaskQueue;
 class PriorityTaskQueue;
 class Dependency;
 
-class PipelineTask {
+class PipelineTask final : public std::enable_shared_from_this<PipelineTask> {
public:
     PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
                  PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
@@ -137,28 +137,7 @@ class PipelineTask {
 
     void set_wake_up_early() { _wake_up_early = true; }
 
-    void clear_blocking_state() {
-        _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
-        // We use a lock to assure all dependencies are not deconstructed here.
-        std::unique_lock<std::mutex> lc(_dependency_lock);
-        if (!_finalized) {
-            _execution_dep->set_always_ready();
-            for (auto* dep : _filter_dependencies) {
-                dep->set_always_ready();
-            }
-            for (auto& deps : _read_dependencies) {
-                for (auto* dep : deps) {
-                    dep->set_always_ready();
-                }
-            }
-            for (auto* dep : _write_dependencies) {
-                dep->set_always_ready();
-            }
-            for (auto* dep : _finish_dependencies) {
-                dep->set_always_ready();
-            }
-        }
-    }
+    void clear_blocking_state();
 
     void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
     MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
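
`push_back(shared_from_this())` in `execute()` and `wake_up()` only works because `PipelineTask` now inherits `std::enable_shared_from_this<PipelineTask>` and is constructed into a `shared_ptr` first (see `_build_pipeline_tasks` above): calling `shared_from_this()` on an object with no `shared_ptr` owner was undefined behavior before C++17 and throws `std::bad_weak_ptr` since. A standalone illustration with a hypothetical `Worker` class:

    #include <iostream>
    #include <memory>

    class Worker : public std::enable_shared_from_this<Worker> {
    public:
        // Returns a shared_ptr that shares ownership with existing owners.
        // Safe only because instances are always created via shared_ptr.
        std::shared_ptr<Worker> self() { return shared_from_this(); }
    };

    int main() {
        auto w = std::shared_ptr<Worker>(new Worker()); // shared_ptr-owned first
        auto again = w->self();                         // ok: use_count() == 2
        std::cout << w.use_count() << "\n";             // prints 2

        Worker stack_worker;
        try {
            stack_worker.self(); // no shared_ptr owner: throws since C++17
        } catch (const std::bad_weak_ptr&) {
            std::cout << "bad_weak_ptr\n";
        }
        return 0;
    }
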
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index ea812ca9b12dd6..3523b4472d063e 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -29,7 +29,7 @@ namespace doris::pipeline {
 
 #include "common/compile_check_begin.h"
 
-PipelineTask* SubTaskQueue::try_take(bool is_steal) {
+std::shared_ptr<PipelineTask> SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
@@ -54,7 +54,7 @@ void PriorityTaskQueue::close() {
     _wait_task.notify_all();
 }
 
-PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+std::shared_ptr<PipelineTask> PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -90,13 +90,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
     return SUB_QUEUE_LEVEL - 1;
 }
 
-PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
+std::shared_ptr<PipelineTask> PriorityTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return _try_take_unprotected(is_steal);
 }
 
-PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
+std::shared_ptr<PipelineTask> PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = _try_take_unprotected(false);
     if (task) {
@@ -111,7 +111,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     }
 }
 
-Status PriorityTaskQueue::push(PipelineTask* task) {
+Status PriorityTaskQueue::push(std::shared_ptr<PipelineTask>& task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
@@ -132,8 +132,12 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
 
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
+static constexpr int NUM_EAGER_QUEUES = 1;
 MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
-        : _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
+        : _prio_task_queues(core_size + NUM_EAGER_QUEUES),
+          _closed(false),
+          _core_size(core_size),
+          _urgent_queue_idx(core_size) {}
 
 void MultiCoreTaskQueue::close() {
     if (_closed) {
@@ -145,8 +149,8 @@ void MultiCoreTaskQueue::close() {
         [](auto& prio_task_queue) { prio_task_queue.close(); });
 }
 
-PipelineTask* MultiCoreTaskQueue::take(int core_id) {
-    PipelineTask* task = nullptr;
+std::shared_ptr<PipelineTask> MultiCoreTaskQueue::take(int core_id) {
+    std::shared_ptr<PipelineTask> task = nullptr;
     while (!_closed) {
         DCHECK(_prio_task_queues.size() > core_id)
                 << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
@@ -156,9 +160,11 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
             task->set_core_id(core_id);
             break;
         }
-        task = _steal_take(core_id);
-        if (task) {
-            break;
+        if (core_id != _urgent_queue_idx) {
+            task = _steal_take(core_id);
+            if (task) {
+                break;
+            }
         }
         task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
         if (task) {
@@ -172,7 +178,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
+std::shared_ptr<PipelineTask> MultiCoreTaskQueue::_steal_take(int core_id) {
     DCHECK(core_id < _core_size);
     int next_id = core_id;
     for (int i = 1; i < _core_size; ++i) {
@@ -190,7 +196,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
     return nullptr;
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
+Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task) {
     int core_id = task->get_previous_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
@@ -198,10 +204,11 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
-    DCHECK(core_id < _core_size);
+Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask>& task, int core_id) {
+    DCHECK(core_id < _core_size || task->query_context()->is_cancelled());
     task->put_in_runnable_queue();
-    return _prio_task_queues[core_id].push(task);
+    return _prio_task_queues[task->query_context()->is_cancelled() ? _urgent_queue_idx : core_id]
+            .push(task);
 }
 
 void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
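
In the `push_back` overload above, a task whose query is already cancelled is routed to the extra queue at `_urgent_queue_idx` instead of its per-core queue, so cancelled tasks get drained, and their resources released, without waiting behind healthy work; the `take` loop likewise skips work-stealing for the urgent index. A simplified sketch of the same dispatch decision (hypothetical types mirroring the ternary in the patch):

    #include <cassert>

    struct FakeTask {
        bool cancelled; // stands in for task->query_context()->is_cancelled()
    };

    // Mirrors MultiCoreTaskQueue: one queue per core plus one urgent queue.
    struct Router {
        int core_size;
        int urgent_queue_idx; // == core_size

        int pick_queue(const FakeTask& t, int core_id) const {
            // Cancelled tasks jump to the urgent queue; others keep their core.
            return t.cancelled ? urgent_queue_idx : core_id;
        }
    };

    int main() {
        Router r{/*core_size=*/8, /*urgent_queue_idx=*/8};
        assert(r.pick_queue({false}, 3) == 3); // healthy task stays on its core
        assert(r.pick_queue({true}, 3) == 8);  // cancelled task goes urgent
        return 0;
    }
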
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 1651eb50cac4ab..095b6dce2851cc 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -38,9 +38,9 @@ class SubTaskQueue {
     friend class PriorityTaskQueue;
 
 public:
-    void push_back(PipelineTask* task) { _queue.emplace(task); }
+    void push_back(std::shared_ptr<PipelineTask>& task) { _queue.emplace(task); }
 
-    PipelineTask* try_take(bool is_steal);
+    std::shared_ptr<PipelineTask> try_take(bool is_steal);
 
     void set_level_factor(double level_factor) { _level_factor = level_factor; }
 
@@ -58,7 +58,7 @@ class SubTaskQueue {
     bool empty() { return _queue.empty(); }
 
 private:
-    std::queue<PipelineTask*> _queue;
+    std::queue<std::shared_ptr<PipelineTask>> _queue;
     // depends on LEVEL_QUEUE_TIME_FACTOR
     double _level_factor = 1;
 
@@ -72,18 +72,18 @@ class PriorityTaskQueue {
 
     void close();
 
-    PipelineTask* try_take(bool is_steal);
+    std::shared_ptr<PipelineTask> try_take(bool is_steal);
 
-    PipelineTask* take(uint32_t timeout_ms = 0);
+    std::shared_ptr<PipelineTask> take(uint32_t timeout_ms = 0);
 
-    Status push(PipelineTask* task);
+    Status push(std::shared_ptr<PipelineTask>& task);
 
     void inc_sub_queue_runtime(int level, uint64_t runtime) {
         _sub_queues[level].inc_runtime(runtime);
     }
 
 private:
-    PipelineTask* _try_take_unprotected(bool is_steal);
+    std::shared_ptr<PipelineTask> _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
     static constexpr size_t SUB_QUEUE_LEVEL = 6;
     SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -112,25 +112,26 @@ class MultiCoreTaskQueue {
     void close();
 
     // Get the task by core id.
-    PipelineTask* take(int core_id);
+    std::shared_ptr<PipelineTask> take(int core_id);
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
-    Status push_back(PipelineTask* task);
+    Status push_back(std::shared_ptr<PipelineTask> task);
 
-    Status push_back(PipelineTask* task, int core_id);
+    Status push_back(std::shared_ptr<PipelineTask>& task, int core_id);
 
     void update_statistics(PipelineTask* task, int64_t time_spent);
 
-    int cores() const { return _core_size; }
+    int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }
 
 private:
-    PipelineTask* _steal_take(int core_id);
+    std::shared_ptr<PipelineTask> _steal_take(int core_id);
 
     std::vector<PriorityTaskQueue> _prio_task_queues;
     std::atomic<size_t> _next_core = 0;
     std::atomic<bool> _closed;
-    int _core_size;
+    const int _core_size;
+    const int _urgent_queue_idx;
 
     static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
 #include "common/compile_check_end.h"
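
Replacing `cores()` with `num_queues()` matters for the scheduler below: the thread pool is now sized to `_prio_task_queues.size()`, i.e. `core_size + NUM_EAGER_QUEUES`, so the urgent queue gets a worker thread of its own. A sketch of the sizing arithmetic only, with an assumed core count:

    // Sizing relationship only (assumed numbers, not Doris code).
    constexpr int core_size = 8;        // one PriorityTaskQueue per core
    constexpr int num_eager_queues = 1; // plus the urgent queue
    constexpr int num_queues = core_size + num_eager_queues;
    static_assert(num_queues == 9, "one worker thread per queue, urgent included");

    int main() {
        return 0;
    }
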
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 45898e764175b2..d05029ab5e2973 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -51,27 +51,27 @@ TaskScheduler::~TaskScheduler() {
 }
 
 Status TaskScheduler::start() {
-    int cores = _task_queue.cores();
+    int num_queues = _task_queue.num_queues();
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
-                            .set_min_threads(cores)
-                            .set_max_threads(cores)
+                            .set_min_threads(num_queues)
+                            .set_max_threads(num_queues)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
-    LOG_INFO("TaskScheduler set cores").tag("size", cores);
-    _markers.resize(cores, true);
-    for (int i = 0; i < cores; ++i) {
+    LOG_INFO("TaskScheduler set cores").tag("size", num_queues);
+    _markers.resize(num_queues, true);
+    for (int i = 0; i < num_queues; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
     }
     return Status::OK();
 }
 
-Status TaskScheduler::schedule_task(PipelineTask* task) {
+Status TaskScheduler::schedule_task(std::shared_ptr<PipelineTask>& task) {
     return _task_queue.push_back(task);
 }
 
 // after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+void _close_task(std::shared_ptr<PipelineTask> task, Status exec_status) {
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
@@ -95,11 +95,12 @@ void _close_task(PipelineTask* task, Status exec_status) {
     task->finalize();
     task->set_running(false);
     task->fragment_context()->close_a_pipeline(task->pipeline_id());
+    task.reset();
 }
 
 void TaskScheduler::_do_work(int index) {
     while (_markers[index]) {
-        auto* task = _task_queue.take(index);
+        auto task = _task_queue.take(index);
         if (!task) {
             continue;
         }
@@ -122,7 +123,7 @@ void TaskScheduler::_do_work(int index) {
             // If pipeline is canceled, it will report after pipeline closed, and will propagate
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
-            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            _close_task(std::move(task), fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }
 
@@ -165,7 +166,7 @@ void TaskScheduler::_do_work(int index) {
             LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
                                         print_id(task->query_context()->query_id()),
                                         status.to_string());
-            _close_task(task, status);
+            _close_task(std::move(task), status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -178,7 +179,7 @@ void TaskScheduler::_do_work(int index) {
             task->set_running(false);
         } else {
             Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-            _close_task(task, exec_status);
+            _close_task(std::move(task), exec_status);
         }
         continue;
     }
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 3c1b08063dfa61..33a15149a44aa1 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -51,7 +51,7 @@ class TaskScheduler {
 
     ~TaskScheduler();
 
-    Status schedule_task(PipelineTask* task);
+    Status schedule_task(std::shared_ptr<PipelineTask>& task);
 
     Status start();
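
One detail worth noting in `task_scheduler.cpp`: `_close_task` now takes the `shared_ptr` by value, callers hand theirs over with `std::move`, and `task.reset()` drops what may be the last reference at a well-defined point, which keeps the existing comment "after _close_task, task maybe destructed" literally true. A small sketch of the move-in/reset pattern with a hypothetical `Resource` type:

    #include <cstdio>
    #include <memory>

    struct Resource {
        ~Resource() { std::puts("Resource destroyed"); }
    };

    // Taking the shared_ptr by value transfers the caller's reference in;
    // reset() releases it at a well-defined point inside the function.
    void close_resource(std::shared_ptr<Resource> r) {
        // ... final bookkeeping that may still need *r ...
        r.reset(); // if this was the last owner, destruction happens HERE
        // r is now empty; nothing below may touch the Resource.
    }

    int main() {
        auto res = std::make_shared<Resource>();
        close_resource(std::move(res)); // caller's pointer becomes empty
        std::puts(res ? "still alive" : "caller no longer owns it");
        return 0;
    }
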