From 97e009fcced2121ee88537418f30e0e0f1e2cdc1 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 13 Nov 2024 18:45:33 +0800 Subject: [PATCH] [fix](arrow-flight-sql) Arrow flight server supports data forwarding when BE uses public vip (#43281) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If there is a Doris cluster, its FE node can be accessed by the external network, and all its BE nodes can only be accessed by the internal network. This is fine when using Mysql client and JDBC to connect to Doris to execute queries, and the query results will be returned to the client by the Doris FE node. However, using Arrow Flight SQL to connect to Doris cannot execute queries, because the ADBC ​​client needs to connect to the Doris BE node to pull query results, but the Doris BE node is not allowed to be accessed by the external network. In a production environment, it is often inconvenient to expose Doris BE nodes to the external network. However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, and the external client will be randomly routed to a Doris BE node when connecting to nginx. The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. If it is different from the Doris BE node randomly routed by nginx, data forwarding needs to be done inside the Doris BE node. 1. The Ticket returned by Doris FE Arrow Flight Server to ADBC ​​client contains the IP and Brpc Port of the Doris BE node where the query result is located. 2. Doris BE Arrow Flight Server receives a request to pull data. If the IP:BrpcPort in the Ticket is not itself, it pulls the query result Block from the Doris BE node specified by IP:BrpcPort, converts it to Arrow Batch and returns it to ADBC ​​Client; if the IP:BrpcPort in the Ticket is itself, it is the same as before. 1. If the data is not in the current BE node, you can pull the data from other BE nodes asynchronously and cache at least one Block locally in the current BE node, which will reduce the time consumption of serialization, deserialization, and RPC. 1. Create a Doris cluster with 1 FE and 2 BE, and modify `arrow_flight_sql_port` in `fe.conf` and `be.conf`. 2. Root executes `systemctl status nginx` to check whether Nginx is installed. If not, yum install is recommended. 3. `vim /etc/nginx/nginx.conf` adds `underscores_in_headers on;` 4. `touch /etc/nginx/conf.d/arrowflight.conf` creates a file, and `vim /etc/nginx/conf.d/arrowflight.conf` adds: ``` upstream arrowflight { server {BE1_ip}:{BE1_arrow_flight_sql_port}; server {BE2_IP}:{BE2_arrow_flight_sql_port}; } server { listen {nginx port} http2; listen [::]:{nginx port} http2; server_name doris.arrowflight.com; #ssl_certificate /etc/nginx/cert/myCA.pem; #ssl_certificate_key /etc/nginx/cert/myCA.key; location / { grpc_pass grpc://arrowflight; grpc_set_header X-Real-IP $remote_addr; grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; grpc_set_header X-Forwarded-Proto $scheme; proxy_read_timeout 60s; proxy_send_timeout 60s; #proxy_http_version 1.1; #proxy_set_header Connection ""; } } ``` Where {BE1_ip}:{BE1_arrow_flight_sql_port} is the IP of BE 1 and arrow_flight_sql_port in be.conf, and similarly {BE2_IP}:{BE2_arrow_flight_sql_port}. `{nginx port}` is any available port. 6. Add in be.conf of all BEs: ``` public_access_ip={nginx ip} public_access_port={nginx port} ``` --- 如果存在一个 Doris 集群,它的 FE 节点可以被外部网络访问,它的所有 BE 节点只可以被内网访问。 这在使用 Mysql client 和 JDBC 连接 Doris 执行查询是没问题的,查询结果将由 Doris FE 节点返回给 client。 但使用 Arrow Flight SQL 连接 Doris 无法执行查询,因为 ADBC client 需要连接 Doris BE 节点拉取查询结果,但 Doris BE 节点不允许被外网访问。 生产环境中,很多时候不方便在外网暴露 Doris BE 节点。但可以为所有 Doris BE 节点增加了一层反向代理(比如 nginx),外网的 client 连接 nginx 时会随机路由到一台 Doris BE 节点上。 Arrow Flight SQL 查询结果会随机保存在一台 Doris BE 节点上,如果和 nginx 随机路由的 Doris BE 节点不同,需要在 Doris BE 节点内部做一次数据转发。 1. Doris FE Arrow Flight Server 向 ADBC client 返回的 Ticket 中包含查询结果所在 Doris BE节点的 IP 和 Brpc Port。 2. Doris BE Arrow Flight Server 收到拉取数据请求。如果 Ticket 中的 IP:BrpcPort 不是自己,则从 IP:BrpcPort 指定的 Doris BE 节点拉取查询结果Block,转为 Arrow Batch 后返回 ADBC Client;如果 Ticket 中的 IP:BrpcPort 是自己,则和过去一样。 1. 若数据不在当前 BE 节点,可以异步的从其他 BE 节点拉取数据,并在当前 BE 节点本地缓存至少一个 Block,这将减少序列化、反序列化、RPC 的耗时。 1. 创建一个 1 FE 和 2 BE 的 Doris 集群,修改 `fe.conf` 和 `be.conf` 中的 `arrow_flight_sql_port`。 2. Root 执行 `systemctl status nginx` 查看是否安装 Nginx,若没有则推荐 yum install。 3. `vim /etc/nginx/nginx.conf` 增加 `underscores_in_headers on;` 4. `touch /etc/nginx/conf.d/arrowflight.conf` 创建文件,`vim /etc/nginx/conf.d/arrowflight.conf` 增加: ``` upstream arrowflight { server {BE1_ip}:{BE1_arrow_flight_sql_port}; server {BE2_IP}:{BE2_arrow_flight_sql_port}; } server { listen {nginx port} http2; listen [::]:{nginx port} http2; server_name doris.arrowflight.com; #ssl_certificate /etc/nginx/cert/myCA.pem; #ssl_certificate_key /etc/nginx/cert/myCA.key; location / { grpc_pass grpc://arrowflight; grpc_set_header X-Real-IP $remote_addr; grpc_set_header X-Forwarded-For $proxy_add_x_forwarded_for; grpc_set_header X-Forwarded-Proto $scheme; proxy_read_timeout 60s; proxy_send_timeout 60s; #proxy_http_version 1.1; #proxy_set_header Connection ""; } } ``` 其中 {BE1_ip}:{BE1_arrow_flight_sql_port} 是 BE 1 的 IP 和 be.conf 中的 arrow_flight_sql_port,同理 {BE2_IP}:{BE2_arrow_flight_sql_port}。`{nginx port}` 是一个任意可用端口。 6. 在所有 BE 的 be.conf 中增加 ``` public_access_ip={nginx ip} public_access_port={nginx port} ``` --- be/src/common/config.cpp | 7 + be/src/common/config.h | 6 + .../exec/memory_scratch_sink_operator.cpp | 2 +- .../exec/result_file_sink_operator.cpp | 3 +- be/src/pipeline/exec/result_sink_operator.cpp | 19 +- be/src/runtime/buffer_control_block.cpp | 229 ++++++++++++-- be/src/runtime/buffer_control_block.h | 58 +++- be/src/runtime/result_buffer_mgr.cpp | 76 +++-- be/src/runtime/result_buffer_mgr.h | 30 +- .../arrow_flight_batch_reader.cpp | 291 ++++++++++++++++-- .../arrow_flight/arrow_flight_batch_reader.h | 69 ++++- .../arrow_flight/flight_sql_service.cpp | 57 ++-- be/src/service/internal_service.cpp | 60 +++- be/src/service/internal_service.h | 5 + be/src/util/arrow/row_batch.cpp | 19 +- be/src/util/arrow/row_batch.h | 11 +- be/src/util/arrow/utils.cpp | 3 +- be/src/util/doris_metrics.h | 5 + .../vec/sink/varrow_flight_result_writer.cpp | 64 ++-- be/src/vec/sink/varrow_flight_result_writer.h | 16 +- be/test/runtime/result_buffer_mgr_test.cpp | 13 +- .../arrowflight/DorisFlightSqlProducer.java | 35 ++- .../FlightSqlConnectProcessor.java | 14 +- gensrc/proto/internal_service.proto | 16 + 24 files changed, 858 insertions(+), 250 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 02ebefbc0f32d0..72253238909dc6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -65,6 +65,7 @@ DEFINE_Int32(brpc_port, "8060"); DEFINE_Int32(arrow_flight_sql_port, "-1"); DEFINE_mString(public_access_ip, ""); +DEFINE_Int32(public_access_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores @@ -535,6 +536,8 @@ DEFINE_Int32(brpc_light_work_pool_threads, "-1"); DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1"); DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1"); DEFINE_mBool(enable_bthread_transmit_block, "true"); +DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1"); +DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1"); //Enable brpc builtin services, see: //https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely @@ -643,7 +646,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5"); // result buffer cancelled time (unit: second) DEFINE_mInt32(result_buffer_cancelled_interval_time, "300"); +// arrow flight result sink buffer rows size, default 4096 * 8 DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768"); +// The timeout for ADBC Client to wait for data using arrow flight reader. +// If the query is very complex and no result is generated after this time, consider increasing this timeout. +DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000"); // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 56d5e8e648a86c..0944669c747f08 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port); // For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result. // If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip. DECLARE_mString(public_access_ip); +DECLARE_Int32(public_access_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores @@ -584,6 +585,8 @@ DECLARE_Int32(brpc_light_work_pool_threads); DECLARE_Int32(brpc_heavy_work_pool_max_queue_size); DECLARE_Int32(brpc_light_work_pool_max_queue_size); DECLARE_mBool(enable_bthread_transmit_block); +DECLARE_Int32(brpc_arrow_flight_work_pool_threads); +DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size); // The maximum amount of data that can be processed by a stream load DECLARE_mInt64(streaming_load_max_mb); @@ -693,6 +696,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time); // arrow flight result sink buffer rows size, default 4096 * 8 DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows); +// The timeout for ADBC Client to wait for data using arrow flight reader. +// If the query is very complex and no result is generated after this time, consider increasing this timeout. +DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms); // the increased frequency of priority for remaining tasks in BlockingPriorityQueue DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency); diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index 262a5ef538e90b..874b6fd1ab1aa0 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -104,7 +104,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* { SCOPED_TIMER(local_state._get_arrow_schema_timer); // After expr executed, use recaculated schema as final schema - RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema, state->timezone())); + RETURN_IF_ERROR(get_arrow_schema_from_block(block, &block_arrow_schema, state->timezone())); } { SCOPED_TIMER(local_state._convert_block_to_arrow_batch_timer); diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index bc4e4c88d14ca7..fb3b293a122d41 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -92,8 +92,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i _sender = _parent->cast()._sender; } else { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(), - state->batch_size())); + state->fragment_instance_id(), p._buf_size, &_sender, state)); } _sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this()); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 15612168affd89..a3f1133f00e78e 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -51,8 +51,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) } else { auto& p = _parent->cast(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, - state->execution_timeout(), state->batch_size())); + fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state)); } _sender->set_dependency(fragment_instance_id, _dependency->shared_from_this()); return Status::OK(); @@ -81,16 +80,11 @@ Status ResultSinkLocalState::open(RuntimeState* state) { } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr arrow_schema; - RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema, - state->timezone())); - if (state->query_options().enable_parallel_result_sink) { - state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); - } else { - state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), - arrow_schema); - } + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( - _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); + _sender.get(), _output_vexpr_ctxs, _profile)); break; } default: @@ -135,8 +129,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) { if (state->query_options().enable_parallel_result_sink) { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->query_id(), _result_sink_buffer_size_rows, &_sender, - state->execution_timeout(), state->batch_size())); + state->query_id(), _result_sink_buffer_size_rows, &_sender, state)); } return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 61ea5ef080de5f..98feb85ad6b9c2 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -33,9 +33,11 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" #include "pipeline/dependency.h" -#include "runtime/exec_env.h" #include "runtime/thread_context.h" +#include "util/runtime_profile.h" +#include "util/string_util.h" #include "util/thrift_util.h" +#include "vec/core/block.h" namespace doris { @@ -93,14 +95,80 @@ void GetResultBatchCtx::on_data(const std::unique_ptr& t_resul delete this; } -BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size) +void GetArrowResultBatchCtx::on_failure(const Status& status) { + DCHECK(!status.ok()) << "status is ok, errmsg=" << status; + status.to_protobuf(result->mutable_status()); + delete this; +} + +void GetArrowResultBatchCtx::on_close(int64_t packet_seq) { + Status status; + status.to_protobuf(result->mutable_status()); + result->set_packet_seq(packet_seq); + result->set_eos(true); + delete this; +} + +void GetArrowResultBatchCtx::on_data( + const std::shared_ptr& block, int64_t packet_seq, int be_exec_version, + segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone, + RuntimeProfile::Counter* serialize_batch_ns_timer, + RuntimeProfile::Counter* uncompressed_bytes_counter, + RuntimeProfile::Counter* compressed_bytes_counter) { + Status st = Status::OK(); + if (result != nullptr) { + size_t uncompressed_bytes = 0, compressed_bytes = 0; + SCOPED_TIMER(serialize_batch_ns_timer); + st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes, + &compressed_bytes, fragement_transmission_compression_type, false); + COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes); + COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes); + if (st.ok()) { + result->set_packet_seq(packet_seq); + result->set_eos(false); + if (packet_seq == 0) { + result->set_timezone(timezone); + } + } else { + result->clear_block(); + result->set_packet_seq(packet_seq); + LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st; + } + } else { + result->set_empty_batch(true); + result->set_packet_seq(packet_seq); + result->set_eos(false); + } + + /// The size limit of proto buffer message is 2G + if (result->ByteSizeLong() > std::numeric_limits::max()) { + st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong()); + result->clear_block(); + } + st.to_protobuf(result->mutable_status()); + delete this; +} + +BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state) : _fragment_id(id), _is_close(false), _is_cancelled(false), _buffer_limit(buffer_size), _packet_num(0), - _batch_size(batch_size) { + _batch_size(state->batch_size()), + _timezone(state->timezone()), + _timezone_obj(state->timezone_obj()), + _be_exec_version(state->be_exec_version()), + _fragement_transmission_compression_type( + state->fragement_transmission_compression_type()), + _profile("BufferControlBlock " + print_id(_fragment_id)) { _query_statistics = std::make_unique(); + _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime"); + _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", TUnit::BYTES); + _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", TUnit::BYTES); + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::QUERY, + fmt::format("BufferControlBlock#FragmentInstanceId={}", print_id(_fragment_id))); } BufferControlBlock::~BufferControlBlock() { @@ -148,36 +216,44 @@ Status BufferControlBlock::add_batch(RuntimeState* state, } Status BufferControlBlock::add_arrow_batch(RuntimeState* state, - std::shared_ptr& result) { + std::shared_ptr& result) { std::unique_lock l(_lock); if (_is_cancelled) { return Status::Cancelled("Cancelled"); } - int num_rows = result->num_rows(); - - // TODO: merge RocordBatch, ToStructArray -> Make again + if (_waiting_arrow_result_batch_rpc.empty()) { + // TODO: Merge result into block to reduce rpc times + int num_rows = result->rows(); + _arrow_flight_result_batch_queue.push_back(std::move(result)); + _instance_rows_in_queue.emplace_back(); + _instance_rows[state->fragment_instance_id()] += num_rows; + _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; + _arrow_data_arrival + .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr,) + } else { + auto* ctx = _waiting_arrow_result_batch_rpc.front(); + _waiting_arrow_result_batch_rpc.pop_front(); + ctx->on_data(result, _packet_num, _be_exec_version, + _fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer, + _uncompressed_bytes_counter, _compressed_bytes_counter); + _packet_num++; + } - _arrow_flight_batch_queue.push_back(std::move(result)); - _instance_rows_in_queue.emplace_back(); - _instance_rows[state->fragment_instance_id()] += num_rows; - _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; - _arrow_data_arrival.notify_one(); _update_dependency(); return Status::OK(); } void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { std::lock_guard l(_lock); + Defer defer {[&]() { _update_dependency(); }}; if (!_status.ok()) { ctx->on_failure(_status); - _update_dependency(); return; } if (_is_cancelled) { ctx->on_failure(Status::Cancelled("Cancelled")); - _update_dependency(); return; } if (!_fe_result_batch_queue.empty()) { @@ -191,54 +267,132 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { ctx->on_data(result, _packet_num); _packet_num++; - _update_dependency(); return; } if (_is_close) { ctx->on_close(_packet_num, _query_statistics.get()); - _update_dependency(); return; } // no ready data, push ctx to waiting list _waiting_rpc.push_back(ctx); - _update_dependency(); } -Status BufferControlBlock::get_arrow_batch(std::shared_ptr* result) { +Status BufferControlBlock::get_arrow_batch(std::shared_ptr* result, + cctz::time_zone& timezone_obj) { std::unique_lock l(_lock); + Defer defer {[&]() { _update_dependency(); }}; if (!_status.ok()) { return _status; } if (_is_cancelled) { - return Status::Cancelled("Cancelled"); + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); } - while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { - _arrow_data_arrival.wait_for(l, std::chrono::seconds(1)); + while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) { + _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20)); } if (_is_cancelled) { - return Status::Cancelled("Cancelled"); + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); } - if (!_arrow_flight_batch_queue.empty()) { - *result = std::move(_arrow_flight_batch_queue.front()); - _arrow_flight_batch_queue.pop_front(); + if (!_arrow_flight_result_batch_queue.empty()) { + *result = std::move(_arrow_flight_result_batch_queue.front()); + _arrow_flight_result_batch_queue.pop_front(); + timezone_obj = _timezone_obj; + for (auto it : _instance_rows_in_queue.front()) { _instance_rows[it.first] -= it.second; } _instance_rows_in_queue.pop_front(); _packet_num++; - _update_dependency(); return Status::OK(); } // normal path end if (_is_close) { - _update_dependency(); + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); + return Status::OK(); + } + return Status::InternalError( + fmt::format("Get Arrow Batch Abnormal Ending ()", print_id(_fragment_id))); +} + +void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { + std::unique_lock l(_lock); + SCOPED_ATTACH_TASK(_mem_tracker); + Defer defer {[&]() { _update_dependency(); }}; + if (!_status.ok()) { + ctx->on_failure(_status); + return; + } + if (_is_cancelled) { + ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)))); + return; + } + + if (!_arrow_flight_result_batch_queue.empty()) { + auto block = _arrow_flight_result_batch_queue.front(); + _arrow_flight_result_batch_queue.pop_front(); + for (auto it : _instance_rows_in_queue.front()) { + _instance_rows[it.first] -= it.second; + } + _instance_rows_in_queue.pop_front(); + + ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type, + _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter, + _compressed_bytes_counter); + _packet_num++; + return; + } + + // normal path end + if (_is_close) { + ctx->on_close(_packet_num); + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); + return; + } + // no ready data, push ctx to waiting list + _waiting_arrow_result_batch_rpc.push_back(ctx); +} + +void BufferControlBlock::register_arrow_schema(const std::shared_ptr& arrow_schema) { + std::lock_guard l(_lock); + _arrow_schema = arrow_schema; +} + +Status BufferControlBlock::find_arrow_schema(std::shared_ptr* arrow_schema) { + std::unique_lock l(_lock); + if (!_status.ok()) { + return _status; + } + if (_is_cancelled) { + return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))); + } + + // normal path end + if (_arrow_schema != nullptr) { + *arrow_schema = _arrow_schema; return Status::OK(); } - return Status::InternalError("Get Arrow Batch Abnormal Ending"); + + if (_is_close) { + return Status::RuntimeError(fmt::format("Closed ()", print_id(_fragment_id))); + } + return Status::InternalError( + fmt::format("Get Arrow Schema Abnormal Ending ()", print_id(_fragment_id))); } Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { @@ -272,18 +426,37 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { } _waiting_rpc.clear(); } + + if (!_waiting_arrow_result_batch_rpc.empty()) { + if (_status.ok()) { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_close(_packet_num); + } + } else { + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(_status); + } + } + _waiting_arrow_result_batch_rpc.clear(); + } return Status::OK(); } void BufferControlBlock::cancel() { std::unique_lock l(_lock); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _is_cancelled = true; _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } _waiting_rpc.clear(); + for (auto& ctx : _waiting_arrow_result_batch_rpc) { + ctx->on_failure(Status::Cancelled("Cancelled")); + } + _waiting_arrow_result_batch_rpc.clear(); _update_dependency(); + _arrow_flight_result_batch_queue.clear(); } void BufferControlBlock::set_dependency( diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 8b45552b2fadb1..a75b670836d121 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -17,6 +17,8 @@ #pragma once +#include +#include #include #include #include @@ -52,7 +54,12 @@ namespace pipeline { class Dependency; } // namespace pipeline +namespace vectorized { +class Block; +} // namespace vectorized + class PFetchDataResult; +class PFetchArrowDataResult; struct GetResultBatchCtx { brpc::Controller* cntl = nullptr; @@ -69,18 +76,44 @@ struct GetResultBatchCtx { bool eos = false); }; +struct GetArrowResultBatchCtx { + brpc::Controller* cntl = nullptr; + PFetchArrowDataResult* result = nullptr; + google::protobuf::Closure* done = nullptr; + + GetArrowResultBatchCtx(brpc::Controller* cntl_, PFetchArrowDataResult* result_, + google::protobuf::Closure* done_) + : cntl(cntl_), result(result_), done(done_) {} + + void on_failure(const Status& status); + void on_close(int64_t packet_seq); + void on_data(const std::shared_ptr& block, int64_t packet_seq, + int be_exec_version, + segment_v2::CompressionTypePB fragement_transmission_compression_type, + std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer, + RuntimeProfile::Counter* uncompressed_bytes_counter, + RuntimeProfile::Counter* compressed_bytes_counter); +}; + // buffer used for result customer and producer class BufferControlBlock { public: - BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size); + BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state); ~BufferControlBlock(); Status init(); Status add_batch(RuntimeState* state, std::unique_ptr& result); - Status add_arrow_batch(RuntimeState* state, std::shared_ptr& result); + Status add_arrow_batch(RuntimeState* state, std::shared_ptr& result); void get_batch(GetResultBatchCtx* ctx); - Status get_arrow_batch(std::shared_ptr* result); + // for ArrowFlightBatchLocalReader + Status get_arrow_batch(std::shared_ptr* result, + cctz::time_zone& timezone_obj); + // for ArrowFlightBatchRemoteReader + void get_arrow_batch(GetArrowResultBatchCtx* ctx); + + void register_arrow_schema(const std::shared_ptr& arrow_schema); + Status find_arrow_schema(std::shared_ptr* arrow_schema); // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. @@ -89,6 +122,7 @@ class BufferControlBlock { void cancel(); [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } + [[nodiscard]] std::shared_ptr mem_tracker() { return _mem_tracker; } void update_return_rows(int64_t num_rows) { // _query_statistics may be null when the result sink init failed @@ -106,7 +140,7 @@ class BufferControlBlock { void _update_dependency(); using FeResultQueue = std::list>; - using ArrowFlightResultQueue = std::list>; + using ArrowFlightResultQueue = std::list>; // result's query id TUniqueId _fragment_id; @@ -118,7 +152,9 @@ class BufferControlBlock { // blocking queue for batch FeResultQueue _fe_result_batch_queue; - ArrowFlightResultQueue _arrow_flight_batch_queue; + ArrowFlightResultQueue _arrow_flight_result_batch_queue; + // for arrow flight + std::shared_ptr _arrow_schema; // protects all subsequent data in this block std::mutex _lock; @@ -128,6 +164,7 @@ class BufferControlBlock { std::condition_variable _arrow_data_arrival; std::deque _waiting_rpc; + std::deque _waiting_arrow_result_batch_rpc; // only used for FE using return rows to check limit std::unique_ptr _query_statistics; @@ -137,6 +174,17 @@ class BufferControlBlock { std::list> _instance_rows_in_queue; int _batch_size; + std::string _timezone; + cctz::time_zone _timezone_obj; + int _be_exec_version; + segment_v2::CompressionTypePB _fragement_transmission_compression_type; + std::shared_ptr _mem_tracker; + + // only used for ArrowFlightBatchRemoteReader + RuntimeProfile _profile; + RuntimeProfile::Counter* _serialize_batch_ns_timer = nullptr; + RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr; + RuntimeProfile::Counter* _compressed_bytes_counter = nullptr; }; } // namespace doris diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index ccbf0c3ff6729e..ecc3d56773ca82 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -67,8 +67,8 @@ Status ResultBufferMgr::init() { } Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr* sender, int exec_timout, - int batch_size) { + std::shared_ptr* sender, + RuntimeState* state) { *sender = find_control_block(query_id); if (*sender != nullptr) { LOG(WARNING) << "already have buffer control block for this instance " << query_id; @@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size std::shared_ptr control_block = nullptr; - control_block = std::make_shared(query_id, buffer_size, batch_size); + control_block = std::make_shared(query_id, buffer_size, state); { std::unique_lock wlock(_buffer_map_lock); @@ -87,7 +87,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size // otherwise in some case may block all fragment handle threads // details see issue https://github.com/apache/doris/issues/16203 // add extra 5s for avoid corner case - int64_t max_timeout = time(nullptr) + exec_timout + 5; + int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5; cancel_at_time(max_timeout, query_id); } *sender = control_block; @@ -105,27 +105,19 @@ std::shared_ptr ResultBufferMgr::find_control_block(const TU return {}; } -void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id, - const std::shared_ptr& arrow_schema) { - std::unique_lock wlock(_arrow_schema_map_lock); - _arrow_schema_map.insert(std::make_pair(query_id, arrow_schema)); -} - -std::shared_ptr ResultBufferMgr::find_arrow_schema(const TUniqueId& query_id) { - std::shared_lock rlock(_arrow_schema_map_lock); - auto iter = _arrow_schema_map.find(query_id); - - if (_arrow_schema_map.end() != iter) { - return iter->second; +Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id, + std::shared_ptr* schema) { + std::shared_ptr cb = find_control_block(finst_id); + if (cb == nullptr) { + return Status::InternalError( + "no arrow schema for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); } - - return nullptr; + return cb->find_arrow_schema(schema); } void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) { - TUniqueId tid; - tid.__set_hi(finst_id.hi()); - tid.__set_lo(finst_id.lo()); + TUniqueId tid = UniqueId(finst_id).to_thrift(); std::shared_ptr cb = find_control_block(tid); if (cb == nullptr) { ctx->on_failure(Status::InternalError("no result for this query, tid={}", print_id(tid))); @@ -134,16 +126,43 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c cb->get_batch(ctx); } +Status ResultBufferMgr::find_mem_tracker(const TUniqueId& finst_id, + std::shared_ptr* mem_tracker) { + std::shared_ptr cb = find_control_block(finst_id); + if (cb == nullptr) { + return Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); + } + *mem_tracker = cb->mem_tracker(); + return Status::OK(); +} + Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, - std::shared_ptr* result) { + std::shared_ptr* result, + cctz::time_zone& timezone_obj) { std::shared_ptr cb = find_control_block(finst_id); if (cb == nullptr) { - return Status::InternalError("no result for this query, finst_id={}", print_id(finst_id)); + return Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(finst_id)); } - RETURN_IF_ERROR(cb->get_arrow_batch(result)); + RETURN_IF_ERROR(cb->get_arrow_batch(result, timezone_obj)); return Status::OK(); } +void ResultBufferMgr::fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx) { + TUniqueId tid = UniqueId(finst_id).to_thrift(); + std::shared_ptr cb = find_control_block(tid); + if (cb == nullptr) { + ctx->on_failure(Status::InternalError( + "no result for this query, maybe query has been canceled, finst_id={}", + print_id(tid))); + return; + } + cb->get_arrow_batch(ctx); +} + void ResultBufferMgr::cancel(const TUniqueId& query_id) { { std::unique_lock wlock(_buffer_map_lock); @@ -154,15 +173,6 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id) { _buffer_map.erase(iter); } } - - { - std::unique_lock wlock(_arrow_schema_map_lock); - auto arrow_schema_iter = _arrow_schema_map.find(query_id); - - if (_arrow_schema_map.end() != arrow_schema_iter) { - _arrow_schema_map.erase(arrow_schema_iter); - } - } } void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) { diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 8bac69c23ac522..1efa0a544f1961 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -17,7 +17,9 @@ #pragma once +#include #include +#include #include #include @@ -41,8 +43,14 @@ namespace doris { class BufferControlBlock; struct GetResultBatchCtx; +struct GetArrowResultBatchCtx; class PUniqueId; +class RuntimeState; +class MemTrackerLimiter; class Thread; +namespace vectorized { +class Block; +} // namespace vectorized // manage all result buffer control block in one backend class ResultBufferMgr { @@ -58,17 +66,18 @@ class ResultBufferMgr { // the returned sender do not need release // sender is not used when call cancel or unregister Status create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr* sender, int exec_timeout, - int batch_size); + std::shared_ptr* sender, RuntimeState* state); // fetch data result to FE void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx); - // fetch data result to Arrow Flight Server - Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr* result); - - void register_arrow_schema(const TUniqueId& query_id, - const std::shared_ptr& arrow_schema); - std::shared_ptr find_arrow_schema(const TUniqueId& query_id); + // fetch data result to Arrow Flight Client + Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr* result, + cctz::time_zone& timezone_obj); + // fetch data result to Other BE forwards to Client + void fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx); + Status find_mem_tracker(const TUniqueId& finst_id, + std::shared_ptr* mem_tracker); + Status find_arrow_schema(const TUniqueId& query_id, std::shared_ptr* schema); // cancel void cancel(const TUniqueId& fragment_id); @@ -79,7 +88,6 @@ class ResultBufferMgr { private: using BufferMap = std::unordered_map>; using TimeoutMap = std::map>; - using ArrowSchemaMap = std::unordered_map>; std::shared_ptr find_control_block(const TUniqueId& query_id); @@ -91,10 +99,6 @@ class ResultBufferMgr { std::shared_mutex _buffer_map_lock; // buffer block map BufferMap _buffer_map; - // lock for arrow schema map - std::shared_mutex _arrow_schema_map_lock; - // for arrow flight - ArrowSchemaMap _arrow_schema_map; // lock for timeout map std::mutex _timeout_lock; diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index a07e479d759be7..e935aff996d55e 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -17,53 +17,294 @@ #include "service/arrow_flight/arrow_flight_batch_reader.h" +#include +#include #include +#include +#include -#include "arrow/builder.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/result_buffer_mgr.h" +#include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/arrow/block_convertor.h" #include "util/arrow/row_batch.h" #include "util/arrow/utils.h" +#include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" -namespace doris { -namespace flight { +namespace doris::flight { -std::shared_ptr ArrowFlightBatchReader::schema() const { - return schema_; +ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase( + const std::shared_ptr& statement) + : _statement(statement) {} + +std::shared_ptr ArrowFlightBatchReaderBase::schema() const { + return _schema; +} + +arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const std::string& msg) { + std::string status_msg = + fmt::format("ArrowFlightBatchReader {}, packet_seq={}, result={}:{}, finistId={}", msg, + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id)); + LOG(WARNING) << status_msg; + return arrow::Status::Invalid(status_msg); } -ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr statement, - std::shared_ptr schema) - : statement_(std::move(statement)), schema_(std::move(schema)) {} +ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() { + VLOG_NOTICE << fmt::format( + "ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, " + "convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}", + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id), _convert_arrow_batch_timer, _deserialize_block_timer, + _mem_tracker->peak_consumption()); +} -arrow::Result> ArrowFlightBatchReader::Create( - const std::shared_ptr& statement_) { +ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader( + const std::shared_ptr& statement, + const std::shared_ptr& schema, + const std::shared_ptr& mem_tracker) + : ArrowFlightBatchReaderBase(statement) { + _schema = schema; + _mem_tracker = mem_tracker; +} + +arrow::Result> ArrowFlightBatchLocalReader::Create( + const std::shared_ptr& statement) { + DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost()); // Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket // to the ADBC client, so that the schema and control block can be found. - auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id); - if (schema == nullptr) { - ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format( - "Client not found arrow flight schema, maybe query has been canceled, queryid: {}", - print_id(statement_->query_id)))); + std::shared_ptr schema; + RETURN_ARROW_STATUS_IF_ERROR( + ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id, &schema)); + std::shared_ptr mem_tracker; + RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker( + statement->query_id, &mem_tracker)); + + std::shared_ptr result( + new ArrowFlightBatchLocalReader(statement, schema, mem_tracker)); + return result; +} + +arrow::Status ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr* out) { + // parameter *out not nullptr + *out = nullptr; + SCOPED_ATTACH_TASK(_mem_tracker); + std::shared_ptr result; + auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id, &result, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + if (result == nullptr) { + // eof, normal path end + return arrow::Status::OK(); + } + + { + // convert one batch + SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); + st = convert_to_arrow_batch(*result, _schema, arrow::default_memory_pool(), out, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + } + + _packet_seq++; + if (*out != nullptr) { + VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns() << ", packet_seq: " << _packet_seq; } - std::shared_ptr result(new ArrowFlightBatchReader(statement_, schema)); + return arrow::Status::OK(); +} + +ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader( + const std::shared_ptr& statement, + const std::shared_ptr& stub) + : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub), _block(nullptr) { + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::QUERY, + fmt::format("ArrowFlightBatchRemoteReader#QueryId={}", print_id(_statement->query_id))); +} + +arrow::Result> ArrowFlightBatchRemoteReader::Create( + const std::shared_ptr& statement) { + std::shared_ptr stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + statement->result_addr); + if (!stub) { + std::string msg = fmt::format( + "ArrowFlightBatchRemoteReader get rpc stub failed, result_addr={}:{}, finistId={}", + statement->result_addr.hostname, statement->result_addr.port, + print_id(statement->query_id)); + LOG(WARNING) << msg; + return arrow::Status::Invalid(msg); + } + + std::shared_ptr result( + new ArrowFlightBatchRemoteReader(statement, stub)); + ARROW_RETURN_NOT_OK(result->init_schema()); return result; } -arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr* out) { - // *out not nullptr +arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() { + Status st; + auto request = std::make_shared(); + auto* pfinst_id = request->mutable_finst_id(); + pfinst_id->set_hi(_statement->query_id.hi); + pfinst_id->set_lo(_statement->query_id.lo); + auto callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PFetchArrowFlightSchemaRequest, + DummyBrpcCallback>::create_unique(request, callback); + callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms); + callback->cntl_->ignore_eovercrowded(); + + _brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + callback->join(); + + if (callback->cntl_->Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _brpc_stub, _statement->result_addr.hostname, _statement->result_addr.port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + callback->cntl_->remote_side()); + } + auto error_code = callback->cntl_->ErrorCode(); + auto error_text = callback->cntl_->ErrorText(); + return _return_invalid_status(fmt::format("fetch schema error: {}, error_text: {}", + berror(error_code), error_text)); + } + st = Status::create(callback->response_->status()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + + if (callback->response_->has_schema() && !callback->response_->schema().empty()) { + auto input = + arrow::io::BufferReader::FromString(std::string(callback->response_->schema())); + ARROW_ASSIGN_OR_RAISE(auto reader, + arrow::ipc::RecordBatchStreamReader::Open( + input.get(), arrow::ipc::IpcReadOptions::Defaults())); + _schema = reader->schema(); + } else { + return _return_invalid_status(fmt::format("fetch schema error: not find schema")); + } + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() { + DCHECK(_block == nullptr); + while (true) { + // if `continue` occurs, data is invalid, continue fetch, block is nullptr. + // if `break` occurs, fetch data successfully (block is not nullptr) or fetch eos. + Status st; + auto request = std::make_shared(); + auto* pfinst_id = request->mutable_finst_id(); + pfinst_id->set_hi(_statement->query_id.hi); + pfinst_id->set_lo(_statement->query_id.lo); + auto callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PFetchArrowDataRequest, + DummyBrpcCallback>::create_unique(request, callback); + callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms); + callback->cntl_->ignore_eovercrowded(); + + _brpc_stub->fetch_arrow_data(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + callback->join(); + + if (callback->cntl_->Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _brpc_stub, _statement->result_addr.hostname, + _statement->result_addr.port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + callback->cntl_->remote_side()); + } + auto error_code = callback->cntl_->ErrorCode(); + auto error_text = callback->cntl_->ErrorText(); + return _return_invalid_status(fmt::format("fetch data error={}, error_text: {}", + berror(error_code), error_text)); + } + st = Status::create(callback->response_->status()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + + DCHECK(callback->response_->has_packet_seq()); + if (_packet_seq != callback->response_->packet_seq()) { + return _return_invalid_status( + fmt::format("fetch data receive packet failed, expect: {}, receive: {}", + _packet_seq, callback->response_->packet_seq())); + } + _packet_seq++; + + if (callback->response_->has_eos() && callback->response_->eos()) { + break; + } + + if (callback->response_->has_empty_batch() && callback->response_->empty_batch()) { + continue; + } + + DCHECK(callback->response_->has_block()); + if (callback->response_->block().ByteSizeLong() == 0) { + continue; + } + + std::call_once(_timezone_once_flag, [this, callback] { + DCHECK(callback->response_->has_timezone()); + TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(), _timezone_obj); + }); + + { + SCOPED_ATOMIC_TIMER(&_deserialize_block_timer); + _block = vectorized::Block::create_shared(); + st = _block->deserialize(callback->response_->block()); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + break; + } + + const auto rows = _block->rows(); + if (rows == 0) { + _block = nullptr; + continue; + } + } + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::init_schema() { + ARROW_RETURN_NOT_OK(_fetch_schema()); + DCHECK(_schema != nullptr); + return arrow::Status::OK(); +} + +arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr* out) { + // parameter *out not nullptr *out = nullptr; - auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out); - if (UNLIKELY(!st.ok())) { - LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string(); + SCOPED_ATTACH_TASK(_mem_tracker); + ARROW_RETURN_NOT_OK(_fetch_data()); + if (_block == nullptr) { + // eof, normal path end, last _fetch_data return block is nullptr + return arrow::Status::OK(); + } + { + // convert one batch + SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); + auto st = convert_to_arrow_batch(*_block, _schema, arrow::default_memory_pool(), out, + _timezone_obj); + st.prepend("ArrowFlightBatchRemoteReader convert block to arrow batch failed"); ARROW_RETURN_NOT_OK(to_arrow_status(st)); } + _block = nullptr; + if (*out != nullptr) { - VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", " - << (*out)->num_columns(); + VLOG_NOTICE << "ArrowFlightBatchRemoteReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns() << ", packet_seq: " << _packet_seq; } return arrow::Status::OK(); } -} // namespace flight -} // namespace doris +} // namespace doris::flight diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h b/be/src/service/arrow_flight/arrow_flight_batch_reader.h index e0279cbb70da12..612ebc8063ca37 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h @@ -17,40 +17,91 @@ #pragma once +#include #include #include +#include #include "arrow/record_batch.h" +#include "runtime/exec_env.h" namespace doris { + +namespace vectorized { +class Block; +} // namespace vectorized + namespace flight { struct QueryStatement { public: TUniqueId query_id; + TNetworkAddress result_addr; // BE brpc ip & port std::string sql; - QueryStatement(const TUniqueId& query_id_, const std::string& sql_) - : query_id(query_id_), sql(sql_) {} + QueryStatement(TUniqueId query_id_, TNetworkAddress result_addr_, std::string sql_) + : query_id(std::move(query_id_)), + result_addr(std::move(result_addr_)), + sql(std::move(sql_)) {} +}; + +class ArrowFlightBatchReaderBase : public arrow::RecordBatchReader { +public: + // RecordBatchReader force override + [[nodiscard]] std::shared_ptr schema() const override; + +protected: + ArrowFlightBatchReaderBase(const std::shared_ptr& statement); + ~ArrowFlightBatchReaderBase() override; + arrow::Status _return_invalid_status(const std::string& msg); + + std::shared_ptr _statement; + std::shared_ptr _schema; + cctz::time_zone _timezone_obj; + std::atomic _packet_seq = 0; + + std::atomic _convert_arrow_batch_timer = 0; + std::atomic _deserialize_block_timer = 0; + std::shared_ptr _mem_tracker; }; -class ArrowFlightBatchReader : public arrow::RecordBatchReader { +class ArrowFlightBatchLocalReader : public ArrowFlightBatchReaderBase { public: - static arrow::Result> Create( + static arrow::Result> Create( const std::shared_ptr& statement); - [[nodiscard]] std::shared_ptr schema() const override; + arrow::Status ReadNext(std::shared_ptr* out) override; +private: + ArrowFlightBatchLocalReader(const std::shared_ptr& statement, + const std::shared_ptr& schema, + const std::shared_ptr& mem_tracker); +}; + +class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase { +public: + static arrow::Result> Create( + const std::shared_ptr& statement); + + // create arrow RecordBatchReader must initialize the schema. + // so when creating arrow RecordBatchReader, fetch result data once, + // which will return Block and some necessary information, and extract arrow schema from Block. + arrow::Status init_schema(); arrow::Status ReadNext(std::shared_ptr* out) override; private: - std::shared_ptr statement_; - std::shared_ptr schema_; + ArrowFlightBatchRemoteReader(const std::shared_ptr& statement, + const std::shared_ptr& stub); - ArrowFlightBatchReader(std::shared_ptr statement, - std::shared_ptr schema); + arrow::Status _fetch_schema(); + arrow::Status _fetch_data(); + + std::shared_ptr _brpc_stub = nullptr; + std::once_flag _timezone_once_flag; + std::shared_ptr _block; }; } // namespace flight + } // namespace doris diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp b/be/src/service/arrow_flight/flight_sql_service.cpp index 60b665c62fc781..90ee3edfbea72b 100644 --- a/be/src/service/arrow_flight/flight_sql_service.cpp +++ b/be/src/service/arrow_flight/flight_sql_service.cpp @@ -19,15 +19,17 @@ #include +#include + #include "arrow/flight/sql/server.h" +#include "gutil/strings/split.h" #include "service/arrow_flight/arrow_flight_batch_reader.h" #include "service/arrow_flight/flight_sql_info.h" #include "service/backend_options.h" #include "util/arrow/utils.h" #include "util/uid_util.h" -namespace doris { -namespace flight { +namespace doris::flight { class FlightSqlServer::Impl { private: @@ -41,14 +43,21 @@ class FlightSqlServer::Impl { return arrow::flight::Ticket {std::move(ticket)}; } - arrow::Result> decode_ticket(const std::string& ticket) { - auto divider = ticket.find(':'); - if (divider == std::string::npos) { - return arrow::Status::Invalid("Malformed ticket"); + arrow::Result> decode_ticket(const std::string& ticket) { + std::vector fields = strings::Split(ticket, "&"); + if (fields.size() != 4) { + return arrow::Status::Invalid(fmt::format("Malformed ticket, size: {}", fields.size())); } - std::string query_id = ticket.substr(0, divider); - std::string sql = ticket.substr(divider + 1); - return std::make_pair(std::move(sql), std::move(query_id)); + + TUniqueId queryid; + parse_id(fields[0], &queryid); + TNetworkAddress result_addr; + result_addr.hostname = fields[1]; + result_addr.port = std::stoi(fields[2]); + std::string sql = fields[3]; + std::shared_ptr statement = + std::make_shared(queryid, result_addr, sql); + return statement; } public: @@ -59,18 +68,21 @@ class FlightSqlServer::Impl { arrow::Result> DoGetStatement( const arrow::flight::ServerCallContext& context, const arrow::flight::sql::StatementQueryTicket& command) { - ARROW_ASSIGN_OR_RAISE(auto pair, decode_ticket(command.statement_handle)); - const std::string& sql = pair.first; - const std::string query_id = pair.second; - TUniqueId queryid; - parse_id(query_id, &queryid); - - auto statement = std::make_shared(queryid, sql); - - std::shared_ptr reader; - ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchReader::Create(statement)); - - return std::make_unique(reader); + ARROW_ASSIGN_OR_RAISE(auto statement, decode_ticket(command.statement_handle)); + // if IP:BrpcPort in the Ticket is not current BE node, + // pulls the query result Block from the BE node specified by IP:BrpcPort, + // converts it to Arrow Batch and returns it to ADBC client. + // use brpc to transmit blocks between BEs. + if (statement->result_addr.hostname == BackendOptions::get_localhost() && + statement->result_addr.port == config::brpc_port) { + std::shared_ptr reader; + ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchLocalReader::Create(statement)); + return std::make_unique(reader); + } else { + std::shared_ptr reader; + ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchRemoteReader::Create(statement)); + return std::make_unique(reader); + } } }; @@ -135,5 +147,4 @@ Status FlightSqlServer::join() { return Status::OK(); } -} // namespace flight -} // namespace doris +} // namespace doris::flight diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ae84081813f1f3..29eb01bad2aaa8 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -157,6 +157,11 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::N DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT); + bthread_key_t btls_key; static void thread_context_deleter(void* d) { @@ -200,7 +205,14 @@ PInternalService::PInternalService(ExecEnv* exec_env) config::brpc_light_work_pool_max_queue_size != -1 ? config::brpc_light_work_pool_max_queue_size : std::max(10240, CpuInfo::num_cores() * 320), - "brpc_light") { + "brpc_light"), + _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1 + ? config::brpc_arrow_flight_work_pool_threads + : std::max(512, CpuInfo::num_cores() * 16), + config::brpc_arrow_flight_work_pool_max_queue_size != -1 + ? config::brpc_arrow_flight_work_pool_max_queue_size + : std::max(20480, CpuInfo::num_cores() * 640), + "brpc_arrow_flight") { REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { return _heavy_work_pool.get_queue_size(); }); REGISTER_HOOK_METRIC(light_work_pool_queue_size, @@ -219,6 +231,15 @@ PInternalService::PInternalService(ExecEnv* exec_env) REGISTER_HOOK_METRIC(light_work_max_threads, []() { return config::brpc_light_work_pool_threads; }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, + [this]() { return _arrow_flight_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, + [this]() { return _arrow_flight_work_pool.get_active_threads(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, + []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, + []() { return config::brpc_arrow_flight_work_pool_threads; }); + _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool); _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool); @@ -242,6 +263,11 @@ PInternalService::~PInternalService() { DEREGISTER_HOOK_METRIC(heavy_work_max_threads); DEREGISTER_HOOK_METRIC(light_work_max_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); + CHECK_EQ(0, bthread_key_delete(btls_key)); CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); } @@ -650,6 +676,22 @@ void PInternalService::fetch_data(google::protobuf::RpcController* controller, } } +void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller, + const PFetchArrowDataRequest* request, + PFetchArrowDataResult* result, + google::protobuf::Closure* done) { + bool ret = _arrow_flight_work_pool.try_offer([this, controller, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + auto* cntl = static_cast(controller); + auto* ctx = new GetArrowResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_arrow_data(request->finst_id(), ctx); + }); + if (!ret) { + offer_failed(result, done, _arrow_flight_work_pool); + return; + } +} + void PInternalService::outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, @@ -857,23 +899,21 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([request, result, done]() { brpc::ClosureGuard closure_guard(done); - std::shared_ptr schema = - ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( - UniqueId(request->finst_id()).to_thrift()); - if (schema == nullptr) { - LOG(INFO) << "FE not found arrow flight schema, maybe query has been canceled"; - auto st = Status::NotFound( - "FE not found arrow flight schema, maybe query has been canceled"); + std::shared_ptr schema; + auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( + UniqueId(request->finst_id()).to_thrift(), &schema); + if (!st.ok()) { st.to_protobuf(result->mutable_status()); return; } std::string schema_str; - auto st = serialize_arrow_schema(&schema, &schema_str); + st = serialize_arrow_schema(&schema, &schema_str); if (st.ok()) { result->set_schema(std::move(schema_str)); - if (config::public_access_ip != "") { + if (!config::public_access_ip.empty() && config::public_access_port != -1) { result->set_be_arrow_flight_ip(config::public_access_ip); + result->set_be_arrow_flight_port(config::public_access_port); } } st.to_protobuf(result->mutable_status()); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index b3ab1c5a6474c0..66a0f867393793 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -97,6 +97,10 @@ class PInternalService : public PBackendService { void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) override; + void fetch_arrow_data(google::protobuf::RpcController* controller, + const PFetchArrowDataRequest* request, PFetchArrowDataResult* result, + google::protobuf::Closure* done) override; + void outfile_write_success(google::protobuf::RpcController* controller, const POutfileWriteSuccessRequest* request, POutfileWriteSuccessResult* result, @@ -271,6 +275,7 @@ class PInternalService : public PBackendService { // otherwise as light interface FifoThreadPool _heavy_work_pool; FifoThreadPool _light_work_pool; + FifoThreadPool _arrow_flight_work_pool; }; // `StorageEngine` mixin for `PInternalService` diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index 308986df68e1d2..dd11d5ae46f740 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -157,16 +157,9 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* field, - const std::string& timezone) { - std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone)); - *field = arrow::field(desc->col_name(), type, desc->is_nullable()); - return Status::OK(); -} - -Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result, - const std::string& timezone) { +Status get_arrow_schema_from_block(const vectorized::Block& block, + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (const auto& type_and_name : block) { std::shared_ptr arrow_type; @@ -179,9 +172,9 @@ Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result, - const std::string& timezone) { +Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (int i = 0; i < output_vexpr_ctxs.size(); i++) { std::shared_ptr arrow_type; diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index d036086902348e..d10bd54b2ae5af 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -44,12 +44,13 @@ class RowDescriptor; Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, const std::string& timezone); -Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result, - const std::string& timezone); +Status get_arrow_schema_from_block(const vectorized::Block& block, + std::shared_ptr* result, + const std::string& timezone); -Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr* result, - const std::string& timezone); +Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, + std::shared_ptr* result, + const std::string& timezone); Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); diff --git a/be/src/util/arrow/utils.cpp b/be/src/util/arrow/utils.cpp index 5ccff849034a4a..742f5bd0fc33f3 100644 --- a/be/src/util/arrow/utils.cpp +++ b/be/src/util/arrow/utils.cpp @@ -33,9 +33,10 @@ Status to_doris_status(const arrow::Status& status) { } arrow::Status to_arrow_status(const Status& status) { - if (status.ok()) { + if (LIKELY(status.ok())) { return arrow::Status::OK(); } else { + LOG(WARNING) << status.to_string(); // The length of exception msg returned to the ADBC Client cannot larger than 8192, // otherwise ADBC Client will receive: // `INTERNAL: http2 exception Header size exceeded max allowed size (8192)`. diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 3006461059c106..69516773debdbc 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -218,6 +218,11 @@ class DorisMetrics { UIntGauge* heavy_work_max_threads = nullptr; UIntGauge* light_work_max_threads = nullptr; + UIntGauge* arrow_flight_work_pool_queue_size = nullptr; + UIntGauge* arrow_flight_work_active_threads = nullptr; + UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr; + UIntGauge* arrow_flight_work_max_threads = nullptr; + UIntGauge* flush_thread_pool_queue_size = nullptr; UIntGauge* flush_thread_pool_thread_num = nullptr; diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp index b23d1668465bbd..c54c27a84844b1 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.cpp +++ b/be/src/vec/sink/varrow_flight_result_writer.cpp @@ -19,21 +19,16 @@ #include "runtime/buffer_control_block.h" #include "runtime/runtime_state.h" -#include "util/arrow/block_convertor.h" -#include "util/arrow/row_batch.h" +#include "runtime/thread_context.h" #include "vec/core/block.h" #include "vec/exprs/vexpr_context.h" -namespace doris { -namespace vectorized { +namespace doris::vectorized { -VArrowFlightResultWriter::VArrowFlightResultWriter( - BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile, const std::shared_ptr& arrow_schema) - : _sinker(sinker), - _output_vexpr_ctxs(output_vexpr_ctxs), - _parent_profile(parent_profile), - _arrow_schema(arrow_schema) {} +VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker, + const VExprContextSPtrs& output_vexpr_ctxs, + RuntimeProfile* parent_profile) + : _sinker(sinker), _output_vexpr_ctxs(output_vexpr_ctxs), _parent_profile(parent_profile) {} Status VArrowFlightResultWriter::init(RuntimeState* state) { _init_profile(); @@ -41,13 +36,11 @@ Status VArrowFlightResultWriter::init(RuntimeState* state) { return Status::InternalError("sinker is NULL pointer."); } _is_dry_run = state->query_options().dry_run_query; - _timezone_obj = state->timezone_obj(); return Status::OK(); } void VArrowFlightResultWriter::_init_profile() { _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime"); - _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime"); _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime"); _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT); _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); @@ -66,29 +59,31 @@ Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block) RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - // convert one batch - std::shared_ptr result; - auto num_rows = block.rows(); - // arrow::RecordBatch without `nbytes()` in C++ - uint64_t bytes_sent = block.bytes(); { - SCOPED_TIMER(_convert_tuple_timer); - RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, _timezone_obj)); - } - { - SCOPED_TIMER(_result_send_timer); - // If this is a dry run task, no need to send data block - if (!_is_dry_run) { - status = _sinker->add_arrow_batch(state, result); - } - if (status.ok()) { - _written_rows += num_rows; + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_sinker->mem_tracker()); + std::unique_ptr mutable_block = + vectorized::MutableBlock::create_unique(block.clone_empty()); + RETURN_IF_ERROR(mutable_block->merge_ignore_overflow(std::move(block))); + std::shared_ptr output_block = vectorized::Block::create_shared(); + output_block->swap(mutable_block->to_block()); + + auto num_rows = output_block->rows(); + // arrow::RecordBatch without `nbytes()` in C++ + uint64_t bytes_sent = output_block->bytes(); + { + SCOPED_TIMER(_result_send_timer); + // If this is a dry run task, no need to send data block if (!_is_dry_run) { - _bytes_sent += bytes_sent; + status = _sinker->add_arrow_batch(state, output_block); + } + if (status.ok()) { + _written_rows += num_rows; + if (!_is_dry_run) { + _bytes_sent += bytes_sent; + } + } else { + LOG(WARNING) << "append result batch to sink failed."; } - } else { - LOG(WARNING) << "append result batch to sink failed."; } } return status; @@ -100,5 +95,4 @@ Status VArrowFlightResultWriter::close(Status st) { return Status::OK(); } -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h index ab2578421c80bc..c87518de5e1561 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.h +++ b/be/src/vec/sink/varrow_flight_result_writer.h @@ -17,13 +17,6 @@ #pragma once -#include -#include -#include - -#include -#include - #include "common/status.h" #include "runtime/result_writer.h" #include "util/runtime_profile.h" @@ -39,8 +32,7 @@ class Block; class VArrowFlightResultWriter final : public ResultWriter { public: VArrowFlightResultWriter(BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile, - const std::shared_ptr& arrow_schema); + RuntimeProfile* parent_profile); Status init(RuntimeState* state) override; @@ -58,8 +50,6 @@ class VArrowFlightResultWriter final : public ResultWriter { RuntimeProfile* _parent_profile = nullptr; // parent profile from result sink. not owned // total time cost on append batch operation RuntimeProfile::Counter* _append_row_batch_timer = nullptr; - // tuple convert timer, child timer of _append_row_batch_timer - RuntimeProfile::Counter* _convert_tuple_timer = nullptr; // file write timer, child timer of _append_row_batch_timer RuntimeProfile::Counter* _result_send_timer = nullptr; // number of sent rows @@ -70,10 +60,6 @@ class VArrowFlightResultWriter final : public ResultWriter { bool _is_dry_run = false; uint64_t _bytes_sent = 0; - - std::shared_ptr _arrow_schema; - - cctz::time_zone _timezone_obj; }; } // namespace vectorized } // namespace doris diff --git a/be/test/runtime/result_buffer_mgr_test.cpp b/be/test/runtime/result_buffer_mgr_test.cpp index 152c155ef0a3c5..4ab9186c5fab10 100644 --- a/be/test/runtime/result_buffer_mgr_test.cpp +++ b/be/test/runtime/result_buffer_mgr_test.cpp @@ -34,6 +34,7 @@ class ResultBufferMgrTest : public testing::Test { virtual void SetUp() {} private: + RuntimeState _state; }; TEST_F(ResultBufferMgrTest, create_normal) { @@ -43,7 +44,7 @@ TEST_F(ResultBufferMgrTest, create_normal) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); } TEST_F(ResultBufferMgrTest, create_same_buffer) { @@ -53,9 +54,9 @@ TEST_F(ResultBufferMgrTest, create_same_buffer) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); std::shared_ptr control_block2; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, &_state).ok()); EXPECT_EQ(control_block1.get(), control_block1.get()); } @@ -67,7 +68,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_normal) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); TFetchDataResult* result = new TFetchDataResult(); result->result_batch.rows.push_back("hello test"); @@ -85,7 +86,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_no_block) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); TFetchDataResult* result = new TFetchDataResult(); query_id.lo = 11; @@ -101,7 +102,7 @@ TEST_F(ResultBufferMgrTest, normal_cancel) { query_id.hi = 100; std::shared_ptr control_block1; - EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok()); + EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok()); EXPECT_TRUE(buffer_mgr.cancel(query_id).ok()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 5d431b386b7f11..371680813502f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -27,6 +27,7 @@ import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; import com.google.protobuf.Any; @@ -224,24 +225,40 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con } } else { // Now only query stmt will pull results from BE. - final ByteString handle; - if (connectContext.getSessionVariable().enableParallelResultSink()) { - handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + query); - } else { - // only one instance - handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + query); - } Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); if (schema == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null") .toRuntimeException(); } + + TUniqueId queryId = connectContext.queryId(); + if (!connectContext.getSessionVariable().enableParallelResultSink()) { + // only one instance + queryId = connectContext.getFinstId(); + } + // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. + final ByteString handle = ByteString.copyFromUtf8( + DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname + + "&" + connectContext.getResultInternalServiceAddr().port + "&" + query); TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) .build(); Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); // TODO Support multiple endpoints. - Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); + Location location; + if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) { + // In a production environment, it is often inconvenient to expose Doris BE nodes + // to the external network. + // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, + // and the external client will be randomly routed to a Doris BE node when connecting to nginx. + // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. + // If it is different from the Doris BE node randomly routed by nginx, + // data forwarding needs to be done inside the Doris BE node. + location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname, + flightSQLConnectProcessor.getPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, + connectContext.getResultFlightServerAddr().port); + } List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. return new FlightInfo(schema, descriptor, endpoints, -1, -1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index b812bf81d8a514..febadbef0ab0d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -53,11 +53,12 @@ /** * Process one flgiht sql connection. - * + *

* Must use try-with-resources. */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); + private TNetworkAddress publicAccessAddr = new TNetworkAddress(); public FlightSqlConnectProcessor(ConnectContext context) { super(context); @@ -66,6 +67,10 @@ public FlightSqlConnectProcessor(ConnectContext context) { context.setReturnResultFromLocal(true); } + public TNetworkAddress getPublicAccessAddr() { + return publicAccessAddr; + } + public void prepare(MysqlCommand command) { // set status of query to OK. ctx.getState().reset(); @@ -130,10 +135,11 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { Status resultStatus = new Status(pResult.getStatus()); if (resultStatus.getErrorCode() != TStatusCode.OK) { throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", - DebugUtil.printId(tid), resultStatus.toString())); + DebugUtil.printId(tid), resultStatus)); } - if (pResult.hasBeArrowFlightIp()) { - ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8(); + if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) { + publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8(); + publicAccessAddr.port = pResult.getBeArrowFlightPort(); } if (pResult.hasSchema() && pResult.getSchema().size() > 0) { RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index f3764cea233806..17448861dc06bb 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -283,6 +283,20 @@ message PFetchDataResult { optional bool empty_batch = 6; }; +message PFetchArrowDataRequest { + optional PUniqueId finst_id = 1; +}; + +message PFetchArrowDataResult { + optional PStatus status = 1; + // valid when status is ok + optional int64 packet_seq = 2; + optional bool eos = 3; + optional PBlock block = 4; + optional bool empty_batch = 5; + optional string timezone = 6; +}; + message PFetchArrowFlightSchemaRequest { optional PUniqueId finst_id = 1; }; @@ -292,6 +306,7 @@ message PFetchArrowFlightSchemaResult { // valid when status is ok optional bytes schema = 2; optional bytes be_arrow_flight_ip = 3; + optional int32 be_arrow_flight_port = 4; }; message KeyTuple { @@ -979,6 +994,7 @@ service PBackendService { rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult); rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult); rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); + rpc fetch_arrow_data(PFetchArrowDataRequest) returns (PFetchArrowDataResult); rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); rpc open_load_stream(POpenLoadStreamRequest) returns (POpenLoadStreamResponse); rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult);