Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed May 24, 2024
1 parent 0284c84 commit 90bcb47
Show file tree
Hide file tree
Showing 14 changed files with 25 additions and 27 deletions.
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(),
(vectorized::ColumnString*)(column.get()), _agg_arena_pool,
_deserialize_buffer.data(), column.get(), _agg_arena_pool,
rows);
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(), _shared_state->offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (vectorized::ColumnString*)(column.get()),
_deserialize_buffer.data(), column.get(),
_shared_state->agg_arena_pool.get(), rows);
}
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,7 @@ Status StreamingAggLocalState::_merge_with_serialized_key_helper(vectorized::Blo
_places.data(),
Base::_parent->template cast<StreamingAggOperatorX>()
._offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (vectorized::ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
_deserialize_buffer.data(), column.get(), _agg_arena_pool.get(), rows);
}
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/aggregate_functions/aggregate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class IAggregateFunction {
size_t num_rows) const = 0;

virtual void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
AggregateDataPtr rhs, const IColumn* column,
Arena* arena, const size_t num_rows) const = 0;

virtual void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
Expand Down Expand Up @@ -376,13 +376,14 @@ class IAggregateFunctionHelper : public IAggregateFunction {
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
const auto size_of_data = assert_cast<const Derived*>(this)->size_of_data();
const auto* column_string = assert_cast<const ColumnString*>(column);
for (size_t i = 0; i != num_rows; ++i) {
try {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader buffer_reader(column->get_data_at(i));
VectorBufferReader buffer_reader(column_string->get_data_at(i));
assert_cast<const Derived*>(this)->create(rhs_place);
assert_cast<const Derived*>(this)->deserialize_and_merge(
places[i] + offset, rhs_place, buffer_reader, arena);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/aggregate_functions/aggregate_function_avg.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class AggregateFunctionAvg final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ class AggregateFunctionBitmapSerializationHelper
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
const auto& col = assert_cast<const ColumnBitmap&>(*column);
const auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
this->data(places[i] + offset).merge(data[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ class AggregateFunctionBitmapAgg final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
const auto& col = assert_cast<const ColumnBitmap&>(*column);
const auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
this->data(places[i] + offset).value |= data[i];
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/aggregate_functions/aggregate_function_collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -634,12 +634,11 @@ class AggregateFunctionCollect
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
if constexpr (ShowNull::value) {
for (size_t i = 0; i != num_rows; ++i) {
this->data(places[i] + offset)
.deserialize_and_merge(*assert_cast<const IColumn*>(column), i);
this->data(places[i] + offset).deserialize_and_merge(*column, i);
}
} else {
return BaseHelper::deserialize_and_merge_vec(places, offset, rhs, column, arena,
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class AggregateFunctionCount final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down Expand Up @@ -284,7 +284,7 @@ class AggregateFunctionCountNotNullUnary final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ class AggregateFunctionMapAgg final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
auto& col = assert_cast<const ColumnMap&>(*assert_cast<const IColumn*>(column));
const auto& col = assert_cast<const ColumnMap&>(*column);
for (size_t i = 0; i != num_rows; ++i) {
auto map = doris::vectorized::get<Map>(col[i]);
this->data(places[i] + offset).add(map[0], map[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ class AggregateFunctionsSingleValue final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/aggregate_functions/aggregate_function_sum.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class AggregateFunctionSum final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class AggregateFunctionUniqDistributeKey final
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
AggregateDataPtr rhs, const IColumn* column, Arena* arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,8 @@ class AggregationNode : public ::doris::ExecNode {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(), _offsets_of_aggregate_states[i],
_deserialize_buffer.data(), (ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
_deserialize_buffer.data(), column.get(), _agg_arena_pool.get(),
rows);
}
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
Expand Down

0 comments on commit 90bcb47

Please sign in to comment.