Skip to content

Commit

Permalink
Merge branch 'master' into 20230819_fix_stacktrace
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei authored Sep 20, 2023
2 parents 50e4511 + b02398b commit 36fb5e7
Show file tree
Hide file tree
Showing 98 changed files with 3,821 additions and 184 deletions.
24 changes: 2 additions & 22 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -506,34 +506,14 @@ endif()
# When Abseil was found, extend COMMON_THIRDPARTY with the Abseil targets listed below
# (the existing contents of COMMON_THIRDPARTY are kept as the first element).
if (absl_FOUND)
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
absl::flat_hash_set
absl::str_format
absl::flags
absl::flags_commandlineflag
absl::flags_commandlineflag_internal
absl::flags_config
absl::flags_internal
absl::flags_marshalling
absl::flags_parse
absl::flags_private_handle_accessor
absl::flags_program_name
absl::flags_reflection
absl::flags_usage
absl::flags_usage_internal
absl::random_distributions
absl::random_internal_distribution_test_util
absl::random_internal_platform
absl::random_internal_pool_urbg
absl::random_internal_randen
absl::random_internal_randen_hwaes
absl::random_internal_randen_hwaes_impl
absl::random_internal_randen_slow
absl::random_internal_seed_material
absl::random_seed_gen_exception
absl::random_seed_sequences
absl::spinlock_wait
absl::status
absl::statusor
absl::strerror
absl::strings
)
endif()

Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <string>

#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "runtime/primitive_type.h"
Expand Down Expand Up @@ -946,10 +947,12 @@ Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state) {
}

// Forward declarations of local-state types named in the explicit instantiations below.
class StreamingAggSinkLocalState;
class DistinctStreamingAggSinkLocalState;

// Explicit instantiations of the sink operator template, one per local-state type.
template class AggSinkOperatorX<BlockingAggSinkLocalState>;
template class AggSinkOperatorX<StreamingAggSinkLocalState>;
template class AggSinkOperatorX<DistinctStreamingAggSinkLocalState>;
// Explicit instantiations of the local-state template for the same three derived types.
template class AggSinkLocalState<AggDependency, BlockingAggSinkLocalState>;
template class AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;

template class AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>;
} // namespace doris::pipeline
1 change: 1 addition & 0 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
template <typename DependencyType, typename Derived>
friend class AggSinkLocalState;
friend class StreamingAggSinkLocalState;
friend class DistinctStreamingAggSinkLocalState;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;

Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class AggLocalState : public PipelineXLocalState<AggDependency> {
friend class AggSourceOperatorX;
friend class StreamingAggSourceOperatorX;
friend class StreamingAggSinkOperatorX;
friend class DistinctStreamingAggSourceOperatorX;
friend class DistinctStreamingAggSinkOperatorX;

void _close_without_key();
void _close_with_serialized_key();
Expand Down
166 changes: 166 additions & 0 deletions be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <gen_cpp/Metrics_types.h>

#include <memory>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
Expand Down Expand Up @@ -94,4 +95,169 @@ OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() {
return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, _data_queue);
}

DistinctStreamingAggSinkLocalState::DistinctStreamingAggSinkLocalState(
DataSinkOperatorXBase* parent, RuntimeState* state)
: AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>(parent, state),
dummy_mapped_data(std::make_shared<char>('A')) {}

/// Evaluates the probe (grouping) expressions on `in_block`, inserts every row's key into
/// the hash table, and writes the key columns of the rows whose key was newly inserted
/// to `out_block`.
///
/// @param in_block  input batch. The result column of each probe expression is replaced
///                  in place by its full (non-const) form.
/// @param out_block destination of the distinct keys. When there are no nullable keys and
///                  the block's memory can be reused, rows are appended to its existing
///                  columns; otherwise it is replaced by a newly built block.
/// @return an error propagated by RETURN_IF_ERROR / RETURN_IF_CATCH_EXCEPTION, or
///         Status::OK().
Status DistinctStreamingAggSinkLocalState::_distinct_pre_agg_with_serialized_key(
        doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
    SCOPED_TIMER(_build_timer);
    DCHECK(!_shared_state->probe_expr_ctxs.empty());

    const size_t key_size = _shared_state->probe_expr_ctxs.size();
    vectorized::ColumnRawPtrs key_columns(key_size);
    {
        SCOPED_TIMER(_expr_timer);
        for (size_t i = 0; i < key_size; ++i) {
            int result_column_id = -1;
            RETURN_IF_ERROR(
                    _shared_state->probe_expr_ctxs[i]->execute(in_block, &result_column_id));
            // Look the result column up once; the raw pointer stored in key_columns must
            // refer to the materialized column that the block now owns.
            auto& key_column = in_block->get_by_position(result_column_id).column;
            key_column = key_column->convert_to_full_column_if_const();
            key_columns[i] = key_column.get();
        }
    }

    // Keep the row count unsigned: it is forwarded as a size_t and used with reserve().
    const size_t rows = in_block->rows();
    _distinct_row.clear();
    _distinct_row.reserve(rows);

    RETURN_IF_CATCH_EXCEPTION(
            _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));

    const bool mem_reuse = _dependency->make_nullable_keys().empty() && out_block->mem_reuse();
    if (mem_reuse) {
        // Append the newly seen rows to the columns already present in out_block.
        for (size_t i = 0; i < key_size; ++i) {
            auto dst = out_block->get_by_position(i).column->assume_mutable();
            key_columns[i]->append_data_by_selector(dst, _distinct_row);
        }
    } else {
        // NOTE(review): this path replaces out_block wholesale, so anything it already
        // held is dropped -- confirm it is only reached with an empty out_block.
        vectorized::ColumnsWithTypeAndName columns_with_schema;
        for (size_t i = 0; i < key_size; ++i) {
            auto distinct_column = key_columns[i]->clone_empty();
            key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
            columns_with_schema.emplace_back(
                    std::move(distinct_column),
                    _shared_state->probe_expr_ctxs[i]->root()->data_type(),
                    _shared_state->probe_expr_ctxs[i]->root()->expr_name());
        }
        out_block->swap(vectorized::Block(columns_with_schema));
    }
    return Status::OK();
}

// Inserts the keys of rows [0, num_rows) into the hash table held by
// _agg_data->method_variant and appends to `distinct_row` the index of every row whose
// key was newly inserted.
//
// distinct_row: output selector; indices are pushed in increasing row order.
// key_columns:  key columns used to construct the per-method hashing state.
// num_rows:     number of rows to process.
void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct(
vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _shared_state->probe_key_sz, nullptr);
_pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);

if constexpr (HashTableTraits<HashTableType>::is_phmap) {
// phmap path: compute all hashes first so that later buckets can be prefetched
// while earlier keys are being inserted.
auto keys = state.get_keys(num_rows);
// _hash_values is only ever grown, never shrunk, across calls.
if (_hash_values.size() < num_rows) {
_hash_values.resize(num_rows);
}

for (size_t i = 0; i < num_rows; ++i) {
_hash_values[i] = agg_method.data.hash(keys[i]);
}
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
// Prefetch the bucket HASH_MAP_PREFETCH_DIST rows ahead, when such a row exists.
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
}
auto result = state.emplace_with_key(
agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool),
_hash_values[i], i);
// Only first occurrences are recorded; no mapped value is set on this path.
if (result.is_inserted()) {
distinct_row.push_back(i);
}
}
} else {
// Other hash tables: insert row by row without precomputed hashes.
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
auto result = state.emplace_key(agg_method.data, i, *_agg_arena_pool);
if (result.is_inserted()) {
// Newly inserted keys get the shared placeholder as their mapped value.
result.set_mapped(dummy_mapped_data.get());
distinct_row.push_back(i);
}
}
}
// Counts every input row, not only the newly inserted ones.
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
},
_agg_data->method_variant);
}

// Constructs the operator by forwarding pool, tnode and descs unchanged to the
// AggSinkOperatorX<DistinctStreamingAggSinkLocalState> base; nothing else is set up here.
DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, tnode, descs) {}

// Runs the base-class init and, when it succeeds, sets this operator's name.
// Returns the base-class error (via RETURN_IF_ERROR) if that init fails, Status::OK() otherwise.
Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<DistinctStreamingAggSinkLocalState>::init(tnode, state));
// Assigned after the base init, so this value is the one left in _name when init returns.
_name = "DISTINCT_STREAMING_AGGREGATION_SINK_OPERATOR";
return Status::OK();
}

/// Consumes one input block: de-duplicates its keys into the local output block and
/// pushes that block to the shared data queue once it is large enough or the limit is
/// reached.
///
/// @param state        runtime state used to look up this operator's sink local state and
///                     the batch size.
/// @param in_block     input rows; a null or empty block contributes no rows.
/// @param source_state FINISHED marks the last call from upstream.
/// @return Status::OK() while more input is expected; an END_OF_FILE error status as the
///         finish signal once the source finished or the limit was reached; or the error
///         from key de-duplication.
Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
                                               SourceState source_state) {
    auto& local_state =
            state->get_sink_local_state(id())->cast<DistinctStreamingAggSinkLocalState>();
    SCOPED_TIMER(local_state.profile()->total_time_counter());

    // Read the row count once and only through a non-null pointer: the block is
    // null-checked before processing, so it must not be dereferenced unconditionally
    // for the counters either.
    const size_t input_rows = in_block != nullptr ? in_block->rows() : 0;
    COUNTER_UPDATE(local_state.rows_input_counter(), static_cast<int64_t>(input_rows));
    local_state._shared_state->input_num_rows += input_rows;

    if (input_rows > 0) {
        // The output block is moved into the queue when pushed, so fetch a fresh one.
        if (local_state._output_block == nullptr) {
            local_state._output_block = local_state._shared_state->data_queue->get_free_block();
        }
        RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key(
                in_block, local_state._output_block.get()));

        // get enough data or reached limit rows, need push block to queue
        if (_limit != -1 &&
            (local_state._output_block->rows() + local_state._output_distinct_rows) >= _limit) {
            // Truncate the block so that exactly _limit distinct rows are emitted in total.
            auto limit_rows = _limit - local_state._output_distinct_rows;
            local_state._output_block->set_num_rows(limit_rows);
            local_state._output_distinct_rows += limit_rows;
            local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
        } else if (local_state._output_block->rows() >= state->batch_size()) {
            local_state._output_distinct_rows += local_state._output_block->rows();
            local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
        }
    }

    // reach limit or source finish
    if ((UNLIKELY(source_state == SourceState::FINISHED)) ||
        (_limit != -1 && local_state._output_distinct_rows >= _limit)) {
        if (local_state._output_block != nullptr) { //maybe the last block with eos
            local_state._output_distinct_rows += local_state._output_block->rows();
            local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
        }
        local_state._shared_state->data_queue->set_finish();
        return Status::Error<ErrorCode::END_OF_FILE>(""); // need given finish signal
    }
    return Status::OK();
}

// Closes the local state. A second call is a no-op that returns OK. If the shared data
// queue exists but was never marked finished, it is cancelled before the base-class
// close runs.
Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) {
    if (!_closed) {
        auto& queue = _shared_state->data_queue;
        const bool unfinished = queue && !queue->is_finish();
        if (unfinished) {
            // finish should be set, if not set here means error.
            queue->set_canceled();
        }
        return Base::close(state);
    }
    return Status::OK();
}

} // namespace doris::pipeline
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
#include <cstdint>
#include <memory>

#include "aggregation_sink_operator.h"
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/exec/distinct_vaggregation_node.h"
Expand Down Expand Up @@ -72,5 +75,50 @@ class DistinctStreamingAggSinkOperator final
std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
};

class DistinctStreamingAggSinkOperatorX;

/// Sink-side local state of the distinct streaming aggregation operator.
///
/// It owns the block currently being filled with distinct keys, the selector of
/// first-seen rows for the batch in flight, and the running count of distinct rows
/// already handed to the shared data queue.
class DistinctStreamingAggSinkLocalState final
        : public AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState> {
public:
    using Parent = DistinctStreamingAggSinkOperatorX;
    using Base = AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>;
    ENABLE_FACTORY_CREATOR(DistinctStreamingAggSinkLocalState);
    DistinctStreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);

    /// Runs the base init, then installs a new DataQueue (constructed with 1 and
    /// _dependency) in the shared state.
    Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
        RETURN_IF_ERROR(Base::init(state, info));
        // make_unique instead of reset(new ...): same object, no naked new.
        _shared_state->data_queue = std::make_unique<DataQueue>(1, _dependency);
        return Status::OK();
    }

    /// Cancels the data queue if it was never finished, then closes the base state.
    Status close(RuntimeState* state) override;

    /// Writes the keys of `in_block` that were not seen before into `out_block`.
    Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
                                                 vectorized::Block* out_block);

private:
    friend class DistinctStreamingAggSinkOperatorX;

    /// Inserts the keys of rows [0, num_rows) into the hash table and appends the index
    /// of each newly inserted row to `distinct_row`.
    void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row,
                                              vectorized::ColumnRawPtrs& key_columns,
                                              const size_t num_rows);

    // Block being filled with distinct keys; null after it has been moved into the queue.
    std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
    // Placeholder whose address is stored as the mapped value of newly inserted keys.
    std::shared_ptr<char> dummy_mapped_data = nullptr;
    // Row indices of first-seen keys within the batch currently being processed.
    vectorized::IColumn::Selector _distinct_row;
    // Number of distinct rows already pushed to the data queue.
    int64_t _output_distinct_rows = 0;
};

/// Sink operator of the distinct streaming aggregation, parameterized on
/// DistinctStreamingAggSinkLocalState.
class DistinctStreamingAggSinkOperatorX final
        : public AggSinkOperatorX<DistinctStreamingAggSinkLocalState> {
public:
    DistinctStreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
                                      const DescriptorTbl& descs);

    /// Initializes the base operator and sets this operator's name.
    Status init(const TPlanNode& tnode, RuntimeState* state) override;

    /// De-duplicates `in_block` and forwards the resulting keys to the shared data queue.
    Status sink(RuntimeState* state, vectorized::Block* in_block,
                SourceState source_state) override;

    /// Returns whatever write_blocked_by() reports on the dependency of the AggLocalState
    /// registered under this operator's id.
    WriteDependency* wait_for_dependency(RuntimeState* state) override {
        auto& agg_local_state = state->get_local_state(id())->cast<AggLocalState>();
        return agg_local_state._dependency->write_blocked_by();
    }
};

} // namespace pipeline
} // namespace doris
} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,53 @@ OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() {
return std::make_shared<DistinctStreamingAggSourceOperator>(this, _node, _data_queue);
}

// Constructs the source operator and records whether the plan node asks for streaming
// pre-aggregation: true only when the thrift field is set and its value is true.
DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectPool* pool,
                                                                         const TPlanNode& tnode,
                                                                         const DescriptorTbl& descs)
        : Base(pool, tnode, descs) {
    const auto& agg_node = tnode.agg_node;
    _is_streaming_preagg =
            agg_node.__isset.use_streaming_preaggregation && agg_node.use_streaming_preaggregation;
    if (_is_streaming_preagg) {
        // Debug-only sanity checks on the plan shape.
        DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
        DCHECK(_limit == -1) << "Preaggs have no limits";
    }
}

// Pulls the next block of distinct keys from the shared data queue into `block`, returns
// the emptied queue block to the free list, applies the nullable-key conversion and, when
// not in streaming pre-aggregation mode, the conjuncts. `source_state` is set to FINISHED
// once the queue reports its data exhausted, DEPEND_ON_SOURCE otherwise.
Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
                                                      SourceState& source_state) {
    auto& agg_state = state->get_local_state(id())->cast<AggLocalState>();
    SCOPED_TIMER(agg_state.profile()->total_time_counter());
    auto& queue = agg_state._shared_state->data_queue;

    std::unique_ptr<vectorized::Block> queued_block;
    RETURN_IF_ERROR(queue->get_block_from_queue(&queued_block));
    if (queued_block) {
        // Hand the data to the caller, then recycle the now-stale block.
        block->swap(*queued_block);
        queued_block->clear_column_data(block->columns());
        queue->push_free_block(std::move(queued_block));
    }

    agg_state._dependency->_make_nullable_output_key(block);
    if (!_is_streaming_preagg) {
        // dispose the having clause, should not be execute in prestreaming agg
        RETURN_IF_ERROR(
                vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
    }

    source_state = UNLIKELY(queue->data_exhausted()) ? SourceState::FINISHED
                                                     : SourceState::DEPEND_ON_SOURCE;
    return Status::OK();
}

// Runs the base-class init and, when it succeeds, sets this operator's name.
// Returns the base-class error (via RETURN_IF_ERROR) if that init fails, Status::OK() otherwise.
Status DistinctStreamingAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(Base::init(tnode, state));
// Assigned after the base init, so this value is the one left in _op_name when init returns.
_op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR";
return Status::OK();
}

} // namespace pipeline
} // namespace doris
Loading

0 comments on commit 36fb5e7

Please sign in to comment.