Skip to content

Commit

Permalink
[Improvementation](scan) allow pushdown when filter's target column i…
Browse files Browse the repository at this point in the history
…s value column (apache#42057)

## Proposed changes
allow pushdown when filter's target column is value column
after apache#35960, storage layer can
judge value filter apply some special rowset(non-overlaping first
rowset)
  • Loading branch information
BiteTheDDDDt authored Oct 18, 2024
1 parent 19ad414 commit 8c34574
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 27 deletions.
7 changes: 2 additions & 5 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,13 @@ Status OlapScanLocalState::_process_conjuncts(RuntimeState* state) {
}

bool OlapScanLocalState::_is_key_column(const std::string& key_name) {
auto& p = _parent->cast<OlapScanOperatorX>();
// all column in dup_keys table or unique_keys with merge on write table olap scan node threat
// as key column
if (p._olap_scan_node.keyType == TKeysType::DUP_KEYS ||
(p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
p._olap_scan_node.__isset.enable_unique_key_merge_on_write &&
p._olap_scan_node.enable_unique_key_merge_on_write)) {
if (_storage_no_merge()) {
return true;
}

auto& p = _parent->cast<OlapScanOperatorX>();
auto res = std::find(p._olap_scan_node.key_column_name.begin(),
p._olap_scan_node.key_column_name.end(), key_name);
return res != p._olap_scan_node.key_column_name.end();
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
if (!predicate.target_is_slot(_parent->node_id())) {
return false;
}
return _is_key_column(predicate.get_col_name(_parent->node_id())) || _storage_no_merge();
return _is_key_column(predicate.get_col_name(_parent->node_id()));
}

Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
Expand Down
35 changes: 14 additions & 21 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,15 @@ Status ScanLocalState<Derived>::_normalize_predicate(
RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate(
cur_expr, context, slot, value_range, &pdt),
status);
if (_is_key_column(slot->col_name())) {
RETURN_IF_PUSH_DOWN(
_normalize_bitmap_filter(cur_expr, context, slot, &pdt),
status);
RETURN_IF_PUSH_DOWN(
_normalize_bloom_filter(cur_expr, context, slot, &pdt), status);
if (state()->enable_function_pushdown()) {
RETURN_IF_PUSH_DOWN(
_normalize_bitmap_filter(cur_expr, context, slot, &pdt),
_normalize_function_filters(cur_expr, context, slot, &pdt),
status);
RETURN_IF_PUSH_DOWN(
_normalize_bloom_filter(cur_expr, context, slot, &pdt),
status);
if (state()->enable_function_pushdown()) {
RETURN_IF_PUSH_DOWN(_normalize_function_filters(
cur_expr, context, slot, &pdt),
status);
}
}
},
*range);
Expand All @@ -330,8 +327,7 @@ Status ScanLocalState<Derived>::_normalize_predicate(
return Status::OK();
}

if (pdt == PushDownType::ACCEPTABLE &&
(_is_key_column(slot->col_name()) || _storage_no_merge())) {
if (pdt == PushDownType::ACCEPTABLE && (_is_key_column(slot->col_name()))) {
output_expr = nullptr;
return Status::OK();
} else {
Expand Down Expand Up @@ -524,15 +520,15 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
if (vexpr->is_constant()) {
std::shared_ptr<ColumnPtrWrapper> const_col_wrapper;
RETURN_IF_ERROR(vexpr->get_const_col(expr_ctx, &const_col_wrapper));
if (const vectorized::ColumnConst* const_column =
if (const auto* const_column =
check_and_get_column<vectorized::ColumnConst>(const_col_wrapper->column_ptr)) {
constant_val = const_cast<char*>(const_column->get_data_at(0).data);
if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
*pdt = PushDownType::ACCEPTABLE;
_eos = true;
_scan_dependency->set_ready();
}
} else if (const vectorized::ColumnVector<vectorized::UInt8>* bool_column =
} else if (const auto* bool_column =
check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
const_col_wrapper->column_ptr)) {
// TODO: If `vexpr->is_constant()` is true, a const column is expected here.
Expand Down Expand Up @@ -583,24 +579,21 @@ Status ScanLocalState<Derived>::_normalize_in_and_eq_predicate(vectorized::VExpr
if (hybrid_set->size() <=
_parent->cast<typename Derived::Parent>()._max_pushdown_conditions_per_column) {
iter = hybrid_set->begin();
} else if (_is_key_column(slot->col_name()) || _storage_no_merge()) {
} else {
_filter_predicates.in_filters.emplace_back(slot->col_name(), expr->get_set_func());
*pdt = PushDownType::ACCEPTABLE;
return Status::OK();
} else {
*pdt = PushDownType::UNACCEPTABLE;
return Status::OK();
}
} else {
// normal in predicate
vectorized::VInPredicate* pred = static_cast<vectorized::VInPredicate*>(expr);
auto* pred = static_cast<vectorized::VInPredicate*>(expr);
PushDownType temp_pdt = _should_push_down_in_predicate(pred, expr_ctx, false);
if (temp_pdt == PushDownType::UNACCEPTABLE) {
return Status::OK();
}

// begin to push InPredicate value into ColumnValueRange
vectorized::InState* state = reinterpret_cast<vectorized::InState*>(
auto* state = reinterpret_cast<vectorized::InState*>(
expr_ctx->fn_context(pred->fn_context_index())
->get_function_state(FunctionContext::FRAGMENT_LOCAL));

Expand All @@ -619,7 +612,7 @@ Status ScanLocalState<Derived>::_normalize_in_and_eq_predicate(vectorized::VExpr
iter->next();
continue;
}
auto value = const_cast<void*>(iter->get_value());
auto* value = const_cast<void*>(iter->get_value());
RETURN_IF_ERROR(_change_value_range<true>(
temp_range, value, ColumnValueRange<T>::add_fixed_value_range, ""));
iter->next();
Expand Down

0 comments on commit 8c34574

Please sign in to comment.