Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 7, 2024
1 parent 99d0748 commit 4d8825c
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 45 deletions.
7 changes: 1 addition & 6 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
if (state->query_options().enable_parallel_result_sink) {
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema);
} else {
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
}
_sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
break;
Expand Down
34 changes: 34 additions & 0 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,40 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
return Status::InternalError("Get Arrow Batch Abnormal Ending");
}

void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) {
std::lock_guard<std::mutex> l(_lock);
_arrow_schema = arrow_schema;
}

Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) {
std::unique_lock<std::mutex> l(_lock);
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

while (_arrow_schema == nullptr && !_is_cancelled && !_is_close) {
_arrow_schema_arrival.wait_for(l, std::chrono::milliseconds(20));
}

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

// normal path end
if (_arrow_schema != nullptr) {
*arrow_schema = _arrow_schema;
return Status::OK();
}

if (_is_close) {
return Status::RuntimeError("Closed");
}
return Status::InternalError("Get Arrow Schema Abnormal Ending");
}

Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
std::unique_lock<std::mutex> l(_lock);
// close will be called multiple times and error status needs to be collected.
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <arrow/type.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <stdint.h>
Expand Down Expand Up @@ -82,6 +83,9 @@ class BufferControlBlock {
void get_batch(GetResultBatchCtx* ctx);
Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result);

void register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema);
Status find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema);

// close buffer block, set _status to exec_status and set _is_close to true;
// called because data has been read or error happened.
Status close(const TUniqueId& id, Status exec_status);
Expand Down Expand Up @@ -119,13 +123,16 @@ class BufferControlBlock {
// blocking queue for batch
FeResultQueue _fe_result_batch_queue;
ArrowFlightResultQueue _arrow_flight_batch_queue;
// for arrow flight
std::shared_ptr<arrow::Schema> _arrow_schema;

// protects all subsequent data in this block
std::mutex _lock;

// get arrow flight result is a sync method, need wait for data ready and return result.
// TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data.
std::condition_variable _arrow_data_arrival;
std::condition_variable _arrow_schema_arrival;

std::deque<GetResultBatchCtx*> _waiting_rpc;

Expand Down
31 changes: 8 additions & 23 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,15 @@ std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TU
return {};
}

void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
const std::shared_ptr<arrow::Schema>& arrow_schema) {
std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
_arrow_schema_map.insert(std::make_pair(query_id, arrow_schema));
}

std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const TUniqueId& query_id) {
std::shared_lock<std::shared_mutex> rlock(_arrow_schema_map_lock);
auto iter = _arrow_schema_map.find(query_id);

if (_arrow_schema_map.end() != iter) {
return iter->second;
Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id,
std::shared_ptr<arrow::Schema>* schema) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
if (cb == nullptr) {
return Status::InternalError(
"no arrow schema for this query, maybe query has been canceled, finst_id={}",
print_id(finst_id));
}

return nullptr;
return cb->find_arrow_schema(schema);
}

void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) {
Expand Down Expand Up @@ -155,15 +149,6 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
_buffer_map.erase(iter);
}
}

{
std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
auto arrow_schema_iter = _arrow_schema_map.find(query_id);

if (_arrow_schema_map.end() != arrow_schema_iter) {
_arrow_schema_map.erase(arrow_schema_iter);
}
}
}

void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {
Expand Down
8 changes: 1 addition & 7 deletions be/src/runtime/result_buffer_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class ResultBufferMgr {
// fetch data result to Arrow Flight Server
Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr<arrow::RecordBatch>* result);

void register_arrow_schema(const TUniqueId& query_id,
const std::shared_ptr<arrow::Schema>& arrow_schema);
std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id);
Status find_arrow_schema(const TUniqueId& query_id, std::shared_ptr<arrow::Schema>* schema);

// cancel
void cancel(const TUniqueId& query_id, const Status& reason);
Expand All @@ -91,10 +89,6 @@ class ResultBufferMgr {
std::shared_mutex _buffer_map_lock;
// buffer block map
BufferMap _buffer_map;
// lock for arrow schema map
std::shared_mutex _arrow_schema_map_lock;
// for arrow flight
ArrowSchemaMap _arrow_schema_map;

// lock for timeout map
std::mutex _timeout_lock;
Expand Down
4 changes: 3 additions & 1 deletion be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::C
const std::shared_ptr<QueryStatement>& statement_) {
// Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket
// to the ADBC client, so that the schema and control block can be found.
auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
std::shared_ptr<arrow::Schema> schema;
RETURN_ARROW_STATUS_IF_ERROR(
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id, &schema));
if (schema == nullptr) {
ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
"Client not found arrow flight schema, maybe query has been canceled, queryid: {}",
Expand Down
13 changes: 5 additions & 8 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,19 +857,16 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([request, result, done]() {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<arrow::Schema> schema =
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift());
if (schema == nullptr) {
LOG(INFO) << "FE not found arrow flight schema, maybe query has been canceled";
auto st = Status::NotFound(
"FE not found arrow flight schema, maybe query has been canceled");
std::shared_ptr<arrow::Schema> schema;
auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift(), &schema);
if (!st.ok()) {
st.to_protobuf(result->mutable_status());
return;
}

std::string schema_str;
auto st = serialize_arrow_schema(&schema, &schema_str);
st = serialize_arrow_schema(&schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
if (config::public_access_ip != "") {
Expand Down

0 comments on commit 4d8825c

Please sign in to comment.