diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4f8be8b6f85ad63..26c95a56a3ab1ef 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -394,8 +394,7 @@ Status PipelineTask::execute(bool* eos) { } } - RETURN_IF_ERROR( - get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled())); + RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this())); return Status::OK(); } @@ -555,8 +554,7 @@ std::string PipelineTask::debug_string() { void PipelineTask::wake_up() { // call by dependency - static_cast( - get_task_queue()->push_back(shared_from_this(), query_context()->is_cancelled())); + static_cast(get_task_queue()->push_back(shared_from_this())); } void PipelineTask::clear_blocking_state(bool cancel) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 480ead89aa0ab2c..4151d8177928ae8 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -166,26 +166,15 @@ class PipelineTask final : public std::enable_shared_from_this { */ static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes); - bool put_in_runnable_queue(bool eager) { - bool expected = false; - if (!_in_eager_queue.compare_exchange_strong(expected, eager)) { - DCHECK(query_context()->is_cancelled()) << debug_string(); - return false; - } + void put_in_runnable_queue() { _schedule_time++; _wait_worker_watcher.start(); - return true; } void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } bool is_running() { return _running.load(); } - void set_running(bool running, bool eager = false) { - _running = running; - if (running && eager) { - _in_eager_queue = false; - } - } + void set_running(bool running) { _running = running; } bool is_exceed_debug_timeout() { if (_has_exceed_timeout) { @@ -308,7 +297,6 @@ class PipelineTask final : public std::enable_shared_from_this { std::mutex _dependency_lock; std::atomic _running = false; - std::atomic _in_eager_queue = false; std::atomic _eos = false; std::atomic _wake_up_early = false; }; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index d262b971f374439..d70fb4828ba295f 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -137,7 +137,7 @@ MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : _prio_task_queues(core_size + NUM_EAGER_QUEUES), _closed(false), _core_size(core_size), - _eager_queue_idx(core_size) {} + _urgent_queue_idx(core_size) {} void MultiCoreTaskQueue::close() { if (_closed) { @@ -160,7 +160,7 @@ std::shared_ptr MultiCoreTaskQueue::take(int core_id) { task->set_core_id(core_id); break; } - if (core_id != _eager_queue_idx) { + if (core_id != _urgent_queue_idx) { task = _steal_take(core_id); if (task) { break; @@ -196,24 +196,19 @@ std::shared_ptr MultiCoreTaskQueue::_steal_take(int core_id) { return nullptr; } -Status MultiCoreTaskQueue::push_back(std::shared_ptr task, bool eager) { +Status MultiCoreTaskQueue::push_back(std::shared_ptr task) { int core_id = task->get_previous_core_id(); if (core_id < 0) { core_id = _next_core.fetch_add(1) % _core_size; } - return push_back(task, eager ? _eager_queue_idx : core_id); + return push_back(task, core_id); } Status MultiCoreTaskQueue::push_back(std::shared_ptr& task, int core_id) { - DCHECK(core_id < _core_size || task->query_context()->is_cancelled()) - << " core_id: " << core_id << " _core_size: " << _core_size - << " task: " << task->debug_string(); - auto eager = core_id == _eager_queue_idx; - if (!task->put_in_runnable_queue(eager)) { - DCHECK(task->query_context()->is_cancelled()) << " task: " << task->debug_string(); - return Status::OK(); - } - return _prio_task_queues[core_id].push(task); + DCHECK(core_id < _core_size); + task->put_in_runnable_queue(); + return _prio_task_queues[task->query_context()->is_cancelled() ? _urgent_queue_idx : core_id] + .push(task); } void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) { diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index da7b0b6284ef092..095b6dce2851cc8 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -115,14 +115,13 @@ class MultiCoreTaskQueue { std::shared_ptr take(int core_id); // TODO combine these methods to `push_back(task, core_id = -1)` - Status push_back(std::shared_ptr task, bool eager = false); + Status push_back(std::shared_ptr task); Status push_back(std::shared_ptr& task, int core_id); void update_statistics(PipelineTask* task, int64_t time_spent); int num_queues() const { return cast_set(_prio_task_queues.size()); } - int eager_queue_idx() const { return _eager_queue_idx; } private: std::shared_ptr _steal_take(int core_id); @@ -132,7 +131,7 @@ class MultiCoreTaskQueue { std::atomic _closed; const int _core_size; - const int _eager_queue_idx; + const int _urgent_queue_idx; static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; }; #include "common/compile_check_end.h" diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index f1bafbe5ea6cb48..d05029ab5e2973c 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -108,9 +108,8 @@ void TaskScheduler::_do_work(int index) { static_cast(_task_queue.push_back(task, index)); continue; } - auto eager = index == _task_queue.eager_queue_idx(); task->log_detail_if_need(); - task->set_running(true, eager); + task->set_running(true); task->set_task_queue(&_task_queue); auto* fragment_ctx = task->fragment_context(); bool canceled = fragment_ctx->is_canceled();