Skip to content

Commit

Permalink
[feature](spill) Implement spill to disk for hash join, aggregation a…
Browse files Browse the repository at this point in the history
…nd sort for pipelineX (apache#31910)

Co-authored-by: Jerry Hu <[email protected]>
  • Loading branch information
jacktengg and mrhhsg authored Mar 11, 2024
1 parent 340427c commit 3e07897
Show file tree
Hide file tree
Showing 67 changed files with 5,043 additions and 715 deletions.
9 changes: 9 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,15 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");

// create tablet in partition random robin idx lru size, default 10000
DEFINE_Int32(partition_disk_index_lru_size, "10000");
// ---- Query spill-to-disk settings ----
// Root path for query spill storage; defaults to the storage directory under DORIS_HOME.
DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
// limit the storage space that query spill files can use
DEFINE_mInt64(spill_storage_limit, "10737418240"); // 10G
// Spill GC interval in milliseconds (per the _ms suffix).
// NOTE(review): presumably the period of the spill garbage-collection pass -- confirm.
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
// Spill IO thread pool sizing: thread count per disk and task queue size (as the names indicate).
DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
// Spill async-task thread pool sizing: thread count and task queue size (as the names indicate).
DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
// NOTE(review): looks like a multiplier applied to a memory warning water mark when
// deciding to spill; exact semantics are not visible here -- confirm against the reader.
DEFINE_mInt32(spill_mem_warning_water_mark_multiplier, "2");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

Expand Down
8 changes: 8 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,14 @@ DECLARE_mDouble(high_disk_avail_level_diff_usages);

// create tablet in partition random robin idx lru size, default 10000
DECLARE_Int32(partition_disk_index_lru_size);
// Query spill-to-disk settings; the default values live next to the matching
// DEFINE_* entries in config.cpp.
DECLARE_String(spill_storage_root_path);
DECLARE_mInt64(spill_storage_limit);
DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int32(spill_async_task_thread_pool_thread_num);
DECLARE_Int32(spill_async_task_thread_pool_queue_size);
DECLARE_mInt32(spill_mem_warning_water_mark_multiplier);

DECLARE_mBool(check_segment_when_build_rowset_meta);

Expand Down
11 changes: 0 additions & 11 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/block_spill_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
Expand Down Expand Up @@ -359,13 +358,6 @@ void Daemon::calculate_metrics_thread() {
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
}

// Background loop that periodically cleans up stale spilled files.
// Waits on the stop latch for 60 seconds per round; each time the wait ends
// without a stop request, one gc pass is run on the block spill manager.
void Daemon::block_spill_gc_thread() {
    for (;;) {
        // A true result means the background threads were asked to stop.
        if (_stop_background_threads_latch.wait_for(std::chrono::seconds(60))) {
            break;
        }
        // The manager is looked up on every round, exactly as before.
        ExecEnv::GetInstance()->block_spill_mgr()->gc(200);
    }
}

void Daemon::report_runtime_query_statistics_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::report_query_statistics_interval_ms))) {
Expand Down Expand Up @@ -414,9 +406,6 @@ void Daemon::start() {
[this]() { this->calculate_metrics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
}
st = Thread::create(
"Daemon", "block_spill_gc_thread", [this]() { this->block_spill_gc_thread(); },
&_threads.emplace_back());
st = Thread::create(
"Daemon", "je_purge_dirty_pages_thread",
[this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
Expand Down
1 change: 0 additions & 1 deletion be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class Daemon {
void memory_gc_thread();
void memtable_memory_limiter_tracker_refresh_thread();
void calculate_metrics_thread();
void block_spill_gc_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();

Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ DataDir::~DataDir() {
delete _meta;
}

Status DataDir::init() {
Status DataDir::init(bool init_meta) {
bool exists = false;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists));
if (!exists) {
Expand All @@ -153,7 +153,9 @@ Status DataDir::init() {
RETURN_NOT_OK_STATUS_WITH_WARN(_init_cluster_id(), "_init_cluster_id failed");
RETURN_NOT_OK_STATUS_WITH_WARN(_init_capacity_and_create_shards(),
"_init_capacity_and_create_shards failed");
RETURN_NOT_OK_STATUS_WITH_WARN(_init_meta(), "_init_meta failed");
if (init_meta) {
RETURN_NOT_OK_STATUS_WITH_WARN(_init_meta(), "_init_meta failed");
}

_is_used = true;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class DataDir {
TStorageMedium::type storage_medium = TStorageMedium::HDD);
~DataDir();

Status init();
Status init(bool init_meta = true);
void stop_bg_worker();

const std::string& path() const { return _path; }
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ static const std::string ERROR_LOG_PREFIX = "error_log";
static const std::string PENDING_DELTA_PREFIX = "pending_delta";
static const std::string INCREMENTAL_DELTA_PREFIX = "incremental_delta";
static const std::string CLONE_PREFIX = "clone";
static const std::string SPILL_DIR_PREFIX = "spill";
static const std::string SPILL_GC_DIR_PREFIX = "spill_gc";

// define paths
static inline std::string remote_tablet_path(int64_t tablet_id) {
Expand Down
113 changes: 32 additions & 81 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
Base::_shared_state->total_size_of_aggregate_states = p._total_size_of_aggregate_states;
Base::_shared_state->offsets_of_aggregate_states = p._offsets_of_aggregate_states;
Base::_shared_state->make_nullable_keys = p._make_nullable_keys;
Base::_shared_state->init_spill_partition_helper(p._spill_partition_count_bits);
for (auto& evaluator : p._aggregate_evaluators) {
Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
}
Expand All @@ -86,7 +85,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
Expand Down Expand Up @@ -200,6 +198,9 @@ Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) {
}

size_t AggSinkLocalState::_memory_usage() const {
if (0 == _get_hash_table_size()) {
return 0;
}
size_t usage = 0;
if (_agg_arena_pool) {
usage += _agg_arena_pool->size();
Expand All @@ -209,6 +210,13 @@ size_t AggSinkLocalState::_memory_usage() const {
usage += Base::_shared_state->aggregate_data_container->memory_usage();
}

std::visit(
[&](auto&& agg_method) -> void {
auto data = agg_method.hash_table;
usage += data->get_buffer_size_in_bytes();
},
_agg_data->method_variant);

return usage;
}

Expand Down Expand Up @@ -274,7 +282,8 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b

for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) {
int col_id = _get_slot_column_id(Base::_shared_state->aggregate_evaluators[i]);
int col_id = AggSharedState::get_slot_column_id(
Base::_shared_state->aggregate_evaluators[i]);
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
Expand Down Expand Up @@ -317,7 +326,8 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
if constexpr (for_spill) {
col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
} else {
col_id = _get_slot_column_id(Base::_shared_state->aggregate_evaluators[i]);
col_id = AggSharedState::get_slot_column_id(
Base::_shared_state->aggregate_evaluators[i]);
}
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
Expand Down Expand Up @@ -361,23 +371,13 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
return Status::OK();
}

// We should call this function only at 1st phase.
// 1st phase: is_merge=true, only have one SlotRef.
// 2nd phase: is_merge=false, maybe have multiple exprs.
//
// Returns the column id of the single slot reference that feeds `evaluator`.
// Aborts via CHECK when the evaluator does not have exactly one input
// expression, or when that expression is not a slot reference.
int AggSinkLocalState::_get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) {
    // Bind by reference; there is no need to copy the context vector.
    const auto& ctxs = evaluator->input_exprs_ctxs();
    // Check the size on its own first: the diagnostic of the second check
    // dereferences ctxs[0], which must never be evaluated for an empty vector.
    CHECK(ctxs.size() == 1) << "input_exprs_ctxs is invalid, size=" << ctxs.size();
    CHECK(ctxs[0]->root()->is_slot_ref())
            << "input_exprs_ctxs is invalid, input_exprs_ctx[0]="
            << ctxs[0]->root()->debug_string();
    return static_cast<vectorized::VSlotRef*>(ctxs[0]->root().get())->column_id();
}

Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
SCOPED_TIMER(_merge_timer);
DCHECK(_agg_data->without_key != nullptr);
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) {
int col_id = _get_slot_column_id(Base::_shared_state->aggregate_evaluators[i]);
int col_id = AggSharedState::get_slot_column_id(
Base::_shared_state->aggregate_evaluators[i]);
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
Expand Down Expand Up @@ -465,7 +465,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
_places.data(), _agg_arena_pool));
}

if (_should_limit_output) {
if (_should_limit_output && !Base::_shared_state->enable_spill) {
_reach_limit = _get_hash_table_size() >=
Base::_parent->template cast<AggSinkOperatorX>()._limit;
if (_reach_limit &&
Expand All @@ -479,14 +479,14 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
return Status::OK();
}

size_t AggSinkLocalState::_get_hash_table_size() {
// Returns the number of entries in the hash table of whichever aggregation
// method is currently held by _agg_data->method_variant.
size_t AggSinkLocalState::_get_hash_table_size() const {
    auto entry_count = [](auto&& method) { return method.hash_table->size(); };
    return std::visit(entry_count, _agg_data->method_variant);
}

void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places,
vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
size_t num_rows) {
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
Expand Down Expand Up @@ -619,56 +619,6 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
}
}

// Destroys every aggregate state referenced from the current hash table and
// then replaces the hash table, the aggregate data container and the arena
// pool of the shared state with fresh instances. Always returns Status::OK().
Status AggSinkLocalState::_reset_hash_table() {
auto& ss = *Base::_shared_state;
return std::visit(
[&](auto&& agg_method) {
auto& hash_table = *agg_method.hash_table;
using HashTableType = std::decay_t<decltype(hash_table)>;

agg_method.reset();

// Destroy each non-null aggregate state and null the slot so that it is
// not destroyed twice. The status of _destroy_agg_status is deliberately
// discarded.
hash_table.for_each_mapped([&](auto& mapped) {
if (mapped) {
static_cast<void>(_destroy_agg_status(mapped));
mapped = nullptr;
}
});

// New container: the second argument is total_size_of_aggregate_states
// rounded up to the next multiple of align_aggregate_states.
ss.aggregate_data_container.reset(new vectorized::AggregateDataContainer(
sizeof(typename HashTableType::key_type),
((ss.total_size_of_aggregate_states + ss.align_aggregate_states - 1) /
ss.align_aggregate_states) *
ss.align_aggregate_states));
// Replace the hash table and the arena pool only after the old states
// have been destroyed above.
agg_method.hash_table.reset(new HashTableType());
ss.agg_arena_pool.reset(new vectorized::Arena);
return Status::OK();
},
ss.agg_data->method_variant);
}

// Spills the current hash table and resets it when spilling is warranted.
// Nothing happens when the external aggregation threshold is 0, when `eos`
// is false and the memory usage is still below the threshold, or when the
// hash table holds no entries.
Status AggSinkLocalState::try_spill_disk(bool eos) {
    const auto& sink_op = Base::_parent->template cast<AggSinkOperatorX>();
    const auto spill_threshold = sink_op._external_agg_bytes_threshold;
    if (spill_threshold == 0) {
        return Status::OK();
    }

    auto spill_and_reset = [&](auto&& agg_method) -> Status {
        auto& table = *agg_method.hash_table;

        // Memory usage is only consulted before end-of-stream; at eos any
        // non-empty table is spilled regardless of its size.
        const bool below_threshold = !eos && _memory_usage() < spill_threshold;
        if (below_threshold || _get_hash_table_size() == 0) {
            return Status::OK();
        }

        RETURN_IF_ERROR(_spill_hash_table(agg_method, table));
        return _reset_hash_table();
    };
    return std::visit(spill_and_reset, _agg_data->method_variant);
}

AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
Expand Down Expand Up @@ -712,13 +662,6 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
const auto& agg_functions = tnode.agg_node.aggregate_functions;
_external_agg_bytes_threshold = state->external_agg_bytes_threshold();

if (_external_agg_bytes_threshold > 0) {
_spill_partition_count_bits = 4;
if (state->query_options().__isset.external_agg_partition_bits) {
_spill_partition_count_bits = state->query_options().external_agg_partition_bits;
}
}

_is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(),
[](const auto& e) { return e.nodes[0].agg_expr.is_merge_agg; });

Expand Down Expand Up @@ -796,19 +739,27 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_
local_state._shared_state->input_num_rows += in_block->rows();
if (in_block->rows() > 0) {
RETURN_IF_ERROR(local_state._executor->execute(&local_state, in_block));
RETURN_IF_ERROR(local_state.try_spill_disk());
local_state._executor->update_memusage(&local_state);
}
if (eos) {
if (local_state._shared_state->spill_context.has_data) {
RETURN_IF_ERROR(local_state.try_spill_disk(true));
RETURN_IF_ERROR(local_state._shared_state->spill_context.prepare_for_reading());
}
local_state._dependency->set_ready_to_read();
}
return Status::OK();
}

// Reports the memory usage of the local sink state that belongs to `state`
// as the revocable memory size.
size_t AggSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const {
    return get_local_state(state)._memory_usage();
}

// Resets the hash table held by the shared state of the local sink state that
// belongs to `state`. Returns the error of the shared-state reset if it
// fails, otherwise Status::OK().
Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) {
auto& local_state = get_local_state(state);
auto& ss = *local_state.Base::_shared_state;
RETURN_IF_ERROR(ss.reset_hash_table());
// Refresh the raw arena pointer cached in the local state from the shared
// state. NOTE(review): presumably the reset above replaces agg_arena_pool,
// which would leave the old cached pointer dangling -- confirm.
local_state._agg_arena_pool = ss.agg_arena_pool.get();
return Status::OK();
}

Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_close_timer);
Expand Down
Loading

0 comments on commit 3e07897

Please sign in to comment.