Skip to content

Commit

Permalink
reserve bock in scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Nov 18, 2024
1 parent 815134f commit 5551491
Showing 8 changed files with 68 additions and 27 deletions.
43 changes: 23 additions & 20 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
@@ -426,7 +426,7 @@ Status PipelineTask::execute(bool* eos) {
_root->reset_reserve_mem_size(_state);

auto workload_group = _state->get_query_ctx()->workload_group();
if (workload_group && reserve_size > 0) {
if (workload_group && _state->enable_reserve_memory() && reserve_size > 0) {
auto st = thread_context()->try_reserve_memory(reserve_size);

COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -458,25 +458,28 @@ Status PipelineTask::execute(bool* eos) {
DEFER_RELEASE_RESERVED();
COUNTER_UPDATE(_memory_reserve_times, 1);
const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
status = thread_context()->try_reserve_memory(sink_reserve_size);
if (!status.ok()) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
<< PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id() << ", task id: " << _state->task_id()
<< ", failed: " << status.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(status);
_state->get_query_ctx()->set_low_memory_mode();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), sink_reserve_size);
DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
_block = vectorized::Block::create_unique(_pending_block->clone_empty());
_eos = *eos;
*eos = false;
continue;
if (_state->enable_reserve_memory()) {
status = thread_context()->try_reserve_memory(sink_reserve_size);
if (!status.ok()) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
<< PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id()
<< ", task id: " << _state->task_id()
<< ", failed: " << status.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(status);
_state->get_query_ctx()->set_low_memory_mode();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), sink_reserve_size);
DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
_block = vectorized::Block::create_unique(_pending_block->clone_empty());
_eos = *eos;
*eos = false;
continue;
}
}

// Define a lambda function to catch sink exception, because sink will check
4 changes: 4 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
@@ -561,6 +561,10 @@ class RuntimeState {
std::shared_ptr<IRuntimeFilter>* producer_filter);
bool is_nereids() const;

bool enable_reserve_memory() const {
return _query_options.__isset.enable_reserve_memory && _query_options.enable_reserve_memory;
}

bool enable_join_spill() const {
return (_query_options.__isset.enable_force_spill && _query_options.enable_force_spill) ||
(_query_options.__isset.enable_join_spill && _query_options.enable_join_spill);
20 changes: 16 additions & 4 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
@@ -222,7 +222,7 @@ Status ScannerContext::init() {
return Status::OK();
}

vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
vectorized::BlockUPtr ScannerContext::get_free_block(size_t block_avg_bytes, bool force) {
vectorized::BlockUPtr block = nullptr;
if (_free_blocks.try_dequeue(block)) {
DCHECK(block->mem_reuse());
@@ -232,9 +232,21 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
// The caller of get_free_block will increase the memory usage
update_peak_memory_usage(-block->allocated_bytes());
} else if (_block_memory_usage < _max_bytes_in_queue || force) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
true /*ignore invalid slots*/);
Status status;
if (!force && _state->enable_reserve_memory()) {
status = thread_context()->try_reserve_memory(block_avg_bytes);
if (!status.ok()) {
LOG(INFO) << "query: " << print_id(_query_id) << ", scanner try to reserve: "
<< PrettyPrinter::print(block_avg_bytes, TUnit::BYTES)
<< ", failed: " << status.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
}
}
if (status.ok()) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
true /*ignore invalid slots*/);
}
}
return block;
}
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
}
Status init();

vectorized::BlockUPtr get_free_block(bool force);
vectorized::BlockUPtr get_free_block(size_t block_avg_bytes, bool force);
void return_free_block(vectorized::BlockUPtr block);
inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }

9 changes: 8 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -277,6 +277,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
bool first_read = true;
// If the first block is full, then it is true. Or the first block + second block > batch_size
bool has_first_full_block = false;
size_t block_avg_bytes = ctx->batch_size();

// During low memory mode, every scan task will return at most 2 block to reduce memory usage.
while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -291,7 +292,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
config::doris_scanner_max_run_time_ms * 1e6) {
break;
}
BlockUPtr free_block = ctx->get_free_block(first_read);
BlockUPtr free_block = ctx->get_free_block(block_avg_bytes, first_read);
if (free_block == nullptr) {
break;
}
@@ -338,6 +339,12 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
if (scan_task->cached_blocks.back().first->rows() > 0) {
block_avg_bytes = (scan_task->cached_blocks.back().first->allocated_bytes() +
scan_task->cached_blocks.back().first->rows() - 1) /
scan_task->cached_blocks.back().first->rows() *
ctx->batch_size();
}
} // end for while

if (UNLIKELY(!status.ok())) {
4 changes: 3 additions & 1 deletion be/src/vec/spill/spill_stream.cpp
Original file line number Diff line number Diff line change
@@ -116,7 +116,9 @@ Status SpillStream::prepare() {
}

SpillReaderUPtr SpillStream::create_separate_reader() const {
return std::make_unique<SpillReader>(stream_id_, writer_->get_file_path());
return std::make_unique<SpillReader>(
state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
writer_->get_file_path());
}

const TUniqueId& SpillStream::query_id() const {
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
@@ -552,6 +552,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold";
public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits";
public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit";
public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
@@ -2132,6 +2133,14 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
@VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
public int maxFetchRemoteTabletCount = 512;

@VariableMgr.VarAttr(
name = ENABLE_RESERVE_MEMORY,
description = {"控制是否启用分配内存前先reverve memory的功能。默认为 false。",
"Controls whether to enable reserve memory before allocating memory. "
+ "The default value is false."},
needForward = true, fuzzy = true)
public boolean enableReserveMemory = false;

@VariableMgr.VarAttr(
name = ENABLE_JOIN_SPILL,
description = {"控制是否启用join算子落盘。默认为 false。",
@@ -3863,6 +3872,7 @@ public TQueryOptions toThrift() {
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
tResult.setSkipBadTablet(skipBadTablet);
tResult.setDisableFileCache(disableFileCache);
tResult.setEnableReserveMemory(enableReserveMemory);
tResult.setEnableJoinSpill(enableJoinSpill);
tResult.setEnableSortSpill(enableSortSpill);
tResult.setEnableAggSpill(enableAggSpill);
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
@@ -356,6 +356,9 @@ struct TQueryOptions {
139: optional i32 query_slot_count = 0;

140: optional bool enable_auto_create_when_overwrite = false;

141: optional bool enable_reserve_memory = false

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.

0 comments on commit 5551491

Please sign in to comment.