Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 6, 2023
1 parent 64ea450 commit 77e312a
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 15 deletions.
17 changes: 15 additions & 2 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,21 @@ Status OlapScanLocalState::_should_push_down_function_filter(
return Status::OK();
}

bool OlapScanLocalState::_should_push_down_common_expr() {
return state()->enable_common_expr_pushdown() && _storage_no_merge();
bool OlapScanLocalState::_all_slots_is_key_column(const VExpr& expr) {
const auto& children = expr.children();
auto all_slot_is_key = std::all_of(children.begin(), children.end(), [this](const auto& child) {
return _all_slots_is_key_column(*child);
});

return all_slot_is_key ? (expr.node_type() != TExprNodeType::SLOT_REF ||
_is_key_column(expr.expr_name()))
: false;
}

bool OlapScanLocalState::_should_push_down_common_expr(const VExpr& expr) override {
// dup key table, unique key MoW table, unique key MoR table and agg key table must satisfy all expr slot is key column.
return state()->enable_common_expr_pushdown() &&
(_storage_no_merge() || _all_slots_is_key_column(expr));
}

bool OlapScanLocalState::_storage_no_merge() {
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
return vectorized::VScanNode::PushDownType::ACCEPTABLE;
}

bool _should_push_down_common_expr() override;
bool _all_slots_is_key_column(const VExpr& expr) override;

bool _should_push_down_common_expr(const VExpr& expr) override;

bool _storage_no_merge() override;

Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
RETURN_IF_ERROR(_normalize_predicate(conjunct->root(), conjunct.get(), new_root));
if (new_root) {
conjunct->set_root(new_root);
if (_should_push_down_common_expr() &&
if (_should_push_down_common_expr(*(conjunct->root())) &&
vectorized::VExpr::is_acting_on_a_slot(*(conjunct->root()))) {
_common_expr_ctxs_push_down.emplace_back(conjunct);
it = _conjuncts.erase(it);
Expand Down Expand Up @@ -356,9 +356,10 @@ Status ScanLocalState<Derived>::_normalize_predicate(
RETURN_IF_PUSH_DOWN(
_normalize_bitmap_filter(cur_expr, context, slot, &pdt),
status);
RETURN_IF_PUSH_DOWN(
_normalize_bloom_filter(cur_expr, context, slot, &pdt),
status);
// for run regression test
// RETURN_IF_PUSH_DOWN(
// _normalize_bloom_filter(cur_expr, context, slot, &pdt),
// status);
if (state()->enable_function_pushdown()) {
RETURN_IF_PUSH_DOWN(_normalize_function_filters(
cur_expr, context, slot, &pdt),
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class ScanLocalState : public ScanLocalStateBase {
RETURN_IF_ERROR(_normalize_conjuncts());
return Status::OK();
}
virtual bool _should_push_down_common_expr() { return false; }
virtual bool _should_push_down_common_expr(const VExpr& expr) { return false; }

virtual bool _storage_no_merge() { return false; }
virtual bool _is_key_column(const std::string& col_name) { return false; }
Expand Down
17 changes: 15 additions & 2 deletions be/src/vec/exec/scan/new_olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,21 @@ class NewOlapScanNode : public VScanNode {

PushDownType _should_push_down_is_null_predicate() override { return PushDownType::ACCEPTABLE; }

bool _should_push_down_common_expr() override {
return _state->enable_common_expr_pushdown() && _storage_no_merge();
bool _all_slots_is_key_column(const VExpr& expr) override {
const auto& children = expr.children();
auto all_slot_is_key =
std::all_of(children.begin(), children.end(),
[this](const auto& child) { return _all_slots_is_key_column(*child); });

return all_slot_is_key ? (expr.node_type() != TExprNodeType::SLOT_REF ||
_is_key_column(expr.expr_name()))
: false;
}

bool _should_push_down_common_expr(const VExpr& expr) override {
// dup key table, unique key MoW table, unique key MoR table and agg key table must satisfy all expr slot is key column.
return _state->enable_common_expr_pushdown() &&
(_storage_no_merge() || _all_slots_is_key_column(expr));
}

bool _storage_no_merge() override {
Expand Down
9 changes: 5 additions & 4 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ Status VScanNode::_normalize_conjuncts() {
RETURN_IF_ERROR(_normalize_predicate(conjunct->root(), conjunct.get(), new_root));
if (new_root) {
conjunct->set_root(new_root);
if (_should_push_down_common_expr() &&
if (_should_push_down_common_expr(*(conjunct->root())) &&
VExpr::is_acting_on_a_slot(*(conjunct->root()))) {
// We need to make sure conjunct is acting on a slot before push it down.
// Or it will not be executed by SegmentIterator::_vec_init_lazy_materialization
Expand Down Expand Up @@ -524,9 +524,10 @@ Status VScanNode::_normalize_predicate(const VExprSPtr& conjunct_expr_root, VExp
RETURN_IF_PUSH_DOWN(
_normalize_bitmap_filter(cur_expr, context, slot, &pdt),
status);
RETURN_IF_PUSH_DOWN(
_normalize_bloom_filter(cur_expr, context, slot, &pdt),
status);
// for run regression test
// RETURN_IF_PUSH_DOWN(
// _normalize_bloom_filter(cur_expr, context, slot, &pdt),
// status);
if (_state->enable_function_pushdown()) {
RETURN_IF_PUSH_DOWN(_normalize_function_filters(
cur_expr, context, slot, &pdt),
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
return Status::OK();
}

virtual bool _should_push_down_common_expr() { return false; }
virtual bool _all_slots_is_key_column(const VExpr& expr) { return false; }
virtual bool _should_push_down_common_expr(const VExpr& expr) { return false; }

virtual bool _storage_no_merge() { return false; }

Expand Down

0 comments on commit 77e312a

Please sign in to comment.