From eb9079044b34f24386eb28adc250e39c6fb84078 Mon Sep 17 00:00:00 2001
From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com>
Date: Fri, 15 Mar 2024 12:01:25 +0800
Subject: [PATCH] [improve](distinct agg) add check of hash table to decide
 whether emplace value (#32063)

* [improve](distinct agg) add check of hash table to emplace value
---
 ...istinct_streaming_aggregation_operator.cpp | 130 ++++++++++++++++--
 .../distinct_streaming_aggregation_operator.h |   4 +
 ...ct_streaming_aggregation_sink_operator.cpp |  10 +-
 ..._streaming_aggregation_source_operator.cpp |   3 +-
 ...ct_streaming_aggregation_source_operator.h |   1 -
 be/src/vec/core/block.cpp                     |   2 +-
 .../vec/exec/distinct_vaggregation_node.cpp   |  41 ++++--
 be/src/vec/exec/distinct_vaggregation_node.h  |   4 +-
 be/src/vec/exec/vaggregation_node.h           |   6 +-
 9 files changed, 171 insertions(+), 30 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 0ab0b810092e2a..7983b269488eca 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -31,6 +31,30 @@ class RuntimeState;
 
 namespace doris::pipeline {
 
+struct StreamingHtMinReductionEntry {
+    // Use 'streaming_ht_min_reduction' if the total size of the hash table bucket
+    // directories in bytes is greater than this threshold.
+    int min_ht_mem;
+    // The minimum reduction factor required to expand the hash tables.
+    double streaming_ht_min_reduction;
+};
+
+// TODO: experimentally tune these values and also programmatically get the cache size
+// of the machine that we're running on.
+static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+        // Always expand up to the L2 cache.
+        {0, 0.0},
+        // Expand into the L3 cache if we look like we're getting some reduction.
+        // At present, the L2 cache is generally 1024 KB or more.
+        {1024 * 1024, 1.1},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16 MB or more.
+        {16 * 1024 * 1024, 2.0},
+};
+
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+        sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
 DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state,
                                                                OperatorXBase* parent)
         : PipelineXLocalState<FakeSharedState>(state, parent),
@@ -69,6 +93,65 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo&
     return Status::OK();
 }
 
+bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
+    if (!_should_expand_hash_table) {
+        return false;
+    }
+
+    return std::visit(
+            [&](auto&& agg_method) -> bool {
+                auto& hash_tbl = *agg_method.hash_table;
+                auto [ht_mem, ht_rows] =
+                        std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
+
+                // Need some rows in the tables to have valid statistics.
+                if (ht_rows == 0) {
+                    return true;
+                }
+
+                // Find the appropriate reduction factor in our table for the current hash table sizes.
+                int cache_level = 0;
+                while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
+                       ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                    ++cache_level;
+                }
+
+                // Compare the number of rows in the hash table with the number of input rows that
+                // were aggregated into it. Exclude passed-through rows from this calculation since
+                // they were not in the hash tables.
+                const int64_t input_rows = _input_num_rows;
+                const int64_t aggregated_input_rows = input_rows - _cur_num_rows_returned;
+                // TODO chenhao
+                // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
+                double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
+
+                // TODO: workaround for IMPALA-2490: the subplan node rows_returned counter may be
+                // inaccurate, which could lead to a divide by zero below.
+                if (aggregated_input_rows <= 0) {
+                    return true;
+                }
+
+                // Extrapolate the current reduction factor (r) using the formula
+                // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
+                // set, N is the number of input rows, excluding passed-through rows, and n is the
+                // number of rows inserted or merged into the hash tables. This is a very rough
+                // approximation but is good enough to be useful.
+                // TODO: consider collecting more statistics to better estimate reduction.
+                // double estimated_reduction = aggregated_input_rows >= expected_input_rows
+                //         ? current_reduction
+                //         : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
+                double min_reduction =
+                        STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+
+                // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
+                // COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
+                // return estimated_reduction > min_reduction;
+                _should_expand_hash_table = current_reduction > min_reduction;
+                return _should_expand_hash_table;
+            },
+            _agg_data->method_variant);
+}
+
 void DistinctStreamingAggLocalState::_init_hash_method(
         const vectorized::VExprContextSPtrs& probe_exprs) {
     DCHECK(probe_exprs.size() >= 1);
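Note: the heuristic above is self-contained enough to exercise in isolation. Pick the deepest cache level whose memory threshold the current bucket directory has crossed, then require the observed reduction factor (aggregated input rows per distinct key) to beat that level's minimum. A minimal standalone sketch of the same logic, with local names rather than the Doris API:

    #include <cstdint>
    #include <cstdio>

    struct Entry {
        int64_t min_ht_mem;
        double min_reduction;
    };
    // Mirrors STREAMING_HT_MIN_REDUCTION: L2 cache, L3 cache, main memory.
    static constexpr Entry kTable[] = {{0, 0.0}, {1 << 20, 1.1}, {16 << 20, 2.0}};
    static constexpr int kTableSize = sizeof(kTable) / sizeof(kTable[0]);

    bool should_expand(int64_t ht_mem, int64_t ht_rows, int64_t aggregated_rows) {
        if (ht_rows == 0 || aggregated_rows <= 0) {
            return true; // not enough statistics yet
        }
        int level = 0;
        while (level + 1 < kTableSize && ht_mem >= kTable[level + 1].min_ht_mem) {
            ++level;
        }
        double reduction = static_cast<double>(aggregated_rows) / ht_rows;
        return reduction > kTable[level].min_reduction;
    }

    int main() {
        // 20 MB of buckets puts us at the "main memory" level (threshold 2.0).
        // 3M rows collapsed into 1M keys: reduction 3.0 > 2.0, keep expanding.
        std::printf("%d\n", should_expand(20 << 20, 1000000, 3000000));
        // Only 1.5M rows for 1M keys: reduction 1.5 <= 2.0, stop and pass through.
        std::printf("%d\n", should_expand(20 << 20, 1000000, 1500000));
    }

For the first case, r = 3.0; the commented-out extrapolation R = 1 + (N / n) * (r - 1) would, for N = 2n, estimate R = 5.0 over the full input.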
@@ -144,6 +227,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
 
     size_t key_size = _probe_expr_ctxs.size();
     vectorized::ColumnRawPtrs key_columns(key_size);
+    std::vector<int> result_idxs(key_size);
     {
         SCOPED_TIMER(_expr_timer);
         for (size_t i = 0; i < key_size; ++i) {
@@ -153,6 +237,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
                     in_block->get_by_position(result_column_id)
                             .column->convert_to_full_column_if_const();
             key_columns[i] = in_block->get_by_position(result_column_id).column.get();
+            result_idxs[i] = result_column_id;
         }
     }
 
@@ -162,24 +247,41 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
     RETURN_IF_CATCH_EXCEPTION(
             _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
+    // _cur_num_rows_returned is needed to decide whether to keep emplacing into the hash table.
+    _cur_num_rows_returned += _distinct_row.size();
     bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
                      out_block->mem_reuse();
     if (mem_reuse) {
         for (int i = 0; i < key_size; ++i) {
-            auto dst = out_block->get_by_position(i).column->assume_mutable();
-            key_columns[i]->append_data_by_selector(dst, _distinct_row);
+            auto output_column = out_block->get_by_position(i).column;
+            if (_stop_emplace_flag) { // swap the columns directly, to avoid "Check failed: d.column->use_count() == 1 (2 vs. 1)"
+                out_block->replace_by_position(i, key_columns[i]->assume_mutable());
+                in_block->replace_by_position(result_idxs[i], output_column);
+            } else {
+                auto dst = output_column->assume_mutable();
+                key_columns[i]->append_data_by_selector(dst, _distinct_row);
+            }
         }
     } else {
         vectorized::ColumnsWithTypeAndName columns_with_schema;
         for (int i = 0; i < key_size; ++i) {
-            auto distinct_column = key_columns[i]->clone_empty();
-            key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
-            columns_with_schema.emplace_back(std::move(distinct_column),
-                                             _probe_expr_ctxs[i]->root()->data_type(),
-                                             _probe_expr_ctxs[i]->root()->expr_name());
+            if (_stop_emplace_flag) {
+                columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
+                                                 _probe_expr_ctxs[i]->root()->data_type(),
+                                                 _probe_expr_ctxs[i]->root()->expr_name());
+            } else {
+                auto distinct_column = key_columns[i]->clone_empty();
+                key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
+                columns_with_schema.emplace_back(std::move(distinct_column),
+                                                 _probe_expr_ctxs[i]->root()->data_type(),
+                                                 _probe_expr_ctxs[i]->root()->expr_name());
+            }
         }
         out_block->swap(vectorized::Block(columns_with_schema));
+        if (_stop_emplace_flag) {
+            in_block->clear(); // drop the extra column references held while _stop_emplace_flag is set
+        }
     }
     return Status::OK();
 }
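Note: the swap in the _stop_emplace_flag branch exists because Block::clear_column_data() (patched below) asserts exclusive ownership of every column. If the pass-through path merely shared the input column with the output block, the column's use count would be 2 and the DCHECK would fire; handing ownership across the two blocks keeps each column uniquely held. An illustrative sketch, with std::shared_ptr standing in for Doris's COW column pointer:

    #include <cassert>
    #include <memory>
    #include <utility>
    #include <vector>

    using Column = std::vector<int>;
    using ColumnPtr = std::shared_ptr<Column>;

    int main() {
        ColumnPtr in_col = std::make_shared<Column>(Column {1, 2, 3});
        ColumnPtr out_col = std::make_shared<Column>();

        // Sharing the input column with the output block leaves use_count == 2,
        // exactly the state that the DCHECK in clear_column_data() rejects.
        ColumnPtr shared = in_col;
        assert(shared.use_count() == 2);
        shared.reset();

        // Swapping ownership instead (what the replace_by_position pair does
        // above) keeps every column uniquely owned: the output block takes the
        // input column, and the input block receives the old output column.
        std::swap(in_col, out_col);
        assert(in_col.use_count() == 1 && out_col.use_count() == 1);
    }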
@@ -201,6 +303,14 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
                 SCOPED_TIMER(_hash_table_compute_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
+                auto& hash_tbl = *agg_method.hash_table;
+                if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg &&
+                    hash_tbl.add_elem_size_overflow(num_rows)) {
+                    if (!_should_expand_preagg_hash_tables()) {
+                        _stop_emplace_flag = true;
+                        return;
+                    }
+                }
                 AggState state(key_columns);
                 agg_method.init_serialized_keys(key_columns, num_rows);
                 size_t row = 0;
@@ -339,12 +449,12 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc
                 in_block, local_state._aggregated_block.get()));
 
         // get enough data or reached limit rows, need push block to queue
-        if (_limit != -1 &&
+        if (!local_state._stop_emplace_flag && _limit != -1 &&
            (local_state._aggregated_block->rows() + local_state._output_distinct_rows) >= _limit) {
             auto limit_rows = _limit - local_state._output_distinct_rows;
             local_state._aggregated_block->set_num_rows(limit_rows);
             local_state._output_distinct_rows += limit_rows;
-        } else if (local_state._aggregated_block->rows() >= state->batch_size()) {
+        } else if (!local_state._stop_emplace_flag) {
             local_state._output_distinct_rows += local_state._aggregated_block->rows();
         }
     }
@@ -371,7 +481,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
         RETURN_IF_ERROR(
                 vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
     }
-
+    local_state.add_num_rows_returned(block->rows());
     *eos = local_state._child_eos || (_limit != -1 && local_state._output_distinct_rows >= _limit);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d469cf5bf21d42..372e3a5a0ca6cf 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -56,12 +56,16 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeSharedState>
     std::shared_ptr<char> dummy_mapped_data;
     vectorized::IColumn::Selector _distinct_row;
     vectorized::Arena _arena;
     int64_t _output_distinct_rows = 0;
     size_t _input_num_rows = 0;
+    bool _should_expand_hash_table = true;
+    int64_t _cur_num_rows_returned = 0;
+    bool _stop_emplace_flag = false;
+
 
     std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
     vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
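Note: the new guard short-circuits a whole batch. If inserting num_rows more keys would overflow the table's current capacity (add_elem_size_overflow) and the reduction check says expansion is not paying off, the batch is skipped entirely and passed through instead. A toy sketch of that gate; would_grow() is a hypothetical stand-in for add_elem_size_overflow(), not the Doris hash table API:

    #include <cstddef>
    #include <cstdio>
    #include <unordered_set>
    #include <vector>

    struct Preagg {
        std::unordered_set<int> table;
        bool stop_emplace = false;

        // Hypothetical stand-in: would inserting n more keys push the table
        // past its current bucket capacity and force it to grow?
        bool would_grow(size_t n) const {
            return table.size() + n > table.bucket_count() * table.max_load_factor();
        }
        bool should_expand() const { return false; } // assume reduction is poor

        void emplace_batch(const std::vector<int>& batch) {
            // The whole batch is skipped, not just the overflowing part; the
            // caller passes these rows through and a downstream agg deduplicates.
            if (would_grow(batch.size()) && !should_expand()) {
                stop_emplace = true;
                return;
            }
            table.insert(batch.begin(), batch.end());
        }
    };

    int main() {
        Preagg agg;
        agg.emplace_batch(std::vector<int>(10000, 0)); // trips the guard
        std::printf("stop_emplace = %d\n", agg.stop_emplace);
    }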
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
index 765055a004b5af..3cb18168dcb215 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -52,16 +52,18 @@ Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::B
     }
     RETURN_IF_ERROR(
             _node->_distinct_pre_agg_with_serialized_key(in_block, _output_block.get()));
-
+    bool stop_emplace_flag = _node->is_stop_emplace_flag();
     // get enough data or reached limit rows, need push block to queue
-    if (_node->limit() != -1 &&
+    if (!stop_emplace_flag && _node->limit() != -1 &&
         (_output_block->rows() + _output_distinct_rows) >= _node->limit()) {
         auto limit_rows = _node->limit() - _output_distinct_rows;
         _output_block->set_num_rows(limit_rows);
         _output_distinct_rows += limit_rows;
         _data_queue->push_block(std::move(_output_block));
-    } else if (_output_block->rows() >= state->batch_size()) {
-        _output_distinct_rows += _output_block->rows();
+    } else if (stop_emplace_flag || _output_block->rows() >= state->batch_size()) {
+        if (!stop_emplace_flag) { // when stop_emplace_flag is true, rows are returned directly, not deduplicated
+            _output_distinct_rows += _output_block->rows();
+        }
         _data_queue->push_block(std::move(_output_block));
     }
 }
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
index fb653bdcbdba8a..5ab8bd30bc845f 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -59,7 +59,7 @@ Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state, vector
                                                      block->columns()));
     }
 
-    rows_have_returned += block->rows();
+    _node->add_num_rows_returned(block->rows());
     return Status::OK();
 }
 
@@ -71,7 +71,6 @@ Status DistinctStreamingAggSourceOperator::get_block(RuntimeState* state, vector
             std::bind(&DistinctStreamingAggSourceOperator::pull_data, this, std::placeholders::_1,
                       std::placeholders::_2, std::placeholders::_3)));
     if (UNLIKELY(eos)) {
-        _node->set_num_rows_returned(rows_have_returned);
         source_state = SourceState::FINISHED;
     } else {
         source_state = SourceState::DEPEND_ON_SOURCE;
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
index 0283e31c41fddd..e8fd21310bbd7b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
@@ -60,7 +60,6 @@ class DistinctStreamingAggSourceOperator final
     Status pull_data(RuntimeState* state, vectorized::Block* output_block, bool* eos);
 
 private:
-    int64_t rows_have_returned = 0;
     std::shared_ptr<DataQueue> _data_queue;
 };
 
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index f4eddb65f57f03..047c7029a2e185 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -700,7 +700,7 @@ void Block::clear_column_data(int column_size) noexcept {
         }
     }
     for (auto& d : data) {
-        DCHECK_EQ(d.column->use_count(), 1);
+        DCHECK_EQ(d.column->use_count(), 1) << " " << print_use_count();
         (*std::move(d.column)).assume_mutable()->clear();
     }
     row_same_bit.clear();
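Note: the sink's branch structure is easier to see stripped of the operator plumbing. Only deduplicated rows advance _output_distinct_rows; a pass-through block is flushed as-is, since its rows may still contain duplicates that a downstream aggregation will remove. A condensed sketch under those assumptions (simplified signature, not the Doris interface):

    #include <cstdint>
    #include <functional>

    // Returns the new distinct-row total. `limit` of -1 means "no limit",
    // matching the convention in the patch; `push` flushes the output block.
    int64_t sink_decision(bool stop_emplace, int64_t limit, int64_t distinct_rows,
                          int64_t block_rows, int64_t batch_size,
                          const std::function<void()>& push) {
        if (!stop_emplace && limit != -1 && distinct_rows + block_rows >= limit) {
            push(); // the caller truncates the block to the limit first
            return limit;
        }
        if (stop_emplace || block_rows >= batch_size) {
            push();
            // Pass-through rows are not known to be distinct, so don't count them.
            return stop_emplace ? distinct_rows : distinct_rows + block_rows;
        }
        return distinct_rows; // keep buffering
    }

The counter relocation in the source operator serves the same heuristic: _should_expand_preagg_hash_tables() reads the returned-row count mid-stream, so it must accumulate per pull (add_num_rows_returned) rather than be set once at EOS.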
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp
index bacfb91e961d2d..66368fe3a11aa7 100644
--- a/be/src/vec/exec/distinct_vaggregation_node.cpp
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -39,6 +39,7 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
 
     size_t key_size = _probe_expr_ctxs.size();
     ColumnRawPtrs key_columns(key_size);
+    std::vector<int> result_idxs(key_size);
     {
         SCOPED_TIMER(_expr_timer);
         for (size_t i = 0; i < key_size; ++i) {
@@ -48,6 +49,7 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
                     in_block->get_by_position(result_column_id)
                             .column->convert_to_full_column_if_const();
             key_columns[i] = in_block->get_by_position(result_column_id).column.get();
+            result_idxs[i] = result_column_id;
         }
     }
 
@@ -57,24 +59,40 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
     RETURN_IF_CATCH_EXCEPTION(
             _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
-
+    // If stop_emplace_flag is true, there is no need to emplace values into the hash table,
+    // so return the block directly; note the column use count is then 2 and must be handled.
     SCOPED_TIMER(_insert_keys_to_column_timer);
     bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
     if (mem_reuse) {
         for (int i = 0; i < key_size; ++i) {
-            auto dst = out_block->get_by_position(i).column->assume_mutable();
-            key_columns[i]->append_data_by_selector(dst, _distinct_row);
+            auto output_column = out_block->get_by_position(i).column;
+            if (_stop_emplace_flag) { // swap the columns directly, to avoid "Check failed: d.column->use_count() == 1 (2 vs. 1)"
+                out_block->replace_by_position(i, key_columns[i]->assume_mutable());
+                in_block->replace_by_position(result_idxs[i], output_column);
+            } else {
+                auto dst = output_column->assume_mutable();
+                key_columns[i]->append_data_by_selector(dst, _distinct_row);
+            }
         }
     } else {
         ColumnsWithTypeAndName columns_with_schema;
         for (int i = 0; i < key_size; ++i) {
-            auto distinct_column = key_columns[i]->clone_empty();
-            key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
-            columns_with_schema.emplace_back(std::move(distinct_column),
-                                             _probe_expr_ctxs[i]->root()->data_type(),
-                                             _probe_expr_ctxs[i]->root()->expr_name());
+            if (_stop_emplace_flag) {
+                columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
+                                                 _probe_expr_ctxs[i]->root()->data_type(),
+                                                 _probe_expr_ctxs[i]->root()->expr_name());
+            } else {
+                auto distinct_column = key_columns[i]->clone_empty();
+                key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
+                columns_with_schema.emplace_back(std::move(distinct_column),
+                                                 _probe_expr_ctxs[i]->root()->data_type(),
+                                                 _probe_expr_ctxs[i]->root()->expr_name());
+            }
         }
         out_block->swap(Block(columns_with_schema));
+        if (_stop_emplace_flag) {
+            in_block->clear(); // drop the extra column references held while _stop_emplace_flag is set
+        }
     }
     return Status::OK();
 }
@@ -88,6 +106,13 @@ void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Sele
                 SCOPED_TIMER(_hash_table_compute_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
+                auto& hash_tbl = *agg_method.hash_table;
+                if (is_streaming_preagg() && hash_tbl.add_elem_size_overflow(num_rows)) {
+                    if (!_should_expand_preagg_hash_tables()) {
+                        _stop_emplace_flag = true;
+                        return;
+                    }
+                }
                 AggState state(key_columns);
                 agg_method.init_serialized_keys(key_columns, num_rows);
diff --git a/be/src/vec/exec/distinct_vaggregation_node.h b/be/src/vec/exec/distinct_vaggregation_node.h
index adeb042da89ce8..eb47e8fb10e05d 100644
--- a/be/src/vec/exec/distinct_vaggregation_node.h
+++ b/be/src/vec/exec/distinct_vaggregation_node.h
@@ -43,8 +43,9 @@ class DistinctAggregationNode final : public AggregationNode {
     DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
     ~DistinctAggregationNode() override = default;
     Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block* out_block);
-    void set_num_rows_returned(int64_t rows) { _num_rows_returned = rows; }
+    void add_num_rows_returned(int64_t rows) { _num_rows_returned += rows; }
     vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; }
+    bool is_stop_emplace_flag() const { return _stop_emplace_flag; }
 
 private:
     void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
@@ -52,6 +53,7 @@ class DistinctAggregationNode final : public AggregationNode {
 
     char* dummy_mapped_data = nullptr;
     IColumn::Selector _distinct_row;
+    bool _stop_emplace_flag = false;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index f74d035caef99c..594f25f6039a1e 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -420,6 +420,9 @@ class AggregationNode : public ::doris::ExecNode {
     bool is_streaming_preagg() const { return _is_streaming_preagg; }
     bool is_aggregate_evaluators_empty() const { return _aggregate_evaluators.empty(); }
     void _make_nullable_output_key(Block* block);
+    /// Return true if we should keep expanding hash tables in the preagg. If false,
+    /// the preagg should pass through any rows it can't fit in its tables.
+    bool _should_expand_preagg_hash_tables();
 
 protected:
     bool _is_streaming_preagg;
@@ -498,9 +501,6 @@ class AggregationNode : public ::doris::ExecNode {
     std::unique_ptr<AggregateDataContainer> _aggregate_data_container;
 
     void _release_self_resource(RuntimeState* state);
-    /// Return true if we should keep expanding hash tables in the preagg. If false,
-    /// the preagg should pass through any rows it can't fit in its tables.
-    bool _should_expand_preagg_hash_tables();
     size_t _get_hash_table_size();
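Note: correctness of the pass-through relies on this operator being a pre-aggregation, which is why _should_expand_preagg_hash_tables() moves to the public section above. A downstream (final) distinct aggregation still sees every row, so emitting duplicates is safe; only memory behavior changes. A toy check of that invariant, illustrative only:

    #include <cassert>
    #include <set>
    #include <vector>

    int main() {
        std::vector<int> input = {1, 2, 2, 3, 3, 3, 4};

        // The preagg dedups a prefix, then gives up and passes the rest through.
        std::set<int> preagg(input.begin(), input.begin() + 4);           // {1, 2, 3}
        std::vector<int> emitted(preagg.begin(), preagg.end());
        emitted.insert(emitted.end(), input.begin() + 4, input.end());    // + {3, 3, 4}

        // The final aggregation dedups whatever the preagg emitted, so the
        // result matches deduplicating the whole input in one pass.
        std::set<int> final_agg(emitted.begin(), emitted.end());
        assert(final_agg == std::set<int>(input.begin(), input.end()));
    }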