Skip to content

Commit

Permalink
[Bug](top-n) do not get runtime predicate when predicate not initiali…
Browse files Browse the repository at this point in the history
…zed (apache#32208)
  • Loading branch information
BiteTheDDDDt authored Mar 15, 2024
1 parent 20c10ce commit eeda6f1
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 10 deletions.
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
if (read_options.use_topn_opt) {
auto* query_ctx = read_options.runtime_state->get_query_ctx();
for (int id : read_options.topn_filter_source_node_ids) {
if (!query_ctx->get_runtime_predicate(id).need_update()) {
continue;
}
auto runtime_predicate = query_ctx->get_runtime_predicate(id).get_predicate();

int32_t uid =
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_zonemap_ns);
auto* query_ctx = _opts.runtime_state->get_query_ctx();
for (int id : _opts.topn_filter_source_node_ids) {
if (!query_ctx->get_runtime_predicate(id).need_update()) {
continue;
}

std::shared_ptr<doris::ColumnPredicate> runtime_predicate =
query_ctx->get_runtime_predicate(id).get_predicate();
if (_segment->can_apply_predicate_safely(runtime_predicate->column_id(),
Expand Down Expand Up @@ -1512,6 +1516,10 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
if (_opts.use_topn_opt &&
(_opts.read_orderby_key_columns == nullptr || _opts.read_orderby_key_columns->empty())) {
for (int id : _opts.topn_filter_source_node_ids) {
if (!_opts.runtime_state->get_query_ctx()->get_runtime_predicate(id).need_update()) {
continue;
}

auto& runtime_predicate =
_opts.runtime_state->get_query_ctx()->get_runtime_predicate(id);
_col_predicates.push_back(runtime_predicate.get_predicate().get());
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ Status TabletReader::_init_params(const ReaderParams& read_params) {
_reader_context.target_cast_type_for_variants = read_params.target_cast_type_for_variants;

RETURN_IF_ERROR(_init_conditions_param(read_params));
_init_conditions_param_except_leafnode_of_andnode(read_params);
RETURN_IF_ERROR(_init_conditions_param_except_leafnode_of_andnode(read_params));

Status res = _init_delete_condition(read_params);
if (!res.ok()) {
Expand Down Expand Up @@ -558,7 +558,7 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
return Status::OK();
}

void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
Status TabletReader::_init_conditions_param_except_leafnode_of_andnode(
const ReaderParams& read_params) {
for (const auto& condition : read_params.conditions_except_leafnode_of_andnode) {
TCondition tmp_cond = condition;
Expand All @@ -578,9 +578,10 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
for (int id : read_params.topn_filter_source_node_ids) {
auto& runtime_predicate =
read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id);
runtime_predicate.set_tablet_schema(_tablet_schema);
RETURN_IF_ERROR(runtime_predicate.set_tablet_schema(_tablet_schema));
}
}
return Status::OK();
}

ColumnPredicate* TabletReader::_parse_to_predicate(
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class TabletReader {

Status _init_conditions_param(const ReaderParams& read_params);

void _init_conditions_param_except_leafnode_of_andnode(const ReaderParams& read_params);
Status _init_conditions_param_except_leafnode_of_andnode(const ReaderParams& read_params);

ColumnPredicate* _parse_to_predicate(
const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter);
Expand Down
8 changes: 5 additions & 3 deletions be/src/runtime/runtime_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ class RuntimePredicate {
return _inited && _tablet_schema;
}

void set_tablet_schema(TabletSchemaSPtr tablet_schema) {
Status set_tablet_schema(TabletSchemaSPtr tablet_schema) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
// when sort node and scan node are not in the same backend, predicate will not be initialized
if (_tablet_schema || !_inited) {
return;
return Status::OK();
}
RETURN_IF_ERROR(tablet_schema->have_column(_col_name));
_tablet_schema = tablet_schema;
_predicate = SharedPredicate::create_shared(
tablet_schema->field_index(_tablet_schema->column(_col_name).unique_id()));
_tablet_schema->field_index(_tablet_schema->column(_col_name).unique_id()));
return Status::OK();
}

std::shared_ptr<ColumnPredicate> get_predicate() {
Expand Down
12 changes: 9 additions & 3 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>

#include <memory>
#include <string>

#include "common/config.h"
Expand Down Expand Up @@ -98,14 +99,19 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
Status status =
init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env);
DCHECK(status.ok());
_runtime_filter_mgr.reset(new RuntimeFilterMgr(fragment_exec_params.query_id,
RuntimeFilterParamsContext::create(this)));
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
fragment_exec_params.query_id, RuntimeFilterParamsContext::create(this));
if (fragment_exec_params.__isset.runtime_filter_params) {
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
fragment_exec_params.runtime_filter_params);
}

if (_query_ctx) {
_query_ctx->init_runtime_predicates({0});
if (fragment_exec_params.__isset.topn_filter_source_node_ids) {
_query_ctx->init_runtime_predicates(fragment_exec_params.topn_filter_source_node_ids);
} else {
_query_ctx->init_runtime_predicates({0});
}
}
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ struct TPlanFragmentExecParams {
// Used to merge and send runtime filter
12: optional TRuntimeFilterParams runtime_filter_params
13: optional bool group_commit // deprecated
14: optional list<i32> topn_filter_source_node_ids
}

// Global query parameters assigned by the coordinator.
Expand Down

0 comments on commit eeda6f1

Please sign in to comment.