Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Mar 12, 2024
1 parent 907fe99 commit d7623be
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 50 deletions.
63 changes: 42 additions & 21 deletions be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(

size_t key_size = _probe_expr_ctxs.size();
vectorized::ColumnRawPtrs key_columns(key_size);
std::vector<int> result_idxs(key_size);
{
SCOPED_TIMER(_expr_timer);
for (size_t i = 0; i < key_size; ++i) {
Expand All @@ -236,42 +237,62 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
in_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
result_idxs[i] = result_column_id;
}
}

int rows = in_block->rows();
bool stop_emplace_flag = false;
_distinct_row.clear();
_distinct_row.reserve(rows);

RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows,
&stop_emplace_flag));
RETURN_IF_CATCH_EXCEPTION(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
// _cur_num_rows_returned is needed to decide whether to keep emplacing into the hash table
_cur_num_rows_returned += _distinct_row.size();

if (stop_emplace_flag) {
for (int i = 0; i < rows; ++i) {
_distinct_row.push_back(i);
}
}

// if (stop_emplace_flag) {
// vectorized::ColumnsWithTypeAndName columns_with_schema;
// for (int i = 0; i < key_size; ++i) {
// columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
// _probe_expr_ctxs[i]->root()->data_type(),
// _probe_expr_ctxs[i]->root()->expr_name());
// }
// out_block->swap(vectorized::Block(columns_with_schema));
// in_block->clear();
// return Status::OK();
// }
bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
out_block->mem_reuse();
if (mem_reuse) {
for (int i = 0; i < key_size; ++i) {
auto dst = out_block->get_by_position(i).column->assume_mutable();
key_columns[i]->append_data_by_selector(dst, _distinct_row);
auto output_column = out_block->get_by_position(i).column;
if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1)
out_block->replace_by_position(i, key_columns[i]->assume_mutable());
in_block->replace_by_position(result_idxs[i], output_column);
} else {
auto dst = output_column->assume_mutable();
key_columns[i]->append_data_by_selector(dst, _distinct_row);
}
}
} else {
vectorized::ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
auto distinct_column = key_columns[i]->clone_empty();
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
columns_with_schema.emplace_back(std::move(distinct_column),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
if (_stop_emplace_flag) {
columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
} else {
auto distinct_column = key_columns[i]->clone_empty();
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
columns_with_schema.emplace_back(std::move(distinct_column),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
}
}
out_block->swap(vectorized::Block(columns_with_schema));
if (_stop_emplace_flag) {
in_block->clear(); // clear the column ref with stop_emplace_flag = true
}
}
return Status::OK();
}
Expand All @@ -287,7 +308,7 @@ void DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block

void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows, bool* stop_emplace_flag) {
const size_t num_rows) {
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
Expand All @@ -297,7 +318,7 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg &&
hash_tbl.add_elem_size_overflow(num_rows)) {
if (!_should_expand_preagg_hash_tables()) {
*stop_emplace_flag = true;
_stop_emplace_flag = true;
return;
}
}
Expand Down Expand Up @@ -439,12 +460,12 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc
in_block, local_state._aggregated_block.get()));

// once enough data is gathered or the row limit is reached, the block needs to be pushed to the queue
if (_limit != -1 &&
if (!local_state._stop_emplace_flag && _limit != -1 &&
(local_state._aggregated_block->rows() + local_state._output_distinct_rows) >= _limit) {
auto limit_rows = _limit - local_state._output_distinct_rows;
local_state._aggregated_block->set_num_rows(limit_rows);
local_state._output_distinct_rows += limit_rows;
} else if (local_state._aggregated_block->rows() >= state->batch_size()) {
} else if (!local_state._stop_emplace_flag) {
local_state._output_distinct_rows += local_state._aggregated_block->rows();
}
}
Expand All @@ -471,7 +492,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
}

local_state.add_num_rows_returned(block->rows());
*eos = local_state._child_eos || (_limit != -1 && local_state._output_distinct_rows >= _limit);
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeShar
void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row,
vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows, bool* stop_emplace_flag);
const size_t num_rows);
void _make_nullable_output_key(vectorized::Block* block);
bool _should_expand_preagg_hash_tables();

Expand All @@ -65,6 +65,7 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeShar
size_t _input_num_rows = 0;
bool _should_expand_hash_table = true;
int64_t _cur_num_rows_returned = 0;
bool _stop_emplace_flag = false;

std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,18 @@ Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::B
}
RETURN_IF_ERROR(
_node->_distinct_pre_agg_with_serialized_key(in_block, _output_block.get()));

bool stop_emplace_flag = _node->is_stop_emplace_flag();
// once enough data is gathered or the row limit is reached, the block needs to be pushed to the queue
if (_node->limit() != -1 &&
if (!stop_emplace_flag && _node->limit() != -1 &&
(_output_block->rows() + _output_distinct_rows) >= _node->limit()) {
auto limit_rows = _node->limit() - _output_distinct_rows;
_output_block->set_num_rows(limit_rows);
_output_distinct_rows += limit_rows;
_data_queue->push_block(std::move(_output_block));
} else if (_output_block->rows() >= state->batch_size()) {
_output_distinct_rows += _output_block->rows();
} else if (stop_emplace_flag || _output_block->rows() >= state->batch_size()) {
if (!stop_emplace_flag) { // if stop_emplace_flag = true, will be return rows directly, not get distinct
_output_distinct_rows += _output_block->rows();
}
_data_queue->push_block(std::move(_output_block));
}
}
Expand Down
51 changes: 29 additions & 22 deletions be/src/vec/exec/distinct_vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(

size_t key_size = _probe_expr_ctxs.size();
ColumnRawPtrs key_columns(key_size);
std::vector<int> result_idxs(key_size);
{
SCOPED_TIMER(_expr_timer);
for (size_t i = 0; i < key_size; ++i) {
Expand All @@ -48,51 +49,57 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
in_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
result_idxs[i] = result_column_id;
}
}

int rows = in_block->rows();
bool stop_emplace_flag = false;
_distinct_row.clear();
_distinct_row.reserve(rows);

RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows,
&stop_emplace_flag));
RETURN_IF_CATCH_EXCEPTION(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
// if stop_emplace_flag is true, there is no need to emplace values into the hash table,
// so return block directly.
//TODO: maybe could insert key_columns into output_block, but need solve Check failed: d.column->use_count() == 1 (2 vs. 1)
//do not know the in_block whether be use after
if (stop_emplace_flag) {
for (int i = 0; i < rows; ++i) {
_distinct_row.push_back(i);
}
}

// so return the block directly; note that the column ref count is then 2, which needs to be dealt with.
SCOPED_TIMER(_insert_keys_to_column_timer);
bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
if (mem_reuse) {
for (int i = 0; i < key_size; ++i) {
auto dst = out_block->get_by_position(i).column->assume_mutable();
key_columns[i]->append_data_by_selector(dst, _distinct_row);
auto output_column = out_block->get_by_position(i).column;
if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1)
out_block->replace_by_position(i, key_columns[i]->assume_mutable());
in_block->replace_by_position(result_idxs[i], output_column);
} else {
auto dst = output_column->assume_mutable();
key_columns[i]->append_data_by_selector(dst, _distinct_row);
}
}
} else {
ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
auto distinct_column = key_columns[i]->clone_empty();
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
columns_with_schema.emplace_back(std::move(distinct_column),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
if (_stop_emplace_flag) {
columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
} else {
auto distinct_column = key_columns[i]->clone_empty();
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
columns_with_schema.emplace_back(std::move(distinct_column),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
}
}
out_block->swap(Block(columns_with_schema));
if (_stop_emplace_flag) {
in_block->clear(); // clear the column ref with stop_emplace_flag = true
}
}
return Status::OK();
}

void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
ColumnRawPtrs& key_columns,
const size_t num_rows,
bool* stop_emplace_flag) {
const size_t num_rows) {
SCOPED_TIMER(_exec_timer);
std::visit(
[&](auto&& agg_method) -> void {
Expand All @@ -102,7 +109,7 @@ void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Sele
auto& hash_tbl = *agg_method.hash_table;
if (is_streaming_preagg() && hash_tbl.add_elem_size_overflow(num_rows)) {
if (!_should_expand_preagg_hash_tables()) {
*stop_emplace_flag = true;
_stop_emplace_flag = true;
return;
}
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exec/distinct_vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ class DistinctAggregationNode final : public AggregationNode {
Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block* out_block);
void add_num_rows_returned(int64_t rows) { _num_rows_returned += rows; }
vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; }
bool is_stop_emplace_flag() const { return _stop_emplace_flag; }

private:
void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
ColumnRawPtrs& key_columns, const size_t num_rows,
bool* stop_emplace_flag);
ColumnRawPtrs& key_columns, const size_t num_rows);

char* dummy_mapped_data = nullptr;
IColumn::Selector _distinct_row;
bool _stop_emplace_flag = false;
};
} // namespace vectorized
} // namespace doris

0 comments on commit d7623be

Please sign in to comment.