diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 790054f171fa91c..587f20e69e24d17 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -626,8 +626,6 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5"); // result buffer cancelled time (unit: second) DEFINE_mInt32(result_buffer_cancelled_interval_time, "300"); -DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768"); - // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 585c4dc45ccef9c..4287507309b9063 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -676,9 +676,6 @@ DECLARE_Int32(load_process_safe_mem_permit_percent); // result buffer cancelled time (unit: second) DECLARE_mInt32(result_buffer_cancelled_interval_time); -// arrow flight result sink buffer rows size, default 4096 * 8 -DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows); - // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index f04ace2e292595c..9fc682e6d1193e9 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -112,16 +112,12 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r } else { _sink_type = sink.type; } - if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { - _result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows; - } else { - _result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE; - } _fetch_option = sink.fetch_option; _name = "ResultSink"; } Status ResultSinkOperatorX::open(RuntimeState* state) { + _result_sink_buffer_size_rows = state->result_batch_rows(); /* Buffer size in rows is now read from the runtime state for every sink type (replaces the constructor-time choice between the BE config and RESULT_SINK_BUFFER_SIZE); it is assigned before the base-class open() call. */ 
RETURN_IF_ERROR(DataSinkOperatorX::open(state)); // prepare output_expr // From the thrift expressions create the real exprs. diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 3c503096ecb51e8..9a8dd29f71fd422 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -115,8 +115,6 @@ struct ResultFileOptions { } }; -constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8; - class ResultSinkLocalState final : public PipelineXSinkLocalState { ENABLE_FACTORY_CREATOR(ResultSinkLocalState); using Base = PipelineXSinkLocalState; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 34ce79ec7a749a9..4669efcdac66c4a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -119,6 +119,9 @@ class RuntimeState { const DescriptorTbl& desc_tbl() const { return *_desc_tbl; } void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; } int batch_size() const { return _query_options.batch_size; } + int result_batch_rows() const { /* Returns the result_batch_rows query option when the thrift field is marked as set, otherwise 32768. NOTE(review): 32768 repeats the default declared in TQueryOptions and SessionVariable - keep the three in sync. */ + return _query_options.__isset.result_batch_rows ? 
_query_options.result_batch_rows : 32768; + } int wait_full_block_schedule_times() const { return _query_options.wait_full_block_schedule_times; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index cc1f29b76c2b491..0b4c3482fcc213e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -128,6 +128,7 @@ public class SessionVariable implements Serializable, Writable { // mem limit can't smaller than bufferpool's default page size public static final int MIN_EXEC_MEM_LIMIT = 2097152; public static final String BATCH_SIZE = "batch_size"; + public static final String RESULT_BATCH_ROWS = "result_batch_rows"; public static final String BROKER_LOAD_BATCH_SIZE = "broker_load_batch_size"; public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations"; public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = "enable_distinct_streaming_aggregation"; @@ -907,6 +908,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = BATCH_SIZE, fuzzy = true, checker = "checkBatchSize") public int batchSize = 4064; + @VariableMgr.VarAttr(name = RESULT_BATCH_ROWS, checker = "checkBatchSize") + public int resultBatchRows = 32768; /* Session variable "result_batch_rows"; copied into TQueryOptions by toThrift() and validated by the same checker as batch_size. NOTE(review): checkBatchSize is not visible in this patch - confirm its accepted range and error text suit this variable. */ + // 16352 + 16 + 16 = 16384 @VariableMgr.VarAttr(name = BROKER_LOAD_BATCH_SIZE, fuzzy = true, checker = "checkBatchSize") public int brokerLoadBatchSize = 16352; @@ -3711,6 +3715,7 @@ public TQueryOptions toThrift() { tResult.setEnableHashJoinEarlyStartProbe(enableHashJoinEarlyStartProbe); tResult.setBatchSize(batchSize); + tResult.setResultBatchRows(resultBatchRows); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation); tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions); 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 62a45260f80c9cf..464c2bd7f203df4 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -351,6 +351,8 @@ struct TQueryOptions { 136: optional bool enable_phrase_query_sequential_opt = true; + 137: optional i32 result_batch_rows = 32768; /* Row count the BE result sink uses as its buffer size; filled from the FE session variable result_batch_rows. The default equals the FE default and the BE fallback (32768). */ + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.