Skip to content

Commit

Permalink
Merge branch 'master' into Add-Inverted-Checker-Case
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinchou authored Nov 13, 2024
2 parents 4116c4d + 900bf91 commit 2853556
Show file tree
Hide file tree
Showing 69 changed files with 2,377 additions and 376 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/auto-cherry-pick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
pip install PyGithub
- name: Check SHA
run: |
expected_sha="4e4c0d7689b765c7f0677d75d23222555afa9286af46cf77ced66fa247a298d9f8a8c86830d0ce55f70e5f09532b54fbafee040c0343833077cbc7e214d486d2"
expected_sha="4761d95a336b92e492276d589e580678af8d490d73fa0bd7d53f826aa3bf86b54e2b8725b436bc010aaf14a001e286bcd2b55b3ec0d2668d1e962d8c8b397eab"
calculated_sha=$(sha512sum tools/auto-pick-script.py | awk '{ print $1 }')
if [ "$calculated_sha" != "$expected_sha" ]; then
echo "SHA mismatch! Expected: $expected_sha, but got: $calculated_sha"
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ DEFINE_Int32(brpc_port, "8060");
DEFINE_Int32(arrow_flight_sql_port, "-1");

DEFINE_mString(public_access_ip, "");
DEFINE_Int32(public_access_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down Expand Up @@ -520,6 +521,8 @@ DEFINE_Int32(brpc_light_work_pool_threads, "-1");
DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
DEFINE_mBool(enable_bthread_transmit_block, "true");
DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1");
DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1");

//Enable brpc builtin services, see:
//https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
Expand Down Expand Up @@ -628,7 +631,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");

// arrow flight result sink buffer rows size, default 4096 * 8
DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
// The timeout for ADBC Client to wait for data using arrow flight reader.
// If the query is very complex and no result is generated after this time, consider increasing this timeout.
DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000");

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port);
// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip.
DECLARE_mString(public_access_ip);
DECLARE_Int32(public_access_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down Expand Up @@ -571,6 +572,8 @@ DECLARE_Int32(brpc_light_work_pool_threads);
DECLARE_Int32(brpc_heavy_work_pool_max_queue_size);
DECLARE_Int32(brpc_light_work_pool_max_queue_size);
DECLARE_mBool(enable_bthread_transmit_block);
DECLARE_Int32(brpc_arrow_flight_work_pool_threads);
DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size);

// The maximum amount of data that can be processed by a stream load
DECLARE_mInt64(streaming_load_max_mb);
Expand Down Expand Up @@ -677,6 +680,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time);

// arrow flight result sink buffer rows size, default 4096 * 8
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
// The timeout for ADBC Client to wait for data using arrow flight reader.
// If the query is very complex and no result is generated after this time, consider increasing this timeout.
DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms);

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/bitmap_filter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BitmapFilterColumnPredicate : public ColumnPredicate {
const std::shared_ptr<BitmapFilterFuncBase>& filter, int)
: ColumnPredicate(column_id),
_filter(filter),
_specific_filter(static_cast<SpecificFilter*>(_filter.get())) {}
_specific_filter(assert_cast<SpecificFilter*>(_filter.get())) {}
~BitmapFilterColumnPredicate() override = default;

PredicateType type() const override { return PredicateType::BITMAP_FILTER; }
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ enum OLAPDataVersion {
// Different types of folder names under storage_root_path
static const std::string MINI_PREFIX = "mini_download";
static const std::string CLUSTER_ID_PREFIX = "cluster_id";
static const std::string DEPLOY_MODE_PREFIX = "deploy_mode";
static const std::string DATA_PREFIX = "data";
static const std::string DPP_PREFIX = "dpp_download";
static const std::string SNAPSHOT_PREFIX = "snapshot";
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
{
SCOPED_TIMER(local_state._get_arrow_schema_timer);
        // After expr executed, use recalculated schema as final schema
RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema, state->timezone()));
RETURN_IF_ERROR(get_arrow_schema_from_block(block, &block_arrow_schema, state->timezone()));
}
{
SCOPED_TIMER(local_state._convert_block_to_arrow_batch_timer);
Expand Down
8 changes: 3 additions & 5 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
if (state->query_options().enable_parallel_outfile) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), _buf_size, &_sender, state->execution_timeout(),
state->batch_size()));
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(), _buf_size,
&_sender, state));
}
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
Expand All @@ -92,8 +91,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
_sender = _parent->cast<ResultFileSinkOperatorX>()._sender;
} else {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(),
state->batch_size()));
state->fragment_instance_id(), p._buf_size, &_sender, state));
}
_sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this());

Expand Down
19 changes: 6 additions & 13 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
} else {
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state));
}
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());
return Status::OK();
Expand Down Expand Up @@ -81,16 +80,11 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
if (state->query_options().enable_parallel_result_sink) {
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema);
} else {
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
}
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
_sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
_sender.get(), _output_vexpr_ctxs, _profile));
break;
}
default:
Expand Down Expand Up @@ -135,8 +129,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) {

if (state->query_options().enable_parallel_result_sink) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->query_id(), _result_sink_buffer_size_rows, &_sender,
state->execution_timeout(), state->batch_size()));
state->query_id(), _result_sink_buffer_size_rows, &_sender, state));
}
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
Expand Down
Loading

0 comments on commit 2853556

Please sign in to comment.