Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jun 25, 2024
1 parent d01d5c7 commit b099e62
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 21 deletions.
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
break;
Expand Down
13 changes: 8 additions & 5 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,14 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state,

int num_rows = result->num_rows();

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

// TODO: merge RecordBatch, ToStructArray -> Make again

_arrow_flight_batch_queue.push_back(std::move(result));
_buffer_rows += num_rows;
_instance_rows_in_queue.emplace_back();
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
_arrow_data_arrival.notify_one();
_update_dependency();
return Status::OK();
}
Expand Down Expand Up @@ -212,6 +209,10 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
return Status::Cancelled("Cancelled");
}

while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
_arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
}

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}
Expand All @@ -234,7 +235,7 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
_update_dependency();
return Status::OK();
}
return Status::InternalError("Abnormal Ending");
return Status::InternalError("Get Arrow Batch Abnormal Ending");
}

Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
Expand All @@ -250,6 +251,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {

_is_close = true;
_status = exec_status;
_arrow_data_arrival.notify_all();

if (!_waiting_rpc.empty()) {
if (_status.ok()) {
Expand All @@ -269,6 +271,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
void BufferControlBlock::cancel() {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
_arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class BufferControlBlock {
// protects all subsequent data in this block
std::mutex _lock;

std::condition_variable _arrow_data_arrival;

std::deque<GetResultBatchCtx*> _waiting_rpc;

// only used for FE using return rows to check limit
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<arrow::Schema> schema =
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift());
UniqueId(request->query_id()).to_thrift());
if (schema == nullptr) {
LOG(INFO) << "not found arrow flight schema, maybe query has been canceled";
auto st = Status::NotFound(
Expand Down
Binary file added fe/fe-core/kafka_datasource_properties
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
} else {
// Now only query stmt will pull results from BE.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
DebugUtil.printId(connectContext.queryId()) + ":" + query);
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,26 @@ public void handleQuery(String query) throws ConnectionException {

public Schema fetchArrowFlightSchema(int timeoutMs) {
TNetworkAddress address = ctx.getResultInternalServiceAddr();
TUniqueId tid = ctx.getFinstId();
TUniqueId tid = ctx.queryId();
ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
try {
InternalService.PFetchArrowFlightSchemaRequest request =
InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
.setFinstId(finstId)
.setQueryId(queryId)
.build();

Future<InternalService.PFetchArrowFlightSchemaResult> future
= BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request);
InternalService.PFetchArrowFlightSchemaResult pResult;
pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (pResult == null) {
throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s",
throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s",
DebugUtil.printId(tid)));
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s",
throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus.toString()));
}
if (pResult.hasBeArrowFlightIp()) {
Expand All @@ -138,32 +138,32 @@ public Schema fetchArrowFlightSchema(int timeoutMs) {
List<FieldVector> fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != resultOutputExprs.size()) {
throw new RuntimeException(String.format(
"Schema size %s' is not equal to arrow field size %s, finstId: %s.",
"Schema size %s' is not equal to arrow field size %s, queryId: %s.",
fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid)));
}
return root.getSchema();
} catch (Exception e) {
throw new RuntimeException("Read Arrow Flight Schema failed.", e);
}
} else {
throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s",
throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s",
DebugUtil.printId(tid)));
}
} catch (RpcException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s",
"arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (InterruptedException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get interrupted exception, finstId: %s,backend: %s",
"arrow flight schema future get interrupted exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get execution exception, finstId: %s,backend: %s",
"arrow flight schema future get execution exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch timeout, finstId: %s,backend: %s",
"arrow flight schema fetch timeout, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
}
}
Expand Down
Binary file added fe/fe-core/test_datasource_properties
Binary file not shown.
2 changes: 1 addition & 1 deletion gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ message PFetchDataResult {
};

message PFetchArrowFlightSchemaRequest {
optional PUniqueId finst_id = 1;
optional PUniqueId query_id = 1;
};

message PFetchArrowFlightSchemaResult {
Expand Down

0 comments on commit b099e62

Please sign in to comment.