diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 5723a58bea0660..d3688507fcd52f 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -43,6 +43,8 @@ namespace doris::vectorized { +using namespace std::chrono_literals; + ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent, const doris::TupleDescriptor* output_tuple_desc, const std::list& scanners_, int64_t limit_, @@ -217,7 +219,14 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo // (if the scheduler continues to schedule, it will cause a lot of busy running). // At this point, consumers are required to trigger new scheduling to ensure that // data can be continuously fetched. - if (should_be_scheduled() && _num_running_scanners == 0) { + int64_t cur_bytes_in_queue = _cur_bytes_in_queue; + int32_t serving_blocks_num = _serving_blocks_num; + bool to_be_schedule = should_be_scheduled(); + int num_running_scanners = _num_running_scanners; + + bool is_scheduled = false; + if (to_be_schedule && _num_running_scanners == 0) { + is_scheduled = true; auto state = _scanner_scheduler->submit(this); if (state.ok()) { _num_scheduling_ctx++; @@ -235,7 +244,13 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo SCOPED_TIMER(_scanner_wait_batch_timer); while (!(!_blocks_queue.empty() || _is_finished || !status().ok() || state->is_cancelled())) { - _blocks_queue_added_cv.wait(l); + if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) { + LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue + << ", serving_blocks_num " << serving_blocks_num + << ", num_running_scanners " << num_running_scanners + << ", to_be_scheudle " << to_be_schedule << (void*)this; + } + _blocks_queue_added_cv.wait_for(l, 1s); } } @@ -297,10 +312,14 @@ void ScannerContext::set_should_stop() { _blocks_queue_added_cv.notify_one(); } -void ScannerContext::update_num_running(int32_t scanner_inc, int32_t sched_inc) { +void ScannerContext::inc_num_running_scanners(int32_t inc) { + std::lock_guard l(_transfer_lock); + _num_running_scanners += inc; +} + +void ScannerContext::dec_num_scheduling_ctx() { std::lock_guard l(_transfer_lock); - _num_running_scanners += scanner_inc; - _num_scheduling_ctx += sched_inc; + _num_scheduling_ctx--; if (_finish_dependency) { if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { _finish_dependency->set_ready_to_finish(); @@ -308,8 +327,10 @@ void ScannerContext::update_num_running(int32_t scanner_inc, int32_t sched_inc) _finish_dependency->block_finishing(); } } - _blocks_queue_added_cv.notify_one(); - _ctx_finish_cv.notify_one(); + + if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { + _ctx_finish_cv.notify_one(); + } } bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 932fd294ff93e1..244aedf87a3eb1 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -118,11 +118,12 @@ class ScannerContext { // Return true if this ScannerContext need no more process virtual bool done() { return _is_finished || _should_stop; } - // Update the running num of scanners and contexts - void update_num_running(int32_t scanner_inc, int32_t sched_inc); + void inc_num_running_scanners(int32_t scanner_inc); int get_num_running_scanners() const { return _num_running_scanners; } + void dec_num_scheduling_ctx(); + int get_num_scheduling_ctx() const { return _num_scheduling_ctx; } void get_next_batch_of_scanners(std::list* current_run); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 6c40ccc242e135..2942114d432958 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -202,7 +202,10 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { watch.start(); ctx->incr_num_ctx_scheduling(1); size_t size = 0; - Defer defer {[&]() { ctx->update_num_running(size, -1); }}; + Defer defer {[&]() { + ctx->incr_num_scanner_scheduling(size); + ctx->dec_num_scheduling_ctx(); + }}; if (ctx->done()) { return; @@ -221,12 +224,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { return; } + ctx->inc_num_running_scanners(this_run.size()); + // Submit scanners to thread pool // TODO(cmy): How to handle this "nice"? int nice = 1; auto iter = this_run.begin(); auto submit_to_thread_pool = [&] { - ctx->incr_num_scanner_scheduling(this_run.size()); if (ctx->thread_token != nullptr) { // TODO llj tg how to treat this? while (iter != this_run.end()) {