Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](runtime filter) Fix runtime filter producers #44294

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,8 @@ class RuntimePredicateWrapper {
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly, bool need_local_merge) {
*res = std::make_shared<IRuntimeFilter>(state, desc, need_local_merge);
bool build_bf_exactly) {
*res = std::make_shared<IRuntimeFilter>(state, desc);
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}
Expand Down Expand Up @@ -1326,10 +1326,10 @@ bool IRuntimeFilter::get_ignored() {

std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
"[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}, Ignored = {}]",
_is_push_down, _get_explain_state_string(), _has_remote_target, _has_local_target,
_wrapper->_context->ignored);
_filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target,
_has_local_target, _wrapper->_context->ignored);
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
Expand Down Expand Up @@ -1524,9 +1524,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {

std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, "
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
"build_bf_cardinality: {}, error_msg: {}",
_filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join,
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
_wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
}

Expand Down
15 changes: 5 additions & 10 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ enum RuntimeFilterState {
/// that can be pushed down to node based on the results of the right table.
class IRuntimeFilter {
public:
IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
bool need_local_merge = false)
IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc)
: _state(state),
_filter_id(desc->filter_id),
_is_broadcast_join(true),
Expand All @@ -206,17 +205,16 @@ class IRuntimeFilter {
_wait_infinitely(_state->get_query_ctx()->runtime_filter_wait_infinitely()),
_rf_wait_time_ms(_state->get_query_ctx()->runtime_filter_wait_time_ms()),
_runtime_filter_type(get_runtime_filter_type(desc)),
_profile(
new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id, to_string(_runtime_filter_type)))),
_need_local_merge(need_local_merge) {}
_profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id,
to_string(_runtime_filter_type)))) {}

~IRuntimeFilter() = default;

static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly = false, bool need_local_merge = false);
bool build_bf_exactly = false);

RuntimeFilterContextSPtr& get_shared_context_ref();

Expand Down Expand Up @@ -417,9 +415,6 @@ class IRuntimeFilter {
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
// `_need_local_merge` indicates whether this runtime filter is global on this BE.
// All runtime filters should be merged on each BE before push_to_remote or publish.
bool _need_local_merge = false;

std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;

Expand Down
27 changes: 14 additions & 13 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
if (!has_exist) {
std::shared_ptr<IRuntimeFilter> filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER,
node_id, &filter, build_bf_exactly,
need_local_merge));
node_id, &filter, build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
*consumer_filter = filter;
} else if (!need_local_merge) {
Expand All @@ -123,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc

Status RuntimeFilterMgr::register_local_merge_producer_filter(
const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter, bool build_bf_exactly) {
std::shared_ptr<IRuntimeFilter> producer_filter, bool build_bf_exactly) {
DCHECK(_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
Expand All @@ -139,21 +138,19 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
}

DCHECK(_state != nullptr);
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1,
producer_filter, build_bf_exactly, true));
{
std::lock_guard<std::mutex> l(*iter->second.lock);
if (iter->second.filters.empty()) {
std::shared_ptr<IRuntimeFilter> merge_filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
build_bf_exactly, true));
build_bf_exactly));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
iter->second.merge_time++;
iter->second.merge_size_times++;
iter->second.filters.emplace_back(*producer_filter);
iter->second.filters.emplace_back(producer_filter);
}
return Status::OK();
}
Expand All @@ -174,6 +171,16 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
return Status::OK();
}

doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id) {
DCHECK(_is_global);
std::lock_guard<std::mutex> l(_lock);
auto iter = _local_merge_producer_map.find(filter_id);
if (iter == _local_merge_producer_map.end()) {
return nullptr;
}
return &iter->second;
}

Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
Expand Down Expand Up @@ -385,12 +392,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr<QueryC
}

Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
auto filter = try_get_product_filter(request->filter_id());
if (filter) {
filter->set_synced_size(request->filter_size());
return Status::OK();
}

LocalMergeFilters* local_merge_filters = nullptr;
RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), &local_merge_filters));
// first filter size merged filter
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ class RuntimeFilterMgr {

Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
std::shared_ptr<IRuntimeFilter> producer_filter,
bool build_bf_exactly = false);

Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters);
LocalMergeFilters* get_local_merge_producer_filters(int filter_id);

Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
Expand Down
9 changes: 6 additions & 3 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() {
Status RuntimeState::register_producer_runtime_filter(
const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly) {
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
// Producers are created by local runtime filter mgr and shared by global runtime filter manager.
// When RF is published, consumers in both global and local RF mgr will be found.
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
desc, query_options(), producer_filter, build_bf_exactly));
return local_runtime_filter_mgr()->register_producer_filter(desc, query_options(),
producer_filter, build_bf_exactly);
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
desc, query_options(), *producer_filter, build_bf_exactly));
return Status::OK();
}

Status RuntimeState::register_consumer_runtime_filter(
Expand Down
Loading