diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index a83d8bebd320cb7..5604d2b43d2eebe 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -19,6 +19,7 @@ #include +#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/streaming_aggregation_sink_operator.h" #include "runtime/primitive_type.h" @@ -946,10 +947,12 @@ Status AggSinkLocalState::close(RuntimeState* state) { } class StreamingAggSinkLocalState; +class DistinctStreamingAggSinkLocalState; template class AggSinkOperatorX; template class AggSinkOperatorX; +template class AggSinkOperatorX; template class AggSinkLocalState; template class AggSinkLocalState; - +template class AggSinkLocalState; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 0a484803cbfa6d1..0013267e62fb7b2 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -354,6 +354,7 @@ class AggSinkOperatorX : public DataSinkOperatorX { template friend class AggSinkLocalState; friend class StreamingAggSinkLocalState; + friend class DistinctStreamingAggSinkLocalState; std::vector _aggregate_evaluators; bool _can_short_circuit = false; diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index a840a72f13bd1d0..b9c154ace8cc884 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -64,6 +64,7 @@ class AggLocalState : public PipelineXLocalState { protected: friend class AggSourceOperatorX; friend class StreamingAggSourceOperatorX; + friend class DistinctStreamingAggSourceOperatorX; void _close_without_key(); void _close_with_serialized_key(); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index 635817973c72c3f..1616cab1120ca24 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -19,6 +19,7 @@ #include +#include #include #include "common/compiler_util.h" // IWYU pragma: keep @@ -94,4 +95,182 @@ OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() { return std::make_shared(this, _node, _data_queue); } +DistinctStreamingAggSinkLocalState::DistinctStreamingAggSinkLocalState( + DataSinkOperatorXBase* parent, RuntimeState* state) + : AggSinkLocalState(parent, state) { + dummy_mapped_data = std::make_shared('A'); +} + +Status DistinctStreamingAggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + return Status::OK(); +} + +Status DistinctStreamingAggSinkLocalState::_distinct_pre_agg_with_serialized_key( + doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) { + SCOPED_TIMER(_build_timer); + DCHECK(!_shared_state->probe_expr_ctxs.empty()); + + size_t key_size = _shared_state->probe_expr_ctxs.size(); + vectorized::ColumnRawPtrs key_columns(key_size); + { + SCOPED_TIMER(_expr_timer); + for (size_t i = 0; i < key_size; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR( + _shared_state->probe_expr_ctxs[i]->execute(in_block, &result_column_id)); + in_block->get_by_position(result_column_id).column = + in_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + key_columns[i] = in_block->get_by_position(result_column_id).column.get(); + } + } + + int rows = in_block->rows(); + _distinct_row.clear(); + _distinct_row.reserve(rows); + + RETURN_IF_CATCH_EXCEPTION( + _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows)); + + bool mem_reuse = _dependency->make_nullable_keys().empty() && out_block->mem_reuse(); + if (mem_reuse) { + for (int i = 0; i < key_size; ++i) { + auto dst = out_block->get_by_position(i).column->assume_mutable(); + key_columns[i]->append_data_by_selector(dst, _distinct_row); + } + } else { + vectorized::ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + auto distinct_column = key_columns[i]->clone_empty(); + key_columns[i]->append_data_by_selector(distinct_column, _distinct_row); + columns_with_schema.emplace_back( + std::move(distinct_column), + _shared_state->probe_expr_ctxs[i]->root()->data_type(), + _shared_state->probe_expr_ctxs[i]->root()->expr_name()); + } + out_block->swap(vectorized::Block(columns_with_schema)); + } + return Status::OK(); +} + +void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct( + vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns, + const size_t num_rows) { + std::visit( + [&](auto&& agg_method) -> void { + SCOPED_TIMER(_hash_table_compute_timer); + using HashMethodType = std::decay_t; + using HashTableType = std::decay_t; + using AggState = typename HashMethodType::State; + AggState state(key_columns, _shared_state->probe_key_sz, nullptr); + _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); + + if constexpr (HashTableTraits::is_phmap) { + auto keys = state.get_keys(num_rows); + if (_hash_values.size() < num_rows) { + _hash_values.resize(num_rows); + } + + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(keys[i]); + } + SCOPED_TIMER(_hash_table_emplace_timer); + for (size_t i = 0; i < num_rows; ++i) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { + agg_method.data.prefetch_by_hash( + _hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + auto result = state.emplace_with_key( + agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool), + _hash_values[i], i); + if (result.is_inserted()) { + distinct_row.push_back(i); + } + } + } else { + SCOPED_TIMER(_hash_table_emplace_timer); + for (size_t i = 0; i < num_rows; ++i) { + auto result = state.emplace_key(agg_method.data, i, *_agg_arena_pool); + if (result.is_inserted()) { + result.set_mapped(dummy_mapped_data.get()); + distinct_row.push_back(i); + } + } + } + COUNTER_UPDATE(_hash_table_input_counter, num_rows); + }, + _agg_data->method_variant); +} + +DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool* pool, + const TPlanNode& tnode, + const DescriptorTbl& descs) + : AggSinkOperatorX(pool, tnode, descs) {} + +bool DistinctStreamingAggSinkOperatorX::can_write(RuntimeState* state) { + // sink and source in diff threads + return state->get_sink_local_state(id()) + ->cast() + ._shared_state->data_queue->has_enough_space_to_push(); +} + +Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(AggSinkOperatorX::init(tnode, state)); + _name = "DISTINCT_STREAMING_AGGREGATION_SINK_OPERATOR"; + return Status::OK(); +} + +Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) { + auto& local_state = + state->get_sink_local_state(id())->cast(); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + local_state._shared_state->input_num_rows += in_block->rows(); + Status ret = Status::OK(); + if (in_block && in_block->rows() > 0) { + if (local_state._output_block == nullptr) { + local_state._output_block = local_state._shared_state->data_queue->get_free_block(); + } + RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key( + in_block, local_state._output_block.get())); + + // get enough data or reached limit rows, need push block to queue + if (_limit != -1 && + (local_state._output_block->rows() + local_state._output_distinct_rows) >= _limit) { + auto limit_rows = _limit - local_state._output_distinct_rows; + local_state._output_block->set_num_rows(limit_rows); + local_state._output_distinct_rows += limit_rows; + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block)); + } else if (local_state._output_block->rows() >= state->batch_size()) { + local_state._output_distinct_rows += local_state._output_block->rows(); + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block)); + } + } + + // reach limit or source finish + if ((UNLIKELY(source_state == SourceState::FINISHED)) || + (_limit != -1 && local_state._output_distinct_rows >= _limit)) { + if (local_state._output_block != nullptr) { //maybe the last block with eos + local_state._output_distinct_rows += local_state._output_block->rows(); + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block)); + } + local_state._shared_state->data_queue->set_finish(); + return Status::Error(""); // need given finish signal + } + return Status::OK(); +} + +Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + if (_shared_state->data_queue && !_shared_state->data_queue->is_finish()) { + // finish should be set, if not set here means error. + _shared_state->data_queue->set_canceled(); + } + return Base::close(state); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index 46b1dda008d0e35..c2bd6cd7a9aae9f 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -22,8 +22,10 @@ #include #include +#include "aggregation_sink_operator.h" #include "common/status.h" #include "operator.h" +#include "pipeline/exec/aggregation_sink_operator.h" #include "util/runtime_profile.h" #include "vec/core/block.h" #include "vec/exec/distinct_vaggregation_node.h" @@ -72,5 +74,44 @@ class DistinctStreamingAggSinkOperator final std::unique_ptr _output_block = vectorized::Block::create_unique(); }; +class DistinctStreamingAggSinkOperatorX; + +class DistinctStreamingAggSinkLocalState final + : public AggSinkLocalState { +public: + using Parent = DistinctStreamingAggSinkOperatorX; + using Base = AggSinkLocalState; + ENABLE_FACTORY_CREATOR(DistinctStreamingAggSinkLocalState); + DistinctStreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status close(RuntimeState* state) override; + Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block, + vectorized::Block* out_block); + +private: + friend class DistinctStreamingAggSinkOperatorX; + void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row, + vectorized::ColumnRawPtrs& key_columns, + const size_t num_rows); + + std::unique_ptr _output_block = vectorized::Block::create_unique(); + std::shared_ptr dummy_mapped_data = nullptr; + vectorized::IColumn::Selector _distinct_row; + int64_t _output_distinct_rows = 0; +}; + +class DistinctStreamingAggSinkOperatorX final + : public AggSinkOperatorX { +public: + DistinctStreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + + bool can_write(RuntimeState* state) override; +}; + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp index fb653bdcbdba8aa..bed911d13057ced 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp @@ -88,5 +88,53 @@ OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() { return std::make_shared(this, _node, _data_queue); } +DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectPool* pool, + const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(pool, tnode, descs) { + if (tnode.agg_node.__isset.use_streaming_preaggregation) { + _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; + if (_is_streaming_preagg) { + DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping"; + DCHECK(_limit == -1) << "Preaggs have no limits"; + } + } else { + _is_streaming_preagg = false; + } +} + +Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + auto& local_state = state->get_local_state(id())->cast(); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + std::unique_ptr agg_block; + RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block)); + if (agg_block != nullptr) { + block->swap(*agg_block); + agg_block->clear_column_data(block->columns()); + local_state._shared_state->data_queue->push_free_block(std::move(agg_block)); + } + + local_state._dependency->_make_nullable_output_key(block); + if (_is_streaming_preagg == false) { + // dispose the having clause, should not be execute in prestreaming agg + RETURN_IF_ERROR( + vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + } + + if (UNLIKELY(local_state._shared_state->data_queue->data_exhausted())) { + source_state = SourceState::FINISHED; + } else { + source_state = SourceState::DEPEND_ON_SOURCE; + } + return Status::OK(); +} + +Status DistinctStreamingAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + _op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR"; + return Status::OK(); +} + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h index 3534193bf8fb3cf..ad66a6d93c6bbe6 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/exec/aggregation_source_operator.h" #include "vec/exec/distinct_vaggregation_node.h" #include "vec/exec/vaggregation_node.h" @@ -63,5 +64,19 @@ class DistinctStreamingAggSourceOperator final std::shared_ptr _data_queue; }; +class DistinctStreamingAggSourceOperatorX final : public AggSourceOperatorX { +public: + using Base = AggSourceOperatorX; + DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + ~DistinctStreamingAggSourceOperatorX() = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + bool _is_streaming_preagg = false; +}; + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 9623f025c18c812..7da6f0c5cd9af63 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -108,8 +108,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (local_state._num_partition > config::partition_topn_partition_threshold && local_state.child_input_rows < 10000 * local_state._num_partition) { { - std::lock_guard lock(local_state._shared_state->_buffer_mutex); - local_state._shared_state->_blocks_buffer.push(std::move(*input_block)); + std::lock_guard lock(local_state._shared_state->buffer_mutex); + local_state._shared_state->blocks_buffer.push(std::move(*input_block)); // buffer have data, source could read this. local_state._dependency->set_ready_for_read(); } @@ -133,7 +133,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, _child_x->row_desc(), state, i == 0 ? local_state._profile : nullptr, _has_global_limit, _partition_inner_limit, _top_n_algorithm, - local_state._shared_state->_previous_row.get()); + local_state._shared_state->previous_row.get()); DCHECK(_child_x->row_desc().num_materialized_slots() == local_state._value_places[i]->blocks.back()->columns()); @@ -143,7 +143,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } sorter->init_profile(local_state._profile); RETURN_IF_ERROR(sorter->prepare_for_read()); - local_state._shared_state->_partition_sorts.push_back(std::move(sorter)); + local_state._shared_state->partition_sorts.push_back(std::move(sorter)); } COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition)); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 6e8d2e06426d27e..b5370d2ca5166c3 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -33,8 +33,8 @@ Status PartitionSortSourceLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - _shared_state->_previous_row = nullptr; - _shared_state->_partition_sorts.clear(); + _shared_state->previous_row = nullptr; + _shared_state->partition_sorts.clear(); return PipelineXLocalState::close(state); } @@ -44,28 +44,25 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: auto& local_state = state->get_local_state(id())->cast(); output_block->clear_column_data(); { - std::lock_guard lock(local_state._shared_state->_buffer_mutex); - if (local_state._shared_state->_blocks_buffer.empty() == false) { - local_state._shared_state->_blocks_buffer.front().swap(*output_block); - local_state._shared_state->_blocks_buffer.pop(); + std::lock_guard lock(local_state._shared_state->buffer_mutex); + if (local_state._shared_state->blocks_buffer.empty() == false) { + local_state._shared_state->blocks_buffer.front().swap(*output_block); + local_state._shared_state->blocks_buffer.pop(); //if buffer have no data, block reading and wait for signal again - if (local_state._shared_state->_blocks_buffer.empty()) { + if (local_state._shared_state->blocks_buffer.empty()) { local_state._dependency->block_reading(); } return Status::OK(); } } - // this is set by sink node using: local_state._dependency->set_ready_for_read() - if (local_state._dependency->is_ready_for_read()) { - bool current_eos = false; - RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos, local_state)); - } + // is_ready_for_read: this is set by sink node using: local_state._dependency->set_ready_for_read() + RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state)); { - std::lock_guard lock(local_state._shared_state->_buffer_mutex); - if (local_state._shared_state->_blocks_buffer.empty() && - local_state._shared_state->_sort_idx >= - local_state._shared_state->_partition_sorts.size()) { + std::lock_guard lock(local_state._shared_state->buffer_mutex); + if (local_state._shared_state->blocks_buffer.empty() && + local_state._shared_state->sort_idx >= + local_state._shared_state->partition_sorts.size()) { source_state = SourceState::FINISHED; } } @@ -79,25 +76,24 @@ Dependency* PartitionSortSourceOperatorX::wait_for_dependency(RuntimeState* stat Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, vectorized::Block* output_block, - bool* current_eos, PartitionSortSourceLocalState& local_state) { SCOPED_TIMER(local_state._get_sorted_timer); //sorter output data one by one - if (local_state._shared_state->_sort_idx < local_state._shared_state->_partition_sorts.size()) { + bool current_eos = false; + if (local_state._shared_state->sort_idx < local_state._shared_state->partition_sorts.size()) { RETURN_IF_ERROR( - local_state._shared_state->_partition_sorts[local_state._shared_state->_sort_idx] - ->get_next(state, output_block, current_eos)); + local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx] + ->get_next(state, output_block, ¤t_eos)); } - if (*current_eos) { + if (current_eos) { //current sort have eos, so get next idx - local_state._shared_state->_previous_row->reset(); - auto rows = - local_state._shared_state->_partition_sorts[local_state._shared_state->_sort_idx] - ->get_output_rows(); + local_state._shared_state->previous_row->reset(); + auto rows = local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx] + ->get_output_rows(); local_state._num_rows_returned += rows; - local_state._shared_state->_partition_sorts[local_state._shared_state->_sort_idx].reset( + local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx].reset( nullptr); - local_state._shared_state->_sort_idx++; + local_state._shared_state->sort_idx++; } return Status::OK(); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 8468da4dde9f95a..859e2d8b58f5297 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -63,7 +63,7 @@ class PartitionSortSourceLocalState final : public PipelineXLocalState::init(state, info)); _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); - _shared_state->_previous_row = std::make_unique(); + _shared_state->previous_row = std::make_unique(); return Status::OK(); } @@ -93,9 +93,9 @@ class PartitionSortSourceOperatorX final : public OperatorXrows(); - _make_nullable_output_key(output_block); + _dependency->_make_nullable_output_key(output_block); // COUNTER_SET(_rows_returned_counter, _num_rows_returned); _executor.update_memusage(); return Status::OK(); } -void StreamingAggSinkLocalState::_make_nullable_output_key(vectorized::Block* block) { - if (block->rows() != 0) { - for (auto cid : _dependency->make_nullable_keys()) { - block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column); - block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); - } - } -} - bool StreamingAggSinkLocalState::_should_expand_preagg_hash_tables() { if (!_should_expand_hash_table) { return false; diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index 5b9c635580c8879..d6027201dc677d9 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -90,7 +90,6 @@ class StreamingAggSinkLocalState final Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block, doris::vectorized::Block* out_block); - void _make_nullable_output_key(vectorized::Block* block); bool _should_expand_preagg_hash_tables(); vectorized::Block _preagg_block = vectorized::Block(); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 6f3ed212314bdc0..fcce87d55613fec 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -297,7 +297,15 @@ class AggDependency : public Dependency { void set_make_nullable_keys(std::vector& make_nullable_keys) { _make_nullable_keys = make_nullable_keys; } - + void _make_nullable_output_key(vectorized::Block* block) { + if (block->rows() != 0) { + for (auto cid : _make_nullable_keys) { + block->get_by_position(cid).column = + make_nullable(block->get_by_position(cid).column); + block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); + } + } + } const std::vector& make_nullable_keys() { return _make_nullable_keys; } void release_tracker(); @@ -484,11 +492,11 @@ class NestedLoopJoinDependency final : public Dependency { struct PartitionSortNodeSharedState { public: - std::queue _blocks_buffer; - std::mutex _buffer_mutex; - std::vector> _partition_sorts; - std::unique_ptr _previous_row = nullptr; - int _sort_idx = 0; + std::queue blocks_buffer; + std::mutex buffer_mutex; + std::vector> partition_sorts; + std::unique_ptr previous_row = nullptr; + int sort_idx = 0; }; class PartitionSortDependency final : public Dependency { diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index bd2fc8cf2521632..e1eff50000356d8 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -24,6 +24,7 @@ #include "pipeline/exec/analytic_sink_operator.h" #include "pipeline/exec/analytic_source_operator.h" #include "pipeline/exec/assert_num_rows_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" #include "pipeline/exec/empty_set_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" @@ -316,6 +317,7 @@ DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) DECLARE_OPERATOR_X(BlockingAggSinkLocalState) DECLARE_OPERATOR_X(StreamingAggSinkLocalState) +DECLARE_OPERATOR_X(DistinctStreamingAggSinkLocalState) DECLARE_OPERATOR_X(ExchangeSinkLocalState) DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState) DECLARE_OPERATOR_X(UnionSinkLocalState) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 5307998e2af65df..8af14197295e744 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -48,6 +48,8 @@ #include "pipeline/exec/assert_num_rows_operator.h" #include "pipeline/exec/data_queue.h" #include "pipeline/exec/datagen_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h" #include "pipeline/exec/empty_set_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" @@ -500,8 +502,22 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN break; } case TPlanNodeType::AGGREGATION_NODE: { - if (tnode.agg_node.__isset.use_streaming_preaggregation && - tnode.agg_node.use_streaming_preaggregation) { + if (tnode.agg_node.aggregate_functions.empty()) { + op.reset(new DistinctStreamingAggSourceOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(); + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + DataSinkOperatorXPtr sink; + sink.reset(new DistinctStreamingAggSinkOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink)); + RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + } else if (tnode.agg_node.__isset.use_streaming_preaggregation && + tnode.agg_node.use_streaming_preaggregation) { op.reset(new StreamingAggSourceOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); diff --git a/regression-test/data/pipelineX/test_distinct_streaming_agg_operator.out b/regression-test/data/pipelineX/test_distinct_streaming_agg_operator.out new file mode 100644 index 000000000000000..7d0eb9fd4fd1ade --- /dev/null +++ b/regression-test/data/pipelineX/test_distinct_streaming_agg_operator.out @@ -0,0 +1,69 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !pipeline_1 -- +\N \N \N +1 1989 1001 +2 1986 1001 +3 1989 1002 +4 1991 3021 +5 1985 5014 +6 32767 3021 +7 -32767 1002 +8 255 2147483647 +9 1991 -2147483647 +10 1991 5014 +11 1989 25699 +12 32767 -2147483647 +13 -32767 2147483647 +14 255 103 +15 1992 3021 + +-- !pipeline_2 -- +\N +-2147483647 +103 +1001 +1002 +3021 +5014 +25699 +2147483647 + +-- !pipeline_3 -- +\N +false +true + +-- !pipelineX_1 -- +\N \N \N +1 1989 1001 +2 1986 1001 +3 1989 1002 +4 1991 3021 +5 1985 5014 +6 32767 3021 +7 -32767 1002 +8 255 2147483647 +9 1991 -2147483647 +10 1991 5014 +11 1989 25699 +12 32767 -2147483647 +13 -32767 2147483647 +14 255 103 +15 1992 3021 + +-- !pipelineX_2 -- +\N +-2147483647 +103 +1001 +1002 +3021 +5014 +25699 +2147483647 + +-- !pipelineX_3 -- +\N +false +true + diff --git a/regression-test/suites/pipelineX/test_distinct_streaming_agg_operator.groovy b/regression-test/suites/pipelineX/test_distinct_streaming_agg_operator.groovy new file mode 100644 index 000000000000000..f0337fcfc7f346e --- /dev/null +++ b/regression-test/suites/pipelineX/test_distinct_streaming_agg_operator.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_distinct_streaming_agg_operator") { + def dbName = "test_distinct_streaming_agg_operator" + sql "DROP DATABASE IF EXISTS ${dbName}" + sql "CREATE DATABASE ${dbName}" + sql "USE $dbName" + sql """ DROP TABLE IF EXISTS baseall """ + sql """ + CREATE TABLE IF NOT EXISTS baseall ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + """ + sql """ set forbid_unknown_col_stats = false """ + streamLoad { + table "baseall" + db dbName + set 'column_separator', ',' + file "../query_p0/baseall.txt" + } + + sql"""set enable_pipeline_engine = true; """ + + qt_pipeline_1 """ + select * from ( select k1,k2,k3 from baseall union select k1,k2,k3 from baseall) as t ORDER BY 1, 2,3; + """ + qt_pipeline_2 """ + select k3 from baseall group by k3 order by k3; + """ + + qt_pipeline_3 """ + select k6 from baseall group by k6 order by k6; + """ + + sql"""set experimental_enable_pipeline_x_engine=true; """ + + qt_pipelineX_1 """ + select * from ( select k1,k2,k3 from baseall union select k1,k2,k3 from baseall) as t ORDER BY 1, 2,3; + """ + qt_pipelineX_2 """ + select k3 from baseall group by k3 order by k3; + """ + qt_pipelineX_3 """ + select k6 from baseall group by k6 order by k6; + """ + + +} \ No newline at end of file