Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jul 23, 2024
1 parent 193be20 commit 804b1ea
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 7 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");

// Arrow Flight result sink buffer size, in rows (default 32768 = 4096 * 8)
DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
// result buffer cancelled time (unit: second)
DECLARE_mInt32(result_buffer_cancelled_interval_time);

// Arrow Flight result sink buffer size, in rows; default is 32768 (4096 * 8)
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);

Expand Down
22 changes: 16 additions & 6 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}

// Start from the compile-time default; Arrow Flight sinks use the configurable size instead.
int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
// NOTE(review): a sink whose `type` is unset also takes the Arrow Flight size here, whereas
// ResultSinkOperatorX only does so when its resolved sink type is ARROW_FLIGHT_PROTOCAL.
// Possibly this should be `__isset.type && type == ARROW_FLIGHT_PROTOCAL` — TODO confirm.
if (!thrift_sink.result_sink.__isset.type ||
thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
}

// TODO: figure out good buffer size based on size of output row
sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
thrift_sink.result_sink,
vectorized::RESULT_SINK_BUFFER_SIZE));
sink->reset(new doris::vectorized::VResultSink(
row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
Expand Down Expand Up @@ -233,10 +238,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}

// Start from the compile-time default; Arrow Flight sinks use the configurable size instead.
int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
// NOTE(review): a sink whose `type` is unset also takes the Arrow Flight size here, whereas
// ResultSinkOperatorX only does so when its resolved sink type is ARROW_FLIGHT_PROTOCAL.
// Possibly this should be `__isset.type && type == ARROW_FLIGHT_PROTOCAL` — TODO confirm.
if (!thrift_sink.result_sink.__isset.type ||
thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
}

// TODO: figure out good buffer size based on size of output row
sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
thrift_sink.result_sink,
vectorized::RESULT_SINK_BUFFER_SIZE));
sink->reset(new doris::vectorized::VResultSink(
row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
Expand Down
9 changes: 8 additions & 1 deletion be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>

#include "common/config.h"
#include "common/object_pool.h"
#include "exec/rowid_fetcher.h"
#include "pipeline/exec/operator.h"
Expand Down Expand Up @@ -64,8 +65,9 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);

// create sender
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true,
state->execution_timeout()));
((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
return Status::OK();
Expand Down Expand Up @@ -118,6 +120,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
} else {
_sink_type = sink.type;
}
// Pick the result buffer size: Arrow Flight sinks read it from the mutable BE config,
// every other sink type keeps the compile-time default.
if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
_result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
} else {
_result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
}
_fetch_option = sink.fetch_option;
_name = "ResultSink";
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState>

Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block);
TResultSinkType::type _sink_type;
// Buffer size (in rows, per the name) handed to create_sender() by the local state;
// assigned in the constructor according to the sink type.
int _result_sink_buffer_size_rows;
// set file options when sink type is FILE
std::unique_ptr<vectorized::ResultFileOptions> _file_opts = nullptr;

Expand Down

0 comments on commit 804b1ea

Please sign in to comment.