Skip to content

Commit

Permalink
Revert "[improvement](scanner_schedule) reduce memory consumption of …
Browse files Browse the repository at this point in the history
…scanner apache#24199 (apache#25547)" (apache#26613)

This reverts commit 9a19581 to investigate ANALYZE TABLE WITH SYNC problem
  • Loading branch information
xiaokang authored Nov 8, 2023
1 parent e8ab033 commit ea676de
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 66 deletions.
3 changes: 1 addition & 2 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "util/stack_util.h"
#include "util/uid_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -206,7 +205,7 @@ Status ExecNode::close(RuntimeState* state) {
<< " already closed";
return Status::OK();
}
LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed. ";
LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
_is_closed = true;

Status result;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bool ScanOperator::can_read() {
return true;
} else {
if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
_node->_scanner_ctx->should_be_scheduled()) {
_node->_scanner_ctx->has_enough_space_in_blocks_queue()) {
_node->_scanner_ctx->reschedule_scanner_ctx();
}
return _node->ready_to_read(); // there are some blocks to process
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
#include "util/container_util.hpp"
#include "util/defer_op.h"
#include "util/pretty_printer.h"
#include "util/stack_util.h"
#include "util/telemetry/telemetry.h"
#include "util/threadpool.h"
#include "util/time.h"
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ class PipScannerContext : public vectorized::ScannerContext {
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}

// Reports whether the blocks queue still has room for more data.
// Returns true while `_current_used_bytes` is below
// `_max_bytes_in_queue / 2 * _num_parallel_instances`. The expression is
// evaluated left to right, so the halving is applied before the
// multiplication by the number of parallel instances.
bool has_enough_space_in_blocks_queue() const override {
return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances;
}

void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _num_parallel_instances; ++i) {
Expand Down Expand Up @@ -217,7 +221,8 @@ class PipScannerContext : public vectorized::ScannerContext {
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
_colocate_blocks[loc] = get_free_block();
bool get_block_not_empty = true;
_colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
_colocate_mutable_blocks[loc]->set_muatable_columns(
_colocate_blocks[loc]->mutate_columns());
}
Expand Down
68 changes: 29 additions & 39 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
_process_status(Status::OK()),
_batch_size(state_->batch_size()),
limit(limit_),
_max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances),
_max_bytes_in_queue(max_bytes_in_blocks_queue_),
_scanner_scheduler(state_->exec_env()->scanner_scheduler()),
_scanners(scanners_),
_num_parallel_instances(num_parallel_instances) {
Expand All @@ -63,21 +63,26 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
if (limit < 0) {
limit = -1;
}
}

// After init function call, should not access _parent
Status ScannerContext::init() {
// 1. Calculate max concurrency
// TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
// should find a more reasonable value.
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
_max_thread_num *= num_parallel_instances;
if (_parent->_shared_scan_opt) {
DCHECK(_num_parallel_instances > 0);
_max_thread_num *= _num_parallel_instances;
}
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if (_parent->should_run_serial()) {
_max_thread_num = 1;
}
}

// After init function call, should not access _parent
Status ScannerContext::init() {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
_scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
Expand All @@ -99,9 +104,6 @@ Status ScannerContext::init() {
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto block = get_free_block();
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
return_free_block(std::move(block));

#ifndef BE_TEST
// 3. get thread token
Expand All @@ -121,33 +123,27 @@ Status ScannerContext::init() {
return Status::OK();
}

vectorized::BlockUPtr ScannerContext::get_free_block() {
vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
bool get_block_not_empty) {
vectorized::BlockUPtr block;
if (_free_blocks.try_dequeue(block)) {
DCHECK(block->mem_reuse());
_free_blocks_memory_usage->add(-block->allocated_bytes());
_serving_blocks_num++;
return block;
if (!get_block_not_empty || block->mem_reuse()) {
_free_blocks_capacity--;
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
}

block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
COUNTER_UPDATE(_newly_create_free_blocks_num, 1);

_serving_blocks_num++;
return block;
return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
}

void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
_serving_blocks_num--;
if (block->mem_reuse()) {
// Only put blocks with schema to free blocks, because colocate blocks
// need schema.
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
_free_blocks.enqueue(std::move(block));
}
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
_free_blocks.enqueue(std::move(block));
++_free_blocks_capacity;
}

void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
Expand Down Expand Up @@ -180,15 +176,14 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
if (should_be_scheduled() && _num_running_scanners == 0) {
if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
} else {
set_status_on_error(state, false);
}
}

// Wait for block from queue
if (wait) {
SCOPED_TIMER(_scanner_wait_batch_timer);
Expand All @@ -212,7 +207,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo

auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;

_queued_blocks_memory_usage->add(-block_bytes);
return Status::OK();
} else {
Expand Down Expand Up @@ -359,13 +353,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_scanners.push_front(scanner);
}
std::lock_guard l(_transfer_lock);

// In pipeline engine, doris will close scanners when `no_schedule`.
// We have to decrease _num_running_scanners before schedule, otherwise
// schedule does not work due to _num_running_scanners.
_num_running_scanners--;

if (should_be_scheduled()) {
if (has_enough_space_in_blocks_queue()) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
Expand All @@ -385,6 +373,8 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
// In pipeline engine, doris will close scanners when `no_schedule`.
_num_running_scanners--;
_ctx_finish_cv.notify_one();
}

Expand All @@ -394,7 +384,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
{
// If there is enough space in the blocks queue,
// the number of scanners depends on the number of _free_blocks
thread_slot_num = get_available_thread_slot_num();
thread_slot_num = cal_thread_slot_num_by_free_block_num();
}

// 2. get #thread_slot_num scanners from ctx->scanners
Expand Down
28 changes: 10 additions & 18 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ class ScannerContext {
ScannerContext(RuntimeState* state_, VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1);
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0);

virtual ~ScannerContext() = default;
virtual Status init();

vectorized::BlockUPtr get_free_block();
vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
void return_free_block(std::unique_ptr<vectorized::Block> block);

// Append blocks from scanners to the blocks queue.
Expand Down Expand Up @@ -136,25 +136,20 @@ class ScannerContext {
virtual bool empty_in_queue(int id);

// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
inline bool should_be_scheduled() const {
return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
(_serving_blocks_num < allowed_blocks_num());
virtual inline bool has_enough_space_in_blocks_queue() const {
return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
}

int get_available_thread_slot_num() {
int cal_thread_slot_num_by_free_block_num() {
int thread_slot_num = 0;
thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
if (thread_slot_num <= 0) {
thread_slot_num = 1;
}
return thread_slot_num;
}

int32_t allowed_blocks_num() const {
int32_t blocks_num = std::min(_free_blocks_capacity,
int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) /
_estimated_block_bytes));
return blocks_num;
}

void reschedule_scanner_ctx();

// the unique id of this context
Expand Down Expand Up @@ -208,12 +203,10 @@ class ScannerContext {

// Lazy-allocated blocks for all scanners to share, for memory reuse.
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
std::atomic<int32_t> _serving_blocks_num = 0;
// The current number of free blocks available to the scanners.
// Used to limit the memory usage of the scanner.
// NOTE: this is NOT the size of `_free_blocks`.
int32_t _free_blocks_capacity = 0;
int64_t _estimated_block_bytes = 0;
std::atomic_int32_t _free_blocks_capacity = 0;

int _batch_size;
// The limit from SQL's limit clause
Expand All @@ -238,7 +231,6 @@ class ScannerContext {
int64_t _cur_bytes_in_queue = 0;
// The max limit bytes of blocks in blocks queue
const int64_t _max_bytes_in_queue;
std::atomic<int64_t> _bytes_allocated = 0;

doris::vectorized::ScannerScheduler* _scanner_scheduler;
// List "scanners" saves all "unfinished" scanners.
Expand Down
12 changes: 8 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
bool has_free_block = true;
int num_rows_in_block = 0;

// Only set to true when ctx->done() return true.
Expand All @@ -330,16 +331,17 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
bool should_stop = false;
// Has to wait for at least one full block; otherwise it will cause a lot of scheduling tasks in the priority
// queue, which will affect query latency and query concurrency, for example in ssb 3.3.
while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
num_rows_in_block < state->batch_size()) {
while (!eos && raw_bytes_read < raw_bytes_threshold &&
((raw_rows_read < raw_rows_threshold && has_free_block) ||
num_rows_in_block < state->batch_size())) {
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
// Because done() maybe caused by "should_stop"
should_stop = true;
break;
}

BlockUPtr block = ctx->get_free_block();
BlockUPtr block = ctx->get_free_block(&has_free_block);
status = scanner->get_block(state, block.get(), &eos);
VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos;
// The VFileScanner for an external table may try to open files that do not exist,
Expand All @@ -355,11 +357,12 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
if (status.is<ErrorCode::NOT_FOUND>()) {
// The only case for this "if" branch is that an external table file was deleted and the FE cache has not been updated yet.
// Set status to OK.
LOG(INFO) << "scan range not found: " << scanner->get_current_scan_range_name();
status = Status::OK();
eos = true;
}

raw_bytes_read += block->allocated_bytes();
raw_bytes_read += block->bytes();
num_rows_in_block += block->rows();
if (UNLIKELY(block->rows() == 0)) {
ctx->return_free_block(std::move(block));
Expand Down Expand Up @@ -394,6 +397,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
if (eos || should_stop) {
scanner->mark_to_need_to_close();
}

ctx->push_back_scanner_and_reschedule(scanner);
}

Expand Down

0 comments on commit ea676de

Please sign in to comment.