diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index fb3b293a122d411..713e85d5a4816e8 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -73,8 +73,7 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); if (state->query_options().enable_parallel_outfile) { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->query_id(), _buf_size, &_sender, state->execution_timeout(), - state->batch_size())); + state->query_id(), _buf_size, &_sender, state)); } return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index a3f1133f00e78e3..f8196910021b2c6 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -46,14 +46,25 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1); auto fragment_instance_id = state->fragment_instance_id(); + auto& p = _parent->cast(); if (state->query_options().enable_parallel_result_sink) { _sender = _parent->cast()._sender; } else { - auto& p = _parent->cast(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state)); } _sender->set_dependency(fragment_instance_id, _dependency->shared_from_this()); + + _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); + } + if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + std::shared_ptr arrow_schema; + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); + } return Status::OK(); } @@ -62,10 +73,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); - _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); - } // create writer based on sink type switch (p._sink_type) { case TResultSinkType::MYSQL_PROTOCAL: { @@ -79,10 +86,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { break; } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { - std::shared_ptr arrow_schema; - RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, - state->timezone())); - _sender->register_arrow_schema(arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile)); break; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 29eb01bad2aaa8e..be99278ab541a3b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -903,6 +903,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( UniqueId(request->finst_id()).to_thrift(), &schema); if (!st.ok()) { + LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; }