diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 048bbc51c66e5b..128a15c1678aa2 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -168,6 +168,9 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o if (read_options.use_topn_opt) { auto* query_ctx = read_options.runtime_state->get_query_ctx(); for (int id : read_options.topn_filter_source_node_ids) { + if (!query_ctx->get_runtime_predicate(id).need_update()) { + continue; + } auto runtime_predicate = query_ctx->get_runtime_predicate(id).get_predicate(); int32_t uid = diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index cf5e3130526f92..d923b5e9d67553 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -597,6 +597,10 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_zonemap_ns); auto* query_ctx = _opts.runtime_state->get_query_ctx(); for (int id : _opts.topn_filter_source_node_ids) { + if (!query_ctx->get_runtime_predicate(id).need_update()) { + continue; + } + std::shared_ptr runtime_predicate = query_ctx->get_runtime_predicate(id).get_predicate(); if (_segment->can_apply_predicate_safely(runtime_predicate->column_id(), @@ -1512,6 +1516,10 @@ Status SegmentIterator::_vec_init_lazy_materialization() { if (_opts.use_topn_opt && (_opts.read_orderby_key_columns == nullptr || _opts.read_orderby_key_columns->empty())) { for (int id : _opts.topn_filter_source_node_ids) { + if (!_opts.runtime_state->get_query_ctx()->get_runtime_predicate(id).need_update()) { + continue; + } + auto& runtime_predicate = _opts.runtime_state->get_query_ctx()->get_runtime_predicate(id); _col_predicates.push_back(runtime_predicate.get_predicate().get()); diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index f0229431b7b932..7b40ff4eae1238 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -288,7 +288,7 @@ Status TabletReader::_init_params(const ReaderParams& read_params) { _reader_context.target_cast_type_for_variants = read_params.target_cast_type_for_variants; RETURN_IF_ERROR(_init_conditions_param(read_params)); - _init_conditions_param_except_leafnode_of_andnode(read_params); + RETURN_IF_ERROR(_init_conditions_param_except_leafnode_of_andnode(read_params)); Status res = _init_delete_condition(read_params); if (!res.ok()) { @@ -558,7 +558,7 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) { return Status::OK(); } -void TabletReader::_init_conditions_param_except_leafnode_of_andnode( +Status TabletReader::_init_conditions_param_except_leafnode_of_andnode( const ReaderParams& read_params) { for (const auto& condition : read_params.conditions_except_leafnode_of_andnode) { TCondition tmp_cond = condition; @@ -578,9 +578,10 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode( for (int id : read_params.topn_filter_source_node_ids) { auto& runtime_predicate = read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id); - runtime_predicate.set_tablet_schema(_tablet_schema); + RETURN_IF_ERROR(runtime_predicate.set_tablet_schema(_tablet_schema)); } } + return Status::OK(); } ColumnPredicate* TabletReader::_parse_to_predicate( diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index 6a560fac0f8042..3bf83ec296c04b 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -242,7 +242,7 @@ class TabletReader { Status _init_conditions_param(const ReaderParams& read_params); - void _init_conditions_param_except_leafnode_of_andnode(const ReaderParams& read_params); + Status _init_conditions_param_except_leafnode_of_andnode(const ReaderParams& read_params); ColumnPredicate* _parse_to_predicate( const std::pair>& bloom_filter); diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index 4975b03720105e..255c909c28630d 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -56,15 +56,17 @@ class RuntimePredicate { return _inited && _tablet_schema; } - void set_tablet_schema(TabletSchemaSPtr tablet_schema) { + Status set_tablet_schema(TabletSchemaSPtr tablet_schema) { std::unique_lock wlock(_rwlock); // when sort node and scan node are not in the same backend, predicate will not be initialized if (_tablet_schema || !_inited) { - return; + return Status::OK(); } + RETURN_IF_ERROR(tablet_schema->have_column(_col_name)); _tablet_schema = tablet_schema; _predicate = SharedPredicate::create_shared( - tablet_schema->field_index(_tablet_schema->column(_col_name).unique_id())); + _tablet_schema->field_index(_tablet_schema->column(_col_name).unique_id())); + return Status::OK(); } std::shared_ptr get_predicate() { diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index a39c0c58e7d458..480521f58d31e4 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -25,6 +25,7 @@ #include #include +#include #include #include "common/config.h" @@ -98,14 +99,19 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, Status status = init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); - _runtime_filter_mgr.reset(new RuntimeFilterMgr(fragment_exec_params.query_id, - RuntimeFilterParamsContext::create(this))); + _runtime_filter_mgr = std::make_unique( + fragment_exec_params.query_id, RuntimeFilterParamsContext::create(this)); if (fragment_exec_params.__isset.runtime_filter_params) { _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( fragment_exec_params.runtime_filter_params); } + if (_query_ctx) { - _query_ctx->init_runtime_predicates({0}); + if (fragment_exec_params.__isset.topn_filter_source_node_ids) { + _query_ctx->init_runtime_predicates(fragment_exec_params.topn_filter_source_node_ids); + } else { + _query_ctx->init_runtime_predicates({0}); + } } } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 563be2cc9e1408..006f3dc25dc680 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -359,6 +359,7 @@ struct TPlanFragmentExecParams { // Used to merge and send runtime filter 12: optional TRuntimeFilterParams runtime_filter_params 13: optional bool group_commit // deprecated + 14: optional list topn_filter_source_node_ids } // Global query parameters assigned by the coordinator.