diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index bb27012b3d4994..add92cabf37065 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -86,7 +86,6 @@ void PipelineTask::_fresh_profile_counter() { COUNTER_SET(_schedule_counts, (int64_t)_schedule_time); COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time()); COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time()); - COUNTER_SET(_wait_schedule_timer, (int64_t)_wait_schedule_watcher.elapsed_time()); COUNTER_SET(_begin_execute_timer, _begin_execute_time); COUNTER_SET(_eos_timer, _eos_time); COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time); @@ -117,7 +116,6 @@ void PipelineTask::_init_profile() { _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime"); _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime"); _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); - _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime"); _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT); _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 4311c48f3258cf..b8b8e89215f565 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -133,8 +133,6 @@ class PipelineTask { _wait_worker_watcher.start(); } void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } - void start_schedule_watcher() { _wait_schedule_watcher.start(); } - void stop_schedule_watcher() { _wait_schedule_watcher.stop(); } PipelineTaskState get_state() { return _cur_state; } void set_state(PipelineTaskState state); @@ -311,8 +309,6 @@ class PipelineTask { MonotonicStopWatch _wait_worker_watcher; RuntimeProfile::Counter* _wait_worker_timer; // TODO we should calculate the time between when really runnable and runnable - MonotonicStopWatch _wait_schedule_watcher; - RuntimeProfile::Counter* _wait_schedule_timer; RuntimeProfile::Counter* _yield_counts; RuntimeProfile::Counter* _core_change_times; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index e4a4ec38af9f16..c4278c38077cdc 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -85,7 +85,6 @@ void BlockedTaskScheduler::_schedule() { _started.store(true); std::list local_blocked_tasks; int empty_times = 0; - std::vector ready_tasks; while (!_shutdown) { { @@ -105,6 +104,7 @@ void BlockedTaskScheduler::_schedule() { } } + auto origin_local_block_tasks_size = local_blocked_tasks.size(); auto iter = local_blocked_tasks.begin(); vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time(); while (iter != local_blocked_tasks.end()) { @@ -116,57 +116,52 @@ void BlockedTaskScheduler::_schedule() { VLOG_DEBUG << "Task pending" << task->debug_string(); iter++; } else { - _make_task_run(local_blocked_tasks, iter, ready_tasks, - PipelineTaskState::PENDING_FINISH); + _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH); } } else if (task->query_context()->is_cancelled()) { - _make_task_run(local_blocked_tasks, iter, ready_tasks); + _make_task_run(local_blocked_tasks, iter); } else if (task->query_context()->is_timeout(now)) { LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id()) << ", instance_id=" << print_id(task->instance_id()) << ", task info: " << task->debug_string(); task->query_context()->cancel(true, "", Status::Cancelled("")); - _make_task_run(local_blocked_tasks, iter, ready_tasks); + _make_task_run(local_blocked_tasks, iter); } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) { if (task->has_dependency()) { iter++; } else { - _make_task_run(local_blocked_tasks, iter, ready_tasks); + _make_task_run(local_blocked_tasks, iter); } } else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) { if (task->source_can_read()) { - _make_task_run(local_blocked_tasks, iter, ready_tasks); + _make_task_run(local_blocked_tasks, iter); } else { iter++; } } else if (state == PipelineTaskState::BLOCKED_FOR_RF) { if (task->runtime_filters_are_ready_or_timeout()) { - _make_task_run(local_blocked_tasks, iter, ready_tasks); + _make_task_run(local_blocked_tasks, iter); } else { iter++; } } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) { if (task->sink_can_write()) { - _make_task_run(local_blocked_tasks, iter, ready_tasks); + _make_task_run(local_blocked_tasks, iter); } else { iter++; } } else { // TODO: DCHECK the state - _make_task_run(local_blocked_tasks, iter, ready_tasks); + _make_task_run(local_blocked_tasks, iter); } } - if (ready_tasks.empty()) { + if (origin_local_block_tasks_size == 0 || + local_blocked_tasks.size() == origin_local_block_tasks_size) { empty_times += 1; } else { empty_times = 0; - for (auto& task : ready_tasks) { - task->stop_schedule_watcher(); - _task_queue->push_back(task); - } - ready_tasks.clear(); } if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) { @@ -186,13 +181,11 @@ void BlockedTaskScheduler::_schedule() { void BlockedTaskScheduler::_make_task_run(std::list& local_tasks, std::list::iterator& task_itr, - std::vector& ready_tasks, PipelineTaskState t_state) { auto task = *task_itr; - task->start_schedule_watcher(); task->set_state(t_state); local_tasks.erase(task_itr++); - ready_tasks.emplace_back(task); + _task_queue->push_back(task); } TaskScheduler::~TaskScheduler() { diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index b9d3dfbac3cee6..13b9e734d699d4 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -71,7 +71,6 @@ class BlockedTaskScheduler { void _schedule(); void _make_task_run(std::list& local_tasks, std::list::iterator& task_itr, - std::vector& ready_tasks, PipelineTaskState state = PipelineTaskState::RUNNABLE); }; diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 0c11314fb2d295..09ec0e3a5531f3 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -70,12 +70,17 @@ class PipScannerContext : public vectorized::ScannerContext { } { - if (!_blocks_queues[id].try_dequeue(*block)) { + std::unique_lock l(*_queue_mutexs[id]); + if (_blocks_queues[id].empty()) { *eos = _is_finished || _should_stop; return Status::OK(); - } - if (_blocks_queues[id].size_approx() == 0 && _data_dependency) { - _data_dependency->block_reading(); + } else { + *block = std::move(_blocks_queues[id].front()); + _blocks_queues[id].pop_front(); + + if (_blocks_queues[id].empty() && _data_dependency) { + _data_dependency->block_reading(); + } } } _current_used_bytes -= (*block)->allocated_bytes(); @@ -133,8 +138,9 @@ class PipScannerContext : public vectorized::ScannerContext { for (int i = 0; i < queue_size && i < block_size; ++i) { int queue = _next_queue_to_feed; { + std::lock_guard l(*_queue_mutexs[queue]); for (int j = i; j < block_size; j += queue_size) { - _blocks_queues[queue].enqueue(std::move(blocks[j])); + _blocks_queues[queue].emplace_back(std::move(blocks[j])); } } _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; @@ -146,11 +152,15 @@ class PipScannerContext : public vectorized::ScannerContext { _current_used_bytes += local_bytes; } - bool empty_in_queue(int id) override { return _blocks_queues[id].size_approx() == 0; } + bool empty_in_queue(int id) override { + std::unique_lock l(*_queue_mutexs[id]); + return _blocks_queues[id].empty(); + } Status init() override { for (int i = 0; i < _num_parallel_instances; ++i) { - _blocks_queues.emplace_back(moodycamel::ConcurrentQueue()); + _queue_mutexs.emplace_back(std::make_unique()); + _blocks_queues.emplace_back(std::list()); } RETURN_IF_ERROR(ScannerContext::init()); if (_need_colocate_distribute) { @@ -182,9 +192,10 @@ class PipScannerContext : public vectorized::ScannerContext { void _dispose_coloate_blocks_not_in_queue() override { if (_need_colocate_distribute) { for (int i = 0; i < _num_parallel_instances; ++i) { + std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]); if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) { _current_used_bytes += _colocate_blocks[i]->allocated_bytes(); - _blocks_queues[i].enqueue(std::move(_colocate_blocks[i])); + _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i])); _colocate_mutable_blocks[i]->clear(); } if (_data_dependency) { @@ -198,14 +209,15 @@ class PipScannerContext : public vectorized::ScannerContext { auto res = ScannerContext::debug_string(); for (int i = 0; i < _blocks_queues.size(); ++i) { res += " queue " + std::to_string(i) + ":size " + - std::to_string(_blocks_queues[i].size_approx()); + std::to_string(_blocks_queues[i].size()); } return res; } private: int _next_queue_to_feed = 0; - std::vector> _blocks_queues; + std::vector> _queue_mutexs; + std::vector> _blocks_queues; std::atomic_int64_t _current_used_bytes = 0; const std::vector& _col_distribute_ids; @@ -238,7 +250,10 @@ class PipScannerContext : public vectorized::ScannerContext { if (row_add == max_add) { _current_used_bytes += _colocate_blocks[loc]->allocated_bytes(); - _blocks_queues[loc].enqueue(std::move(_colocate_blocks[loc])); + { + std::lock_guard queue_l(*_queue_mutexs[loc]); + _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc])); + } if (_data_dependency) { _data_dependency->set_ready_for_read(); }