Skip to content

Commit

Permalink
[improve](distinct agg) add check of hash table to emplace value
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Mar 11, 2024
1 parent 3e07897 commit 9f1022f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
27 changes: 24 additions & 3 deletions be/src/vec/exec/distinct_vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,24 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
}

int rows = in_block->rows();
bool stop_emplace_flag = false;
_distinct_row.clear();
_distinct_row.reserve(rows);

RETURN_IF_CATCH_EXCEPTION(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows,
&stop_emplace_flag));
// If stop_emplace_flag is true, there is no need to emplace values into the hash table,
// so return the block directly.
if (stop_emplace_flag) {
ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
}
out_block->swap(Block(columns_with_schema));
return Status::OK();
}

SCOPED_TIMER(_insert_keys_to_column_timer);
bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
Expand All @@ -81,13 +94,21 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(

void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
ColumnRawPtrs& key_columns,
const size_t num_rows) {
const size_t num_rows,
bool* stop_emplace_flag) {
SCOPED_TIMER(_exec_timer);
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
auto& hash_tbl = *agg_method.hash_table;
if (is_streaming_preagg() && hash_tbl.add_elem_size_overflow(num_rows)) {
if (!_should_expand_preagg_hash_tables()) {
*stop_emplace_flag = true;
return;
}
}
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/distinct_vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class DistinctAggregationNode final : public AggregationNode {

private:
void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
ColumnRawPtrs& key_columns, const size_t num_rows);
ColumnRawPtrs& key_columns, const size_t num_rows,
bool* stop_emplace_flag);

char* dummy_mapped_data = nullptr;
IColumn::Selector _distinct_row;
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ class AggregationNode : public ::doris::ExecNode {
bool is_streaming_preagg() const { return _is_streaming_preagg; }
bool is_aggregate_evaluators_empty() const { return _aggregate_evaluators.empty(); }
void _make_nullable_output_key(Block* block);
/// Return true if we should keep expanding hash tables in the preagg. If false,
/// the preagg should pass through any rows it can't fit in its tables.
bool _should_expand_preagg_hash_tables();

protected:
bool _is_streaming_preagg;
Expand Down Expand Up @@ -498,9 +501,6 @@ class AggregationNode : public ::doris::ExecNode {
std::unique_ptr<AggregateDataContainer> _aggregate_data_container;

void _release_self_resource(RuntimeState* state);
/// Return true if we should keep expanding hash tables in the preagg. If false,
/// the preagg should pass through any rows it can't fit in its tables.
bool _should_expand_preagg_hash_tables();

size_t _get_hash_table_size();

Expand Down

0 comments on commit 9f1022f

Please sign in to comment.