Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jul 22, 2024
1 parent 1d25dff commit f5cc58e
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");

DEFINE_mInt32(arrow_flight_result_sink_buffer_size_bytes, "32768");

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,9 @@ DECLARE_Int32(load_process_safe_mem_permit_percent);
// result buffer cancelled time (unit: second)
DECLARE_mInt32(result_buffer_cancelled_interval_time);

// arrow flight result sink buffer rows size, default 4096 * 8
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);

Expand Down
15 changes: 11 additions & 4 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <utility>

#include "common/config.h"
#include "common/object_pool.h"
#include "exec/rowid_fetcher.h"
#include "pipeline/exec/operator.h"
Expand Down Expand Up @@ -48,9 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
if (state->query_options().enable_parallel_result_sink) {
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(),
state->batch_size()));
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
}
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());
return Status::OK();
Expand Down Expand Up @@ -107,6 +109,11 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
} else {
_sink_type = sink.type;
}
if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
_result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
} else {
_result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE;
}
_fetch_option = sink.fetch_option;
_name = "ResultSink";
}
Expand All @@ -126,8 +133,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {

if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(),
state->batch_size()));
state->query_id(), _result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
}
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState>

Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block);
TResultSinkType::type _sink_type;
int _result_sink_buffer_size_rows;
// set file options when sink type is FILE
std::unique_ptr<ResultFileOptions> _file_opts = nullptr;

Expand Down

0 comments on commit f5cc58e

Please sign in to comment.