Skip to content

Commit

Permalink
Separate fixed key hash map context creator (apache#25438)
Browse files Browse the repository at this point in the history
Separate fixed key hash map context creator
  • Loading branch information
BiteTheDDDDt authored Oct 16, 2023
1 parent c482c22 commit d00d029
Show file tree
Hide file tree
Showing 36 changed files with 259 additions and 1,177 deletions.
1 change: 1 addition & 0 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Checks: |
readability-*,
-readability-identifier-length,
-readability-implicit-bool-conversion,
-readability-function-cognitive-complexity,
portability-simd-intrinsics,
performance-type-promotion-in-math-fn,
performance-faster-string-find,
Expand Down
63 changes: 9 additions & 54 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "pipeline/exec/operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "runtime/primitive_type.h"
#include "vec/common/hash_table/hash.h"

namespace doris::pipeline {

Expand Down Expand Up @@ -109,8 +110,6 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
Base::_shared_state->agg_profile_arena = std::make_unique<vectorized::Arena>();

if (Base::_shared_state->probe_expr_ctxs.empty()) {
_agg_data->init(vectorized::AggregatedDataVariants::Type::without_key);

_agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>(
Base::_shared_state->agg_profile_arena->alloc(p._total_size_of_aggregate_states));

Expand Down Expand Up @@ -500,9 +499,8 @@ void AggSinkLocalState<DependencyType, Derived>::_emplace_into_hash_table(
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, Base::_shared_state->probe_key_sz);
agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz,
num_rows);
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key(key, origin, *_agg_arena_pool);
Expand Down Expand Up @@ -545,9 +543,8 @@ void AggSinkLocalState<DependencyType, Derived>::_find_in_hash_table(
[&](auto&& agg_method) -> void {
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, Base::_shared_state->probe_key_sz);
agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz,
num_rows);
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);

/// For all rows.
for (size_t i = 0; i < num_rows; ++i) {
Expand Down Expand Up @@ -625,52 +622,10 @@ void AggSinkLocalState<DependencyType, Derived>::_init_hash_method(
!Base::_parent->template cast<typename Derived::Parent>()._is_first_phase),
is_nullable);
} else {
bool use_fixed_key = true;
bool has_null = false;
size_t key_byte_size = 0;
size_t bitmap_size =
vectorized::get_bitmap_size(Base::_shared_state->probe_expr_ctxs.size());

Base::_shared_state->probe_key_sz.resize(Base::_shared_state->probe_expr_ctxs.size());
for (int i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); ++i) {
const auto& expr = Base::_shared_state->probe_expr_ctxs[i]->root();
const auto& data_type = expr->data_type();

if (!data_type->have_maximum_size_of_value()) {
use_fixed_key = false;
break;
}

auto is_null = data_type->is_nullable();
has_null |= is_null;
Base::_shared_state->probe_key_sz[i] =
data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0);
key_byte_size += Base::_shared_state->probe_key_sz[i];
}

if (!has_null) {
bitmap_size = 0;
}

if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) {
use_fixed_key = false;
}

if (use_fixed_key) {
if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) {
t = Type::int64_keys;
} else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt128)) {
t = Type::int128_keys;
} else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt136)) {
t = Type::int136_keys;
} else {
t = Type::int256_keys;
}
_agg_data->init(get_hash_key_type_with_phase(
t, !Base::_parent->template cast<typename Derived::Parent>()
._is_first_phase),
has_null);
} else {
if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
vectorized::AggregateDataPtr,
vectorized::AggregatedMethodVariants>(
_agg_data->method_variant, probe_exprs)) {
_agg_data->init(Type::serialized);
}
}
Expand Down
5 changes: 1 addition & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
}
}

{
context.insert_keys_into_columns(keys, key_columns, num_rows,
Base::_shared_state->probe_key_sz);
}
{ context.insert_keys_into_columns(keys, key_columns, num_rows); }

if (hash_table.has_null_key_data()) {
// only one key of group by support wrap null key
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta

{
SCOPED_TIMER(_insert_keys_to_column_timer);
agg_method.insert_keys_into_columns(keys, key_columns, num_rows,
_shared_state->probe_key_sz);
agg_method.insert_keys_into_columns(keys, key_columns, num_rows);
}

if (iter == _shared_state->aggregate_data_container->end()) {
Expand Down Expand Up @@ -358,8 +357,7 @@ Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st

{
SCOPED_TIMER(_insert_keys_to_column_timer);
agg_method.insert_keys_into_columns(keys, key_columns, num_rows,
_shared_state->probe_key_sz);
agg_method.insert_keys_into_columns(keys, key_columns, num_rows);
}

for (size_t i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct(
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _shared_state->probe_key_sz);
agg_method.init_serialized_keys(key_columns, Base::_shared_state->probe_key_sz,
num_rows);
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);
size_t row = 0;
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key(key, origin, _arena);
Expand Down
75 changes: 2 additions & 73 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_shared_hash_table_dependency = SharedHashTableDependency::create_shared(_parent->id());
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
_shared_state->probe_key_sz = p._build_key_sz;
if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) {
_shared_state->build_blocks = p._shared_hash_table_context->blocks;
} else {
Expand Down Expand Up @@ -144,10 +143,6 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

vectorized::Sizes& HashJoinBuildSinkLocalState::build_key_sz() {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_key_sz;
}

bool HashJoinBuildSinkLocalState::build_unique() const {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
}
Expand Down Expand Up @@ -326,62 +321,8 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
}
return;
}

bool use_fixed_key = true;
bool has_null = false;
size_t key_byte_size = 0;
size_t bitmap_size = vectorized::get_bitmap_size(_build_expr_ctxs.size());

for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
const auto vexpr = _build_expr_ctxs[i]->root();
const auto& data_type = vexpr->data_type();

if (!data_type->have_maximum_size_of_value()) {
use_fixed_key = false;
break;
}

auto is_null = data_type->is_nullable();
has_null |= is_null;
key_byte_size += p._build_key_sz[i];
}

if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) {
use_fixed_key = false;
}

if (use_fixed_key) {
// TODO: may we should support uint256 in the future
if (has_null) {
if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) {
_shared_state->hash_table_variants
->emplace<vectorized::I64FixedKeyHashTableContext<
true, RowRefListType>>();
} else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt128)) {
_shared_state->hash_table_variants
->emplace<vectorized::I128FixedKeyHashTableContext<
true, RowRefListType>>();
} else {
_shared_state->hash_table_variants
->emplace<vectorized::I256FixedKeyHashTableContext<
true, RowRefListType>>();
}
} else {
if (key_byte_size <= sizeof(vectorized::UInt64)) {
_shared_state->hash_table_variants
->emplace<vectorized::I64FixedKeyHashTableContext<
false, RowRefListType>>();
} else if (key_byte_size <= sizeof(vectorized::UInt128)) {
_shared_state->hash_table_variants
->emplace<vectorized::I128FixedKeyHashTableContext<
false, RowRefListType>>();
} else {
_shared_state->hash_table_variants
->emplace<vectorized::I256FixedKeyHashTableContext<
false, RowRefListType>>();
}
}
} else {
if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, RowRefListType>(
*_shared_state->hash_table_variants, _build_expr_ctxs)) {
_shared_state->hash_table_variants
->emplace<vectorized::SerializedHashTableContext<RowRefListType>>();
}
Expand Down Expand Up @@ -448,18 +389,6 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
null_aware ||
(_build_expr_ctxs.back()->root()->is_nullable() && build_stores_null));
}

for (const auto& expr : _build_expr_ctxs) {
const auto& data_type = expr->root()->data_type();
if (!data_type->have_maximum_size_of_value()) {
break;
}

auto is_null = data_type->is_nullable();
_build_key_sz.push_back(data_type->get_maximum_size_of_value_in_memory() -
(is_null ? 1 : 0));
}

return Status::OK();
}

Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class HashJoinBuildSinkLocalState final
void init_short_circuit_for_probe();
HashJoinBuildSinkOperatorX* join_build() { return (HashJoinBuildSinkOperatorX*)_parent; }

vectorized::Sizes& build_key_sz();
bool build_unique() const;
std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
Expand Down Expand Up @@ -168,8 +167,6 @@ class HashJoinBuildSinkOperatorX final
// mark the join column whether support null eq
std::vector<bool> _is_null_safe_eq_join;

vectorized::Sizes _build_key_sz;

bool _is_broadcast_join = false;
std::shared_ptr<vectorized::SharedHashTableController> _shared_hashtable_controller = nullptr;

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class HashJoinProbeLocalState final
std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
return _shared_state->build_blocks;
}
vectorized::Sizes probe_key_sz() const { return _shared_state->probe_key_sz; }

private:
void _prepare_probe_block();
Expand Down
59 changes: 6 additions & 53 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "partition_sort_sink_operator.h"

#include "common/status.h"
#include "vec/common/hash_table/hash.h"

namespace doris {

Expand Down Expand Up @@ -180,10 +181,9 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table(
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;

AggState state(key_columns, local_state._partition_key_sz);
AggState state(key_columns);
size_t num_rows = input_block->rows();
agg_method.init_serialized_keys(key_columns, local_state._partition_key_sz,
num_rows);
agg_method.init_serialized_keys(key_columns, num_rows);

auto creator = [&](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key(key, origin, *local_state._agg_arena_pool);
Expand Down Expand Up @@ -282,56 +282,9 @@ void PartitionSortSinkLocalState::_init_hash_method() {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized);
}
} else {
bool use_fixed_key = true;
bool has_null = false;
size_t key_byte_size = 0;
size_t bitmap_size = vectorized::get_bitmap_size(_partition_exprs_num);

_partition_key_sz.resize(_partition_exprs_num);
for (int i = 0; i < _partition_exprs_num; ++i) {
const auto& data_type = _partition_expr_ctxs[i]->root()->data_type();

if (!data_type->have_maximum_size_of_value()) {
use_fixed_key = false;
break;
}

auto is_null = data_type->is_nullable();
has_null |= is_null;
_partition_key_sz[i] =
data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0);
key_byte_size += _partition_key_sz[i];
}

if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) {
use_fixed_key = false;
}

if (use_fixed_key) {
if (has_null) {
if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null);
} else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt128)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null);
} else {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null);
}
} else {
if (key_byte_size <= sizeof(vectorized::UInt64)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null);
} else if (key_byte_size <= sizeof(vectorized::UInt128)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null);
} else {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null);
}
}
} else {
if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
vectorized::PartitionDataPtr>(
_partitioned_data->method_variant, _partition_expr_ctxs)) {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized);
}
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort
std::vector<const vectorized::IColumn*> _partition_columns;
std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data;
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
std::vector<size_t> _partition_key_sz;
int _partition_exprs_num = 0;

RuntimeProfile::Counter* _build_timer;
Expand Down
Loading

0 comments on commit d00d029

Please sign in to comment.