Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 13, 2024
1 parent 05aabc4 commit e6c5ed2
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 75 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");

// arrow flight result sink buffer rows size, default 4096 * 8
DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
// The timeout for ADBC Client to wait for data using arrow flight reader.
// If the query is very complex and no result is generated after this time, consider increasing this timeout.
DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000");

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time);

// arrow flight result sink buffer rows size, default 4096 * 8
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
// The timeout for ADBC Client to wait for data using arrow flight reader.
// If the query is very complex and no result is generated after this time, consider increasing this timeout.
DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms);

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
Expand Down
64 changes: 52 additions & 12 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void GetArrowResultBatchCtx::on_close(int64_t packet_seq) {
void GetArrowResultBatchCtx::on_data(
const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version,
segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone,
std::string arrow_schema_field_names, RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* uncompressed_bytes_counter,
RuntimeProfile::Counter* compressed_bytes_counter) {
Status st = Status::OK();
Expand All @@ -128,7 +128,6 @@ void GetArrowResultBatchCtx::on_data(
result->set_eos(false);
if (packet_seq == 0) {
result->set_timezone(timezone);
result->set_fields_labels(arrow_schema_field_names);
}
} else {
result->clear_block();
Expand Down Expand Up @@ -237,9 +236,8 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
auto* ctx = _waiting_arrow_result_batch_rpc.front();
_waiting_arrow_result_batch_rpc.pop_front();
ctx->on_data(result, _packet_num, _be_exec_version,
_fragement_transmission_compression_type, _timezone, _arrow_schema_field_names,
_serialize_batch_ns_timer, _uncompressed_bytes_counter,
_compressed_bytes_counter);
_fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer,
_uncompressed_bytes_counter, _compressed_bytes_counter);
_packet_num++;
}

Expand Down Expand Up @@ -287,15 +285,15 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* r
return _status;
}
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)));
}

while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) {
_arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
}

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)));
}

if (!_arrow_flight_result_batch_queue.empty()) {
Expand All @@ -322,7 +320,8 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* r
_mem_tracker->peak_consumption(), ss.str());
return Status::OK();
}
return Status::InternalError("Get Arrow Batch Abnormal Ending");
return Status::InternalError(
fmt::format("Get Arrow Batch Abnormal Ending ()", print_id(_fragment_id)));
}

void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
Expand All @@ -334,7 +333,7 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
return;
}
if (_is_cancelled) {
ctx->on_failure(Status::Cancelled("Cancelled"));
ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))));
return;
}

Expand All @@ -347,8 +346,8 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
_instance_rows_in_queue.pop_front();

ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type,
_timezone, _arrow_schema_field_names, _serialize_batch_ns_timer,
_uncompressed_bytes_counter, _compressed_bytes_counter);
_timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter,
_compressed_bytes_counter);
_packet_num++;
return;
}
Expand All @@ -370,8 +369,30 @@ void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
}

void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) {
std::lock_guard<std::mutex> l(_lock);
_arrow_schema = arrow_schema;
_arrow_schema_field_names = join(_arrow_schema->field_names(), ",");
}

Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) {
std::unique_lock<std::mutex> l(_lock);
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)));
}

// normal path end
if (_arrow_schema != nullptr) {
*arrow_schema = _arrow_schema;
return Status::OK();
}

if (_is_close) {
return Status::RuntimeError(fmt::format("Closed ()", print_id(_fragment_id)));
}
return Status::InternalError(
fmt::format("Get Arrow Schema Abnormal Ending ()", print_id(_fragment_id)));
}

Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
Expand Down Expand Up @@ -405,18 +426,37 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
}
_waiting_rpc.clear();
}

if (!_waiting_arrow_result_batch_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_close(_packet_num);
}
} else {
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_failure(_status);
}
}
_waiting_arrow_result_batch_rpc.clear();
}
return Status::OK();
}

void BufferControlBlock::cancel(const Status& reason) {
std::unique_lock<std::mutex> l(_lock);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_is_cancelled = true;
_arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(reason);
}
_waiting_rpc.clear();
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
_waiting_arrow_result_batch_rpc.clear();
_update_dependency();
_arrow_flight_result_batch_queue.clear();
}

void BufferControlBlock::set_dependency(
Expand Down
6 changes: 2 additions & 4 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ struct GetArrowResultBatchCtx {
void on_data(const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq,
int be_exec_version,
segment_v2::CompressionTypePB fragement_transmission_compression_type,
std::string timezone, std::string arrow_schema_field_names,
RuntimeProfile::Counter* serialize_batch_ns_timer,
std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* uncompressed_bytes_counter,
RuntimeProfile::Counter* compressed_bytes_counter);
};
Expand All @@ -114,7 +113,7 @@ class BufferControlBlock {
void get_arrow_batch(GetArrowResultBatchCtx* ctx);

void register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema);
std::shared_ptr<arrow::Schema> find_arrow_schema() { return _arrow_schema; }
Status find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema);

// close buffer block, set _status to exec_status and set _is_close to true;
// called because data has been read or error happened.
Expand Down Expand Up @@ -156,7 +155,6 @@ class BufferControlBlock {
ArrowFlightResultQueue _arrow_flight_result_batch_queue;
// for arrow flight
std::shared_ptr<arrow::Schema> _arrow_schema;
std::string _arrow_schema_field_names;

// protects all subsequent data in this block
std::mutex _lock;
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id,
"no arrow schema for this query, maybe query has been canceled, finst_id={}",
print_id(finst_id));
}
*schema = cb->find_arrow_schema();
return Status::OK();
return cb->find_arrow_schema(schema);
}

void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) {
Expand Down
Loading

0 comments on commit e6c5ed2

Please sign in to comment.