Skip to content

Commit

Permalink
[Enhancement](filter) support only min/max runtime filter in BE (apac…
Browse files Browse the repository at this point in the history
…he#25290)

PR apache#25193 already implemented the FE side of this feature; this PR adds the BE side.
e.g. `select count() from lineorder join supplier on lo_partkey < s_suppkey;`
produces a max filter after the hash table is built, which can then be used to filter the probe-table data.
  • Loading branch information
zhangstar333 committed Oct 13, 2023
1 parent 78ecf7c commit 6a40811
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 3 deletions.
155 changes: 154 additions & 1 deletion be/src/exprs/minmax_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,164 @@ class MinMaxNumFunc : public MinMaxFuncBase {
return Status::OK();
}

private:
protected:
T _max = type_limit<T>::min();
T _min = type_limit<T>::max();
// we use _empty to avoid compare twice
bool _empty = true;
};

// Min-only variant of MinMaxNumFunc: every method below reads and writes only
// the inherited `_min` / `_empty` members and never touches `_max`.
template <class T>
class MinNumFunc : public MinMaxNumFunc<T> {
public:
    MinNumFunc() = default;
    ~MinNumFunc() override = default;

    // Folds a single value (pointed to by `data`, interpreted as a T) into the
    // running minimum. A null pointer is ignored.
    void insert(const void* data) override {
        if (data == nullptr) {
            return;
        }

        T candidate = *reinterpret_cast<const T*>(data);

        // The very first value is taken as-is, so it is never compared against
        // the initial sentinel stored in _min.
        if (this->_empty) {
            this->_empty = false;
            this->_min = candidate;
            return;
        }
        if (candidate < this->_min) {
            this->_min = candidate;
        }
    }

    // Folds `number` values into the running minimum. `data` is viewed as an
    // array of T and `offsets[i]` is the element index of the i-th value.
    void insert_fixed_len(const char* data, const int* offsets, int number) override {
        if (number == 0) {
            return;
        }

        const T* values = reinterpret_cast<const T*>(data);

        // When nothing has been inserted yet, seed _min with the first value
        // and start the scan at the second one.
        int first = 0;
        if (this->_empty) {
            this->_min = values[offsets[0]];
            first = 1;
        }
        for (int idx = first; idx < number; ++idx) {
            this->_min = std::min(this->_min, values[offsets[idx]]);
        }
        this->_empty = false;
    }

    // Returns true when the value pointed to by `data` is >= the recorded
    // minimum; a null pointer never matches.
    bool find(void* data) override {
        if (data != nullptr) {
            T probe = *reinterpret_cast<T*>(data);
            return probe >= this->_min;
        }
        return false;
    }

    // Lowers _min to the other filter's minimum when that one is smaller.
    // `minmax_func` is cast to MinNumFunc<T>. Always returns Status::OK().
    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
        MinNumFunc<T>* rhs = assert_cast<MinNumFunc<T>*>(minmax_func);
        if (!(rhs->_min < this->_min)) {
            return Status::OK();
        }

        if constexpr (std::is_same_v<T, StringRef>) {
            // StringRef only points at its bytes, so copy them into a new
            // std::string registered with `pool` and point _min at that copy.
            // NOTE(review): presumably the pool keeps the copy alive - confirm.
            auto& smaller = rhs->_min;
            auto copy = pool->add(new std::string(smaller.data, smaller.size));
            this->_min.data = copy->data();
            this->_min.size = copy->length();
        } else {
            this->_min = rhs->_min;
        }
        return Status::OK();
    }

    // A min-only filter has no meaningful maximum: trips DCHECK(false) and
    // returns nullptr.
    void* get_max() override {
        DCHECK(false);
        return nullptr;
    }

    // Overwrites _min with the T pointed to by `min_data`; `max_data` is not
    // read. `_empty` is left unchanged.
    Status assign(void* min_data, void* max_data) override {
        this->_min = *static_cast<T*>(min_data);
        return Status::OK();
    }
};

// Max-only variant of MinMaxNumFunc: every method below reads and writes only
// the inherited `_max` / `_empty` members and never touches `_min`.
template <class T>
class MaxNumFunc : public MinMaxNumFunc<T> {
public:
    MaxNumFunc() = default;
    ~MaxNumFunc() override = default;

    // Folds a single value (pointed to by `data`, interpreted as a T) into the
    // running maximum. A null pointer is ignored.
    void insert(const void* data) override {
        if (data == nullptr) {
            return;
        }

        T val_data = *reinterpret_cast<const T*>(data);

        // The very first value is taken as-is, so it is never compared against
        // the initial sentinel stored in _max.
        if (this->_empty) {
            this->_max = val_data;
            this->_empty = false;
            return;
        }
        if (val_data > this->_max) {
            this->_max = val_data;
        }
    }

    // Folds `number` values into the running maximum. `data` is viewed as an
    // array of T and `offsets[i]` is the element index of the i-th value.
    void insert_fixed_len(const char* data, const int* offsets, int number) override {
        if (!number) {
            return;
        }

        const T* values = reinterpret_cast<const T*>(data);

        // When nothing has been inserted yet, seed _max with the first value
        // and start the scan at the second one.
        int first = 0;
        if (this->_empty) {
            this->_max = values[offsets[0]];
            first = 1;
        }
        for (int i = first; i < number; i++) {
            this->_max = std::max(this->_max, values[offsets[i]]);
        }
        this->_empty = false;
    }

    // Returns true when the value pointed to by `data` is <= the recorded
    // maximum; a null pointer never matches.
    bool find(void* data) override {
        if (data == nullptr) {
            return false;
        }

        T val_data = *reinterpret_cast<T*>(data);
        return val_data <= this->_max;
    }

    // Raises _max to the other filter's maximum when that one is larger.
    // Always returns Status::OK().
    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
        // Cast to MaxNumFunc<T>, not to the MinMaxNumFunc<T> base: a member of
        // a derived class may only reach a protected base member through an
        // object of its own (or a further derived) type, so reading `_max`
        // through a base-class pointer is rejected once this template is
        // instantiated. This also matches what MinNumFunc::merge does.
        MaxNumFunc<T>* other_minmax = assert_cast<MaxNumFunc<T>*>(minmax_func);
        if (!(other_minmax->_max > this->_max)) {
            return Status::OK();
        }

        if constexpr (std::is_same_v<T, StringRef>) {
            // StringRef only points at its bytes, so copy them into a new
            // std::string registered with `pool` and point _max at that copy.
            // NOTE(review): presumably the pool keeps the copy alive - confirm.
            auto& other_max = other_minmax->_max;
            auto str = pool->add(new std::string(other_max.data, other_max.size));
            this->_max.data = str->data();
            this->_max.size = str->length();
        } else {
            this->_max = other_minmax->_max;
        }

        return Status::OK();
    }

    // A max-only filter has no meaningful minimum: trips DCHECK(false) and
    // returns nullptr.
    void* get_min() override {
        DCHECK(false);
        return nullptr;
    }

    // Overwrites _max with the T pointed to by `max_data`; `min_data` is not
    // read. `_empty` is left unchanged.
    Status assign(void* min_data, void* max_data) override {
        this->_max = *static_cast<T*>(max_data);
        return Status::OK();
    }
};

} // namespace doris
61 changes: 60 additions & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ class RuntimePredicateWrapper {
_context.hybrid_set.reset(create_set(_column_return_type));
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
break;
Expand Down Expand Up @@ -488,6 +490,8 @@ class RuntimePredicateWrapper {
_context.hybrid_set->insert(data);
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->insert(data);
break;
Expand Down Expand Up @@ -531,6 +535,8 @@ class RuntimePredicateWrapper {
_context.hybrid_set->insert_fixed_len(data, offsets, number);
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->insert_fixed_len(data, offsets, number);
break;
Expand Down Expand Up @@ -658,6 +664,8 @@ class RuntimePredicateWrapper {
}
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
_context.minmax_func->merge(wrapper->_context.minmax_func.get(), _pool);
break;
Expand Down Expand Up @@ -1301,7 +1309,21 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
if (desc->type == TRuntimeFilterType::BLOOM) {
_runtime_filter_type = RuntimeFilterType::BLOOM_FILTER;
} else if (desc->type == TRuntimeFilterType::MIN_MAX) {
_runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
if (desc->__isset.min_max_type) {
    // Map the optional min_max_type onto the matching runtime filter type.
    // Every case ends with `break`: without it MIN and MAX fall through into
    // the MIN_MAX case and the filter always ends up as MINMAX_FILTER.
    switch (desc->min_max_type) {
    case TMinMaxRuntimeFilterType::MIN: {
        _runtime_filter_type = RuntimeFilterType::MIN_FILTER;
        break;
    }
    case TMinMaxRuntimeFilterType::MAX: {
        _runtime_filter_type = RuntimeFilterType::MAX_FILTER;
        break;
    }
    case TMinMaxRuntimeFilterType::MIN_MAX: {
        _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
        break;
    }
    }
} else {
    // min_max_type not set in the descriptor: use the combined min/max filter.
    _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
}
} else if (desc->type == TRuntimeFilterType::IN) {
_runtime_filter_type = RuntimeFilterType::IN_FILTER;
} else if (desc->type == TRuntimeFilterType::IN_OR_BLOOM) {
Expand Down Expand Up @@ -1878,6 +1900,43 @@ Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
}
break;
}
case RuntimeFilterType::MIN_FILTER: {
// Min-only filter: push a single GE predicate whose first child is the probe
// expression root and whose second child is a literal built from get_min().
vectorized::VExprSPtr min_pred;
TExprNode min_pred_node;
RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::GE, min_pred,
&min_pred_node));
// Literal node carrying the filter's current minimum.
vectorized::VExprSPtr min_literal;
RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_min(),
min_literal));
min_pred->add_child(probe_ctx->root());
min_pred->add_child(min_literal);
// Wrap the predicate in a VRuntimeFilterWrapper and hand it to the caller.
container.push_back(
vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred));
// Build another expression tree from probe_expr and record its context.
// NOTE(review): new_probe_ctx is not referenced by the predicate built above;
// confirm the caller actually needs this extra context for a single-predicate filter.
vectorized::VExprContextSPtr new_probe_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx));
probe_ctxs.push_back(new_probe_ctx);
break;
}
case RuntimeFilterType::MAX_FILTER: {
vectorized::VExprSPtr max_pred;
// Max-only filter: push a single LE predicate whose first child is the probe
// expression root and whose second child is a literal built from get_max().
TExprNode max_pred_node;
RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred,
&max_pred_node));
// Literal node carrying the filter's current maximum.
vectorized::VExprSPtr max_literal;
RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_max(),
max_literal));
max_pred->add_child(probe_ctx->root());
max_pred->add_child(max_literal);
// Wrap the predicate in a VRuntimeFilterWrapper and hand it to the caller.
container.push_back(
vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred));

// Build another expression tree from probe_expr and record its context.
// NOTE(review): new_probe_ctx is not referenced by the predicate built above;
// confirm the caller actually needs this extra context for a single-predicate filter.
vectorized::VExprContextSPtr new_probe_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx));
probe_ctxs.push_back(new_probe_ctx);
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
vectorized::VExprSPtr max_pred;
// create max filter
Expand Down
10 changes: 9 additions & 1 deletion be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ enum class RuntimeFilterType {
MINMAX_FILTER = 1,
BLOOM_FILTER = 2,
IN_OR_BLOOM_FILTER = 3,
BITMAP_FILTER = 4
BITMAP_FILTER = 4,
MIN_FILTER = 5, // only min // now only support at local
MAX_FILTER = 6 // only max // now only support at local
};

inline std::string to_string(RuntimeFilterType type) {
Expand All @@ -86,6 +88,12 @@ inline std::string to_string(RuntimeFilterType type) {
case RuntimeFilterType::BLOOM_FILTER: {
return std::string("bloomfilter");
}
case RuntimeFilterType::MIN_FILTER: {
return std::string("only_min");
}
case RuntimeFilterType::MAX_FILTER: {
return std::string("only_max");
}
case RuntimeFilterType::MINMAX_FILTER: {
return std::string("minmax");
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
*eos = true;
return Status::OK();
}

//TODO: this short circuit maybe could refactor, no need to check at here.
if (_short_circuit_for_probe_and_additional_data) {
// when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
Expand Down

0 comments on commit 6a40811

Please sign in to comment.