Skip to content

Commit

Permalink
[fix](flink) Fix scanner be cancelled early on pipelineX (apache#42421)
Browse files Browse the repository at this point in the history
1. A randomly generated query_id now replaces
t_query_plan_info.query_id. An external query does not need to report
anything to FE, so its query_id can safely be changed.
Otherwise, multiple independent, concurrently opened tablet scanners would
share the same query_id, and when one of those scanners ended, the others
would be cancelled through FragmentMgr.cancel(query_id).

2. Each (query_id, fragment_id) pair is executed on only one BE;
_pipeline_map is now locked during the duplicate-fragment check.

3. External queries (Flink/Spark reading tablets) do not need to report to FE.
  • Loading branch information
xinyiZzz committed Oct 28, 2024
1 parent f0f4284 commit 95fd467
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
44 changes: 26 additions & 18 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ Status FragmentMgr::trigger_pipeline_context_report(
// including the final status when execution finishes.
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
DCHECK(req.status.ok() || req.done); // if !status.ok() => done
if (req.coord_addr.hostname == "external") {
// External query (flink/spark read tablets) not need to report to FE.
return;
}
Status exec_status = req.status;
Status coord_status;
FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr,
Expand Down Expand Up @@ -836,31 +840,33 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
query_ctx->set_merge_controller_handler(handler);
}

for (const auto& local_param : params.local_params) {
const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
{
// (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
std::lock_guard<std::mutex> lock(_lock);
auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})",
params.fragment_id);
for (const auto& local_param : params.local_params) {
const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
return Status::InternalError(
"exec_plan_fragment query_id({}) input duplicated fragment_id({})",
print_id(params.query_id), params.fragment_id);
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}

if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}

int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
{
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
std::lock_guard<std::mutex> lock(_lock);
// TODO: simplify this mapping
_pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}

if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}

query_ctx->set_pipeline_context(params.fragment_id, context);

RETURN_IF_ERROR(context->submit());
Expand Down Expand Up @@ -1070,6 +1076,7 @@ void FragmentMgr::debug(std::stringstream& ss) {}
*/
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
const TQueryPlanInfo& t_query_plan_info,
const TUniqueId& query_id,
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>* selected_columns) {
// set up desc tbl
Expand Down Expand Up @@ -1110,8 +1117,9 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,

// assign the param used for executing of PlanFragment-self
TPipelineInstanceParams fragment_exec_params;
exec_fragment_params.query_id = t_query_plan_info.query_id;
exec_fragment_params.query_id = query_id;
fragment_exec_params.fragment_instance_id = fragment_instance_id;
exec_fragment_params.coord.hostname = "external";
std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
std::vector<TScanRangeParams> scan_ranges;
std::vector<int64_t> tablet_ids = params.tablet_ids;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class FragmentMgr : public RestMonitorIface {
// execute external query, all query info are packed in TScanOpenParams
Status exec_external_plan_fragment(const TScanOpenParams& params,
const TQueryPlanInfo& t_query_plan_info,
const TUniqueId& query_id,
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>* selected_columns);

Expand Down
14 changes: 12 additions & 2 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,11 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status,
void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) {
TStatus t_status;
TUniqueId fragment_instance_id = generate_uuid();
// A query_id is randomly generated to replace t_query_plan_info.query_id.
// external query does not need to report anything to FE, so the query_id can be changed.
// Otherwise, multiple independent concurrent open tablet scanners have the same query_id.
// when one of the scanners ends, the other scanners will be canceled through FragmentMgr.cancel(query_id).
TUniqueId query_id = generate_uuid();
std::shared_ptr<ScanContext> p_context;
static_cast<void>(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context));
p_context->fragment_instance_id = fragment_instance_id;
Expand Down Expand Up @@ -838,13 +843,18 @@ void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenP
<< " deserialize error, should not be modified after returned Doris FE processed";
exec_st = Status::InvalidArgument(msg.str());
}
p_context->query_id = t_query_plan_info.query_id;
p_context->query_id = query_id;
}
std::vector<TScanColumnDesc> selected_columns;
if (exec_st.ok()) {
// start the scan procedure
LOG(INFO) << fmt::format(
"exec external scanner, old_query_id = {}, new_query_id = {}, fragment_instance_id "
"= {}",
print_id(t_query_plan_info.query_id), print_id(query_id),
print_id(fragment_instance_id));
exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment(
params, t_query_plan_info, fragment_instance_id, &selected_columns);
params, t_query_plan_info, query_id, fragment_instance_id, &selected_columns);
}
exec_st.to_thrift(&t_status);
//return status
Expand Down

0 comments on commit 95fd467

Please sign in to comment.