From b099e62642b2cf527d8556ab256911ba83430848 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 25 Jun 2024 20:34:55 +0800 Subject: [PATCH] 1 --- be/src/pipeline/exec/result_sink_operator.cpp | 3 +-- be/src/runtime/buffer_control_block.cpp | 13 +++++++---- be/src/runtime/buffer_control_block.h | 2 ++ be/src/service/internal_service.cpp | 2 +- fe/fe-core/kafka_datasource_properties | Bin 0 -> 331 bytes .../arrowflight/DorisFlightSqlProducer.java | 2 +- .../FlightSqlConnectProcessor.java | 22 +++++++++--------- fe/fe-core/test_datasource_properties | Bin 0 -> 6 bytes gensrc/proto/internal_service.proto | 2 +- 9 files changed, 25 insertions(+), 21 deletions(-) create mode 100644 fe/fe-core/kafka_datasource_properties create mode 100644 fe/fe-core/test_datasource_properties diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 378fea18eea366e..0495e48b7dc926a 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -80,8 +80,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) { case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr arrow_schema; RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); - state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), - arrow_schema); + state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); break; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index a1a83b22840b2b8..845afb9a84b85cd 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -151,10 +151,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, int num_rows = result->num_rows(); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } - // TODO: merge RocordBatch, ToStructArray -> Make again _arrow_flight_batch_queue.push_back(std::move(result)); @@ -162,6 +158,7 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, _instance_rows_in_queue.emplace_back(); _instance_rows[state->fragment_instance_id()] += num_rows; _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; + _arrow_data_arrival.notify_one(); _update_dependency(); return Status::OK(); } @@ -212,6 +209,10 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr* return Status::Cancelled("Cancelled"); } + while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { + _arrow_data_arrival.wait_for(l, std::chrono::seconds(1)); + } + if (_is_cancelled) { return Status::Cancelled("Cancelled"); } @@ -234,7 +235,7 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr* _update_dependency(); return Status::OK(); } - return Status::InternalError("Abnormal Ending"); + return Status::InternalError("Get Arrow Batch Abnormal Ending"); } Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { @@ -250,6 +251,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { _is_close = true; _status = exec_status; + _arrow_data_arrival.notify_all(); if (!_waiting_rpc.empty()) { if (_status.ok()) { @@ -269,6 +271,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { void BufferControlBlock::cancel() { std::unique_lock l(_lock); _is_cancelled = true; + _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 12cbc72ff520718..5a13965a193ec78 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -124,6 +124,8 @@ class BufferControlBlock { // protects all subsequent data in this block std::mutex _lock; + std::condition_variable _arrow_data_arrival; + std::deque _waiting_rpc; // only used for FE using return rows to check limit diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 8bf04ead03551ad..c62cd33339f818d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -831,7 +831,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController brpc::ClosureGuard closure_guard(done); std::shared_ptr schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( - UniqueId(request->finst_id()).to_thrift()); + UniqueId(request->query_id()).to_thrift()); if (schema == nullptr) { LOG(INFO) << "not found arrow flight schema, maybe query has been canceled"; auto st = Status::NotFound( diff --git a/fe/fe-core/kafka_datasource_properties b/fe/fe-core/kafka_datasource_properties new file mode 100644 index 0000000000000000000000000000000000000000..4b2ef1960be6370518be53e33cfd53b47c5eeaa9 GIT binary patch literal 331 zcmZXQy$(Vl425;|DM)8yz(r*?I%uMaZpHx-47oqO0q&s2cej9vCYFVs^0lW72E+Ap z#2DyWJsuz#Y?Q6kQbl#Z-fOe>*cl(42{2CYs#M-9Qli6lXKgSs5O<;?W2sdNu+E1_ zGEY!eg3;JsNfMkQtFKc0gTy~H{X&GEvlZQkh)wqU<5GT1z|CXdO3`kd_%;}>C(AeN3S resultOutputExprs = ctx.getResultOutputExprs(); - Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); try { InternalService.PFetchArrowFlightSchemaRequest request = InternalService.PFetchArrowFlightSchemaRequest.newBuilder() - .setFinstId(finstId) + .setQueryId(queryId) .build(); Future future @@ -116,12 +116,12 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { InternalService.PFetchArrowFlightSchemaResult pResult; pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); if (pResult == null) { - throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s", + throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s", DebugUtil.printId(tid))); } Status resultStatus = new Status(pResult.getStatus()); if (resultStatus.getErrorCode() != TStatusCode.OK) { - throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s", + throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", DebugUtil.printId(tid), resultStatus.toString())); } if (pResult.hasBeArrowFlightIp()) { @@ -138,7 +138,7 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { List fieldVectors = root.getFieldVectors(); if (fieldVectors.size() != resultOutputExprs.size()) { throw new RuntimeException(String.format( - "Schema size %s' is not equal to arrow field size %s, finstId: %s.", + "Schema size %s' is not equal to arrow field size %s, queryId: %s.", fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); } return root.getSchema(); @@ -146,24 +146,24 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { throw new RuntimeException("Read Arrow Flight Schema failed.", e); } } else { - throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s", + throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid))); } } catch (RpcException e) { throw new RuntimeException(String.format( - "arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s", + "arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (InterruptedException e) { throw new RuntimeException(String.format( - "arrow flight schema future get interrupted exception, finstId: %s,backend: %s", + "arrow flight schema future get interrupted exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (ExecutionException e) { throw new RuntimeException(String.format( - "arrow flight schema future get execution exception, finstId: %s,backend: %s", + "arrow flight schema future get execution exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (TimeoutException e) { throw new RuntimeException(String.format( - "arrow flight schema fetch timeout, finstId: %s,backend: %s", + "arrow flight schema fetch timeout, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } } diff --git a/fe/fe-core/test_datasource_properties b/fe/fe-core/test_datasource_properties new file mode 100644 index 0000000000000000000000000000000000000000..92da0ccdba52fedd21ff645a0120562ee454682b GIT binary patch literal 6 NcmZQzU|_1Q1ponm0Q&#{ literal 0 HcmV?d00001 diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 9d8a72e01cc9d15..0625071f5db8071 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -274,7 +274,7 @@ message PFetchDataResult { }; message PFetchArrowFlightSchemaRequest { - optional PUniqueId finst_id = 1; + optional PUniqueId query_id = 1; }; message PFetchArrowFlightSchemaResult {