Skip to content

Commit

Permalink
[fix](arrow-flight-sql) Fix FE not found arrow flight schema (apache#…
Browse files Browse the repository at this point in the history
…43960)

Problem Summary:

After query first phase `exec_plan_fragment`, FE will fetches arrow
schema to BE, but BE will generate arrow schema when query second stage
`ResultSinkLocalState::open`.

Therefore, this pr is changed to generate arrow schema in the first
phase `ResultSinkLocalState::init`.

Fix:

```
rrmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe query has been canceled], error code: null, error msg:
java.lang.RuntimeException: fetch arrow flight schema failed, finstId: 3573efbeb10c44a7-956531d8e15d1630, errmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe query has been canceled]
        at org.apache.doris.service.arrowflight.FlightSqlConnectProcessor.fetchArrowFlightSchema(FlightSqlConnectProcessor.java:126) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.service.arrowflight.DorisFlightSqlProducer.executeQueryStatement(DorisFlightSqlProducer.java:229) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.service.arrowflight.DorisFlightSqlProducer.getFlightInfoStatement(DorisFlightSqlProducer.java:260) ~[doris-fe.jar:1.2-SNAPSHOT]
```
  • Loading branch information
xinyiZzz committed Nov 20, 2024
1 parent 97e009f commit 9f0eda2
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
if (state->query_options().enable_parallel_outfile) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), _buf_size, &_sender, state->execution_timeout(),
state->batch_size()));
state->query_id(), _buf_size, &_sender, state));
}
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
Expand Down
21 changes: 12 additions & 9 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,25 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
auto fragment_instance_id = state->fragment_instance_id();

auto& p = _parent->cast<ResultSinkOperatorX>();
if (state->query_options().enable_parallel_result_sink) {
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state));
}
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());

_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
}
if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
_sender->register_arrow_schema(arrow_schema);
}
return Status::OK();
}

Expand All @@ -62,10 +73,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ResultSinkOperatorX>();
_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
}
// create writer based on sink type
switch (p._sink_type) {
case TResultSinkType::MYSQL_PROTOCAL: {
Expand All @@ -79,10 +86,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
break;
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
_sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile));
break;
Expand Down
1 change: 1 addition & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift(), &schema);
if (!st.ok()) {
LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
Expand Down

0 comments on commit 9f0eda2

Please sign in to comment.