diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index ab0a43f4a635cf..5273960a5c1c29 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -202,7 +202,7 @@ size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) cons Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state) { _runtime_state = RuntimeState::create_unique( - nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), + state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); _runtime_state->set_task_execution_context(state->get_task_execution_context().lock()); _runtime_state->set_be_number(state->be_number()); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 655a6e19725a9b..cdc6ef881d436d 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -166,7 +166,7 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) { _runtime_state = RuntimeState::create_unique( - nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), + state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); _runtime_state->set_task_execution_context(state->get_task_execution_context().lock()); _runtime_state->set_be_number(state->be_number()); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 0e56acc1c574b2..20b25d54ff9f16 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -606,7 +606,7 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( } local_state._runtime_state = RuntimeState::create_unique( - nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), + state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); local_state._runtime_state->set_task_execution_context( diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index d221eaeed0faba..878c3870946f1c 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -438,7 +438,7 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* auto& local_state = get_local_state(state); local_state._shared_state->inner_runtime_state = RuntimeState::create_unique( - nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), + state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); local_state._shared_state->inner_runtime_state->set_task_execution_context( state->get_task_execution_context().lock()); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 6e6689d4134deb..6071301c1d7bcc 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -80,7 +80,7 @@ Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_statu Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { _runtime_state = RuntimeState::create_unique( - nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), + state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); _runtime_state->set_task_execution_context(state->get_task_execution_context().lock()); _runtime_state->set_be_number(state->be_number()); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index e766cb27168de1..69ed816fa9142d 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -212,7 +212,7 @@ Status SpillSortLocalState::_create_intermediate_merger( } Status SpillSortLocalState::setup_in_memory_sort_op(RuntimeState* state) { _runtime_state = RuntimeState::create_unique( - nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), + state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); _runtime_state->set_task_execution_context(state->get_task_execution_context().lock()); _runtime_state->set_be_number(state->be_number()); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e6f257f9da792a..8ceb63eb99324c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -397,9 +397,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag << print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " " << pipeline->debug_string(); _task_runtime_states[pip_idx][i] = RuntimeState::create_unique( - this, local_params.fragment_instance_id, request.query_id, - request.fragment_id, request.query_options, _query_ctx->query_globals, - _exec_env, _query_ctx.get()); + local_params.fragment_instance_id, request.query_id, request.fragment_id, + request.query_options, _query_ctx->query_globals, _exec_env, + _query_ctx.get()); auto& task_runtime_state = _task_runtime_states[pip_idx][i]; _runtime_filter_states[i]->set_state(task_runtime_state.get()); { diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 579329c1082633..f3376d06858ec0 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -123,37 +123,6 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_ DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan"); } -RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId& instance_id, - const TUniqueId& query_id, int32_t fragment_id, - const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env, QueryContext* ctx) - : _profile("Fragment " + print_id(instance_id)), - _load_channel_profile(""), - _obj_pool(new ObjectPool()), - _unreported_error_idx(0), - _query_id(query_id), - _fragment_id(fragment_id), - _per_fragment_instance_idx(0), - _num_rows_load_total(0), - _num_rows_load_filtered(0), - _num_rows_load_unselected(0), - _num_rows_filtered_in_strict_mode_partial_update(0), - _num_print_error_rows(0), - _num_bytes_load_total(0), - _num_finished_scan_range(0), - _error_row_number(0), - _query_ctx(ctx) { - [[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env); - _query_mem_tracker = ctx->query_mem_tracker; -#ifdef BE_TEST - if (_query_mem_tracker == nullptr) { - init_mem_trackers(); - } -#endif - DCHECK(_query_mem_tracker != nullptr && _query_mem_tracker->label() != "Orphan"); - DCHECK(status.ok()); -} - RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx) diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index ad63510e2af82c..a49567109a3b31 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -85,12 +85,7 @@ class RuntimeState { const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx); - // for only use in pipelineX - RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId& instance_id, - const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, - const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx); - - // Used by pipelineX. This runtime state is only used for setup. + // Used by pipeline. This runtime state is only used for setup. RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx);