From 804b1ea2c8e4e024748689bb04fb39fdbb1d2f56 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 22 Jul 2024 21:12:30 +0800 Subject: [PATCH] 1 --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 3 +++ be/src/exec/data_sink.cpp | 22 ++++++++++++++----- be/src/pipeline/exec/result_sink_operator.cpp | 9 +++++++- be/src/pipeline/exec/result_sink_operator.h | 1 + 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 863d69338bc1c2..76ce00097b08c1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -617,6 +617,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5"); // result buffer cancelled time (unit: second) DEFINE_mInt32(result_buffer_cancelled_interval_time, "300"); +DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768"); + // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 9050701261c6a5..447473e4fddf08 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -675,6 +675,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent); // result buffer cancelled time (unit: second) DECLARE_mInt32(result_buffer_cancelled_interval_time); +// arrow flight result sink buffer rows size, default 4096 * 8 +DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows); + // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency); diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 054021439c5d24..dc65108029864e 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -78,10 +78,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } + int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE; + if (!thrift_sink.result_sink.__isset.type || + thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; + } + // TODO: figure out good buffer size based on size of output row - sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs, - thrift_sink.result_sink, - vectorized::RESULT_SINK_BUFFER_SIZE)); + sink->reset(new doris::vectorized::VResultSink( + row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows)); break; } case TDataSinkType::RESULT_FILE_SINK: { @@ -233,10 +238,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } + int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE; + if (!thrift_sink.result_sink.__isset.type || + thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; + } + // TODO: figure out good buffer size based on size of output row - sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs, - thrift_sink.result_sink, - vectorized::RESULT_SINK_BUFFER_SIZE)); + sink->reset(new doris::vectorized::VResultSink( + row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows)); break; } case TDataSinkType::RESULT_FILE_SINK: { diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 624b9ca192d699..1aa7f37c1fef4e 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -19,6 +19,7 @@ #include +#include "common/config.h" #include "common/object_pool.h" #include "exec/rowid_fetcher.h" #include "pipeline/exec/operator.h" @@ -64,8 +65,9 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); // create sender + auto& p = _parent->cast(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true, + state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true, state->execution_timeout())); ((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this()); return Status::OK(); @@ -118,6 +120,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r } else { _sink_type = sink.type; } + if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + _result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; + } else { + _result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE; + } _fetch_option = sink.fetch_option; _name = "ResultSink"; } diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index aed9961a6d6771..a3f8b8f9882dd8 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -82,6 +82,7 @@ class ResultSinkOperatorX final : public DataSinkOperatorX Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block); TResultSinkType::type _sink_type; + int _result_sink_buffer_size_rows; // set file options when sink type is FILE std::unique_ptr _file_opts = nullptr;