Commit 907fe99: update
zhangstar333 committed Mar 11, 2024
1 parent 9f1022f commit 907fe99
Showing 7 changed files with 115 additions and 17 deletions.
106 changes: 103 additions & 3 deletions be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -31,6 +31,30 @@ class RuntimeState;

namespace doris::pipeline {

struct StreamingHtMinReductionEntry {
// Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in
// bytes is greater than this threshold.
int min_ht_mem;
// The minimum reduction factor to expand the hash tables.
double streaming_ht_min_reduction;
};

// TODO: experimentally tune these values and also programmatically get the cache size
// of the machine that we're running on.
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
// Expand up to L2 cache always.
{0, 0.0},
// Expand into L3 cache if we look like we're getting some reduction.
// At present, the L2 cache is generally 1 MB or more
{1024 * 1024, 1.1},
// Expand into main memory if we're getting a significant reduction.
// The L3 cache is generally 16 MB or more
{16 * 1024 * 1024, 2.0},
};

static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
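
For intuition, the thresholds above pick a "cache level" for the current hash table size; a minimal standalone sketch of that lookup (hypothetical helper name, not part of the commit):

static int pick_cache_level(int64_t ht_mem) {
    // A 2 MB table passes the 1 MB (L2) entry but not the 16 MB (L3) one,
    // so it lands on cache level 1 and must show a reduction factor above
    // 1.1 before it may keep expanding.
    int cache_level = 0;
    while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
           ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
        ++cache_level;
    }
    return cache_level;
}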

DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: PipelineXLocalState<FakeSharedState>(state, parent),
@@ -69,6 +93,65 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo&
return Status::OK();
}

bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
if (!_should_expand_hash_table) {
return false;
}

return std::visit(
[&](auto&& agg_method) -> bool {
auto& hash_tbl = *agg_method.hash_table;
auto [ht_mem, ht_rows] =
std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};

// Need some rows in tables to have valid statistics.
if (ht_rows == 0) {
return true;
}

// Find the appropriate reduction factor in our table for the current hash table sizes.
int cache_level = 0;
while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
++cache_level;
}

// Compare the number of rows in the hash table with the number of input rows that
// were aggregated into it. Exclude passed through rows from this calculation since
// they were not in hash tables.
const int64_t input_rows = _input_num_rows;
const int64_t aggregated_input_rows = input_rows - _cur_num_rows_returned;
// TODO chenhao
// const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;

// TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
// inaccurate, which could lead to a divide by zero below.
if (aggregated_input_rows <= 0) {
return true;
}

// Extrapolate the current reduction factor (r) using the formula
// R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
// set, N is the number of input rows, excluding passed-through rows, and n is the
// number of rows inserted or merged into the hash tables. This is a very rough
// approximation but is good enough to be useful.
// TODO: consider collecting more statistics to better estimate reduction.
// double estimated_reduction = aggregated_input_rows >= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
double min_reduction =
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;

// COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
// return estimated_reduction > min_reduction;
_should_expand_hash_table = current_reduction > min_reduction;
return _should_expand_hash_table;
},
_agg_data->method_variant);
}
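
To make the check above concrete, here is a worked example with hypothetical counter values (none of these numbers come from the commit):

// Hypothetical values illustrating _should_expand_preagg_hash_tables():
const int64_t input_rows = 1000000;    // stands in for _input_num_rows
const int64_t passed_through = 100000; // stands in for _cur_num_rows_returned
const int64_t aggregated_input_rows = input_rows - passed_through; // 900000
const int64_t ht_rows = 300000;        // rows currently in the hash table
const double current_reduction =
        static_cast<double>(aggregated_input_rows) / ht_rows; // 3.0
// If the hash table buckets occupy 20 MB, the 16 MB entry applies and
// min_reduction is 2.0. Since 3.0 > 2.0, the pre-aggregation is removing
// enough duplicates to justify letting the hash table keep growing.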

void DistinctStreamingAggLocalState::_init_hash_method(
const vectorized::VExprContextSPtrs& probe_exprs) {
DCHECK(probe_exprs.size() >= 1);
@@ -157,11 +240,20 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
}

int rows = in_block->rows();
+ bool stop_emplace_flag = false;
_distinct_row.clear();
_distinct_row.reserve(rows);

- RETURN_IF_CATCH_EXCEPTION(
-         _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
+ RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows,
+                                                                &stop_emplace_flag));
+ // Use _cur_num_rows_returned to decide whether to keep emplacing into the hash table.
+ _cur_num_rows_returned += _distinct_row.size();

+ if (stop_emplace_flag) {
+     for (int i = 0; i < rows; ++i) {
+         _distinct_row.push_back(i);
+     }
+ }

bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
out_block->mem_reuse();
@@ -195,12 +287,20 @@ void DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block

void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns,
- const size_t num_rows) {
+ const size_t num_rows, bool* stop_emplace_flag) {
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
+ auto& hash_tbl = *agg_method.hash_table;
+ if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg &&
+     hash_tbl.add_elem_size_overflow(num_rows)) {
+     if (!_should_expand_preagg_hash_tables()) {
+         *stop_emplace_flag = true;
+         return;
+     }
+ }
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);
size_t row = 0;
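Read together, the hunks in this file add a pass-through path around the hash table; a condensed sketch of the resulting control flow (a simplification of the code above, not a verbatim excerpt):

// When the hash table would overflow its element-size limit and expansion is
// not considered worthwhile, the batch bypasses the hash table entirely.
bool stop_emplace_flag = false;
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows,
                                     &stop_emplace_flag);
_cur_num_rows_returned += _distinct_row.size();
if (stop_emplace_flag) {
    for (int i = 0; i < rows; ++i) {
        _distinct_row.push_back(i); // no de-duplication: pass every row through
    }
}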
@@ -54,14 +54,17 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeShar
void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row,
vectorized::ColumnRawPtrs& key_columns,
- const size_t num_rows);
+ const size_t num_rows, bool* stop_emplace_flag);
void _make_nullable_output_key(vectorized::Block* block);
+ bool _should_expand_preagg_hash_tables();

std::shared_ptr<char> dummy_mapped_data;
vectorized::IColumn::Selector _distinct_row;
vectorized::Arena _arena;
int64_t _output_distinct_rows = 0;
size_t _input_num_rows = 0;
+ bool _should_expand_hash_table = true;
+ int64_t _cur_num_rows_returned = 0;

std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
@@ -59,7 +59,7 @@ Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state, vector
block->columns()));
}

- rows_have_returned += block->rows();
+ _node->add_num_rows_returned(block->rows());
return Status::OK();
}

@@ -71,7 +71,6 @@ Status DistinctStreamingAggSourceOperator::get_block(RuntimeState* state, vector
std::bind(&DistinctStreamingAggSourceOperator::pull_data, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3)));
if (UNLIKELY(eos)) {
- _node->set_num_rows_returned(rows_have_returned);
source_state = SourceState::FINISHED;
} else {
source_state = SourceState::DEPEND_ON_SOURCE;
@@ -60,7 +60,6 @@ class DistinctStreamingAggSourceOperator final
Status pull_data(RuntimeState* state, vectorized::Block* output_block, bool* eos);

private:
- int64_t rows_have_returned = 0;
std::shared_ptr<DataQueue> _data_queue;
};

2 changes: 1 addition & 1 deletion be/src/vec/core/block.cpp
@@ -700,7 +700,7 @@ void Block::clear_column_data(int column_size) noexcept {
}
}
for (auto& d : data) {
- DCHECK_EQ(d.column->use_count(), 1);
+ DCHECK_EQ(d.column->use_count(), 1) << " " << print_use_count();
(*std::move(d.column)).assume_mutable()->clear();
}
row_same_bit.clear();
13 changes: 5 additions & 8 deletions be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -59,16 +59,13 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows,
&stop_emplace_flag));
// If stop_emplace_flag is true, there is no need to emplace values into the hash table,
- // so return block directly
+ // so return the block directly.
+ // TODO: maybe key_columns could be inserted into output_block, but that requires solving
+ //       "Check failed: d.column->use_count() == 1 (2 vs. 1)"; it is unclear whether in_block is used again afterwards.
if (stop_emplace_flag) {
-     ColumnsWithTypeAndName columns_with_schema;
-     for (int i = 0; i < key_size; ++i) {
-         columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
-                                          _probe_expr_ctxs[i]->root()->data_type(),
-                                          _probe_expr_ctxs[i]->root()->expr_name());
+     for (int i = 0; i < rows; ++i) {
+         _distinct_row.push_back(i);
}
-     out_block->swap(Block(columns_with_schema));
-     return Status::OK();
}

SCOPED_TIMER(_insert_keys_to_column_timer);
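The dropped branch ties back to the strengthened DCHECK in block.cpp above: handing key_columns straight to a fresh Block leaves each column shared with in_block. A hypothetical minimal sketch of that failure mode (data_type and the column index are assumed for illustration):

// 'col' is still referenced by in_block, so a block built from it does not
// own the column exclusively and use_count() is 2, not 1.
vectorized::ColumnPtr col = in_block->get_by_position(0).column;
vectorized::Block out;
out.insert({col, data_type, "key"}); // data_type: an assumed DataTypePtr
// out.clear_column_data(out.columns()); // would trip
// DCHECK_EQ(d.column->use_count(), 1) in Block::clear_column_data()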
2 changes: 1 addition & 1 deletion be/src/vec/exec/distinct_vaggregation_node.h
@@ -43,7 +43,7 @@ class DistinctAggregationNode final : public AggregationNode {
DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~DistinctAggregationNode() override = default;
Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block* out_block);
- void set_num_rows_returned(int64_t rows) { _num_rows_returned = rows; }
+ void add_num_rows_returned(int64_t rows) { _num_rows_returned += rows; }
vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; }

private:
