Skip to content

Commit

Permalink
[fix](result sink) fix error 'Message size exceeds 2GB' with big result row
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Oct 24, 2024
1 parent 9f9d4c2 commit acd7f28
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 12 deletions.
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,6 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");

DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,6 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
// result buffer cancelled time (unit: second)
DECLARE_mInt32(result_buffer_cancelled_interval_time);

// Arrow Flight result sink buffer size, in rows; default is 4096 * 8 (32768)
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);

Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,12 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
} else {
_sink_type = sink.type;
}
if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
_result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
} else {
_result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE;
}
_fetch_option = sink.fetch_option;
_name = "ResultSink";
}

Status ResultSinkOperatorX::open(RuntimeState* state) {
_result_sink_buffer_size_rows = state->result_batch_rows();
RETURN_IF_ERROR(DataSinkOperatorX<ResultSinkLocalState>::open(state));
// prepare output_expr
// From the thrift expressions create the real exprs.
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ struct ResultFileOptions {
}
};

constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8;

class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> {
ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
using Base = PipelineXSinkLocalState<BasicSharedState>;
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class RuntimeState {
// Read-only access to the descriptor table. Dereferences _desc_tbl without a null check.
const DescriptorTbl& desc_tbl() const {
    return *_desc_tbl;
}
// Stores the given descriptor table pointer in _desc_tbl.
void set_desc_tbl(const DescriptorTbl* desc_tbl) {
    _desc_tbl = desc_tbl;
}
// Returns the `batch_size` value carried in the query options.
int batch_size() const {
    return _query_options.batch_size;
}
// Returns the `result_batch_rows` query option when it was explicitly set;
// otherwise falls back to 32768.
int result_batch_rows() const {
    if (!_query_options.__isset.result_batch_rows) {
        return 32768;
    }
    return _query_options.result_batch_rows;
}
// Returns the `wait_full_block_schedule_times` value carried in the query options.
int wait_full_block_schedule_times() const {
    const auto schedule_times = _query_options.wait_full_block_schedule_times;
    return schedule_times;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public class SessionVariable implements Serializable, Writable {
// mem limit can't be smaller than bufferpool's default page size
public static final int MIN_EXEC_MEM_LIMIT = 2097152;
public static final String BATCH_SIZE = "batch_size";
public static final String RESULT_BATCH_ROWS = "result_batch_rows";
public static final String BROKER_LOAD_BATCH_SIZE = "broker_load_batch_size";
public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = "enable_distinct_streaming_aggregation";
Expand Down Expand Up @@ -907,6 +908,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = BATCH_SIZE, fuzzy = true, checker = "checkBatchSize")
public int batchSize = 4064;

@VariableMgr.VarAttr(name = RESULT_BATCH_ROWS, checker = "checkBatchSize")
public int resultBatchRows = 32768;

// 16352 + 16 + 16 = 16384
@VariableMgr.VarAttr(name = BROKER_LOAD_BATCH_SIZE, fuzzy = true, checker = "checkBatchSize")
public int brokerLoadBatchSize = 16352;
Expand Down Expand Up @@ -3711,6 +3715,7 @@ public TQueryOptions toThrift() {
tResult.setEnableHashJoinEarlyStartProbe(enableHashJoinEarlyStartProbe);

tResult.setBatchSize(batchSize);
tResult.setResultBatchRows(resultBatchRows);
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);
tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ struct TQueryOptions {

136: optional bool enable_phrase_query_sequential_opt = true;

137: optional i32 result_batch_rows = 32768;

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down

0 comments on commit acd7f28

Please sign in to comment.