Skip to content

Commit

Permalink
[fix](orc) check all the cases before build_search_argument (apache#4…
Browse files Browse the repository at this point in the history
…4615)

### What problem does this PR solve?
In the old logic, the `check_expr_can_push_down` function does not check
whether the `orc::Literal` are constructed successfully, but only checks
during `build_search_argument`. However, if it is found that the
`orc::Literal` fails to be constructed after `builder->startNot`, it
will fail because the builder cannot end `startNot`.
Therefore, we advance the behavior of constructing `orc::Literal` to the
`check_expr_can_push_down` function and save the result to the map, so
that it will never fail in the `build_search_argument` phase.

Related PR: apache#43255
  • Loading branch information
suxiaogang223 authored Nov 29, 2024
1 parent cb04281 commit 08b187b
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 120 deletions.
176 changes: 88 additions & 88 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <memory>
#include <ostream>
#include <tuple>
#include <utility>

#include "cctz/civil_time.h"
#include "cctz/time_zone.h"
Expand Down Expand Up @@ -567,12 +568,14 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,

std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_literal(
const VSlotRef* slot_ref, const VLiteral* literal) {
DCHECK(_col_name_to_file_col_name_low_case.contains(slot_ref->expr_name()));
auto file_col_name_low_case = _col_name_to_file_col_name_low_case[slot_ref->expr_name()];
if (!_type_map.contains(file_col_name_low_case)) {
// TODO: this is for acid table
LOG(WARNING) << "Column " << slot_ref->expr_name() << " not found in _type_map";
return std::make_tuple(false, orc::Literal(false), orc::PredicateDataType::LONG);
}
DCHECK(_type_map.contains(file_col_name_low_case));
const auto* orc_type = _type_map[file_col_name_low_case];
if (!TYPEKIND_TO_PREDICATE_TYPE.contains(orc_type->getKind())) {
LOG(WARNING) << "Unsupported Push Down Orc Type [TypeKind=" << orc_type->getKind() << "]";
Expand Down Expand Up @@ -624,15 +627,37 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_lite
}
}

// check if the slot of expr can be pushed down to orc reader
// check if the slot of expr can be pushed down to orc reader and make orc predicate type
bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
if (!expr->children()[0]->is_slot_ref()) {
return false;
}
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
// check if the slot exists in orc file and not partition column
return _col_name_to_file_col_name.contains(slot_ref->expr_name()) &&
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
if (!_col_name_to_file_col_name.contains(slot_ref->expr_name()) ||
_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name())) {
return false;
}
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
if (valid) {
_vslot_ref_to_orc_predicate_data_type[slot_ref] = predicate_type;
}
return valid;
}

// check if the literal of expr can be pushed down to orc reader and make orc literal
bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id) {
if (!expr->children()[child_id]->is_literal()) {
return false;
}
// the slot has been checked in _check_slot_can_push_down before calling this function
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[child_id].get());
auto [valid, orc_literal, _] = _make_orc_literal(slot_ref, literal);
if (valid) {
_vliteral_to_orc_literal.insert(std::make_pair(literal, orc_literal));
}
return valid;
}

// check if there are rest children of expr can be pushed down to orc reader
Expand All @@ -642,7 +667,7 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {
}

for (size_t i = 1; i < expr->children().size(); ++i) {
if (!expr->children()[i]->is_literal()) {
if (!_check_literal_can_push_down(expr, i)) {
return false;
}
}
Expand All @@ -651,7 +676,10 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {

// check if the expr can be pushed down to orc reader
bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
DCHECK(expr != nullptr);
if (expr == nullptr) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND:
// at least one child can be pushed down
Expand Down Expand Up @@ -693,198 +721,167 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
}
}

bool OrcReader::_build_less_than(const VExprSPtr& expr,
void OrcReader::_build_less_than(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_less_than_equals(const VExprSPtr& expr,
void OrcReader::_build_less_than_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThanEquals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_equals(const VExprSPtr& expr,
void OrcReader::_build_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_filter_in(const VExprSPtr& expr,
void OrcReader::_build_filter_in(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() >= 2);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
std::vector<orc::Literal> literals;
orc::PredicateDataType predicate_type = orc::PredicateDataType::LONG;
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
orc::PredicateDataType predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
for (size_t i = 1; i < expr->children().size(); ++i) {
DCHECK(expr->children()[i]->is_literal());
const auto* literal = static_cast<const VLiteral*>(expr->children()[i].get());
auto [valid, orc_literal, type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
literals.emplace_back(orc_literal);
predicate_type = type;
}
DCHECK(!literals.empty());
builder->in(slot_ref->expr_name(), predicate_type, literals);
return true;
}

bool OrcReader::_build_is_null(const VExprSPtr& expr,
void OrcReader::_build_is_null(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 1);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
builder->isNull(slot_ref->expr_name(), predicate_type);
return true;
}

bool OrcReader::_build_search_argument(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
if (expr == nullptr) {
return false;
}

// if expr can not be pushed down, skip it and continue to next expr
// OPTIMIZE: check expr only once
if (!_check_expr_can_push_down(expr)) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND: {
bool at_least_one_can_push_down = false;
builder->startAnd();
bool at_least_one_can_push_down = false;
for (const auto& child : expr->children()) {
if (_build_search_argument(child, builder)) {
at_least_one_can_push_down = true;
}
}
if (!at_least_one_can_push_down) {
// if all exprs can not be pushed down, builder->end() will throw exception
return false;
}
DCHECK(at_least_one_can_push_down);
builder->end();
break;
}
case TExprOpcode::COMPOUND_OR:
case TExprOpcode::COMPOUND_OR: {
builder->startOr();
bool all_can_push_down = true;
for (const auto& child : expr->children()) {
if (!_build_search_argument(child, builder)) {
return false;
all_can_push_down = false;
}
}
DCHECK(all_can_push_down);
builder->end();
break;
case TExprOpcode::COMPOUND_NOT:
builder->startNot();
}
case TExprOpcode::COMPOUND_NOT: {
DCHECK_EQ(expr->children().size(), 1);
if (!_build_search_argument(expr->children()[0], builder)) {
return false;
}
builder->startNot();
auto res = _build_search_argument(expr->children()[0], builder);
DCHECK(res);
builder->end();
break;
}
case TExprOpcode::GE:
builder->startNot();
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
builder->end();
break;
case TExprOpcode::GT:
builder->startNot();
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
builder->end();
break;
case TExprOpcode::LE:
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
break;
case TExprOpcode::LT:
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
break;
case TExprOpcode::EQ:
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
break;
case TExprOpcode::NE:
builder->startNot();
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
builder->end();
break;
case TExprOpcode::FILTER_IN:
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
break;
case TExprOpcode::FILTER_NOT_IN:
builder->startNot();
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
builder->end();
break;
// is null and is not null is represented as function call
case TExprOpcode::INVALID_OPCODE: {
case TExprOpcode::INVALID_OPCODE:
DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
if (expr->fn().name.function_name == "is_null_pred") {
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
} else if (expr->fn().name.function_name == "is_not_null_pred") {
builder->startNot();
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
builder->end();
} else {
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
break;
}
default: {

default:
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
}
return true;
}

Expand All @@ -898,6 +895,8 @@ bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
bool at_least_one_can_push_down = false;
builder->startAnd();
for (const auto& expr_ctx : conjuncts) {
_vslot_ref_to_orc_predicate_data_type.clear();
_vliteral_to_orc_literal.clear();
if (_build_search_argument(expr_ctx->root(), builder)) {
at_least_one_can_push_down = true;
}
Expand Down Expand Up @@ -943,7 +942,7 @@ Status OrcReader::set_fill_columns(
visit_slot(child.get());
}
} else if (VInPredicate* in_predicate = typeid_cast<VInPredicate*>(filter_impl)) {
if (in_predicate->get_num_children() > 0) {
if (!in_predicate->children().empty()) {
visit_slot(in_predicate->children()[0].get());
}
} else {
Expand Down Expand Up @@ -1179,7 +1178,8 @@ Status OrcReader::_fill_partition_columns(
if (num_deserialized != rows) {
return Status::InternalError(
"Failed to fill partition column: {}={} ."
"Number of rows expected to be written : {}, number of rows actually written : "
"Number of rows expected to be written : {}, number of rows actually "
"written : "
"{}",
slot_desc->col_name(), value, num_deserialized, rows);
}
Expand Down
Loading

0 comments on commit 08b187b

Please sign in to comment.