Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 13, 2023
1 parent 05722b4 commit 61f63a5
Show file tree
Hide file tree
Showing 44 changed files with 2,212 additions and 57 deletions.
1 change: 1 addition & 0 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
heartbeat_result.backend_info.__set_http_port(config::webserver_port);
heartbeat_result.backend_info.__set_be_rpc_port(-1);
heartbeat_result.backend_info.__set_brpc_port(config::brpc_port);
heartbeat_result.backend_info.__set_arrow_flight_port(config::arrow_flight_port);
heartbeat_result.backend_info.__set_version(get_short_version());
heartbeat_result.backend_info.__set_be_start_time(_be_epoch);
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
Expand Down
8 changes: 7 additions & 1 deletion be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
_fe_result_batch_queue.push_back(std::move(result));
}
_buffer_rows += num_rows;
_data_arrival.notify_one();
} else {
auto ctx = _waiting_rpc.front();
_waiting_rpc.pop_front();
Expand Down Expand Up @@ -209,18 +208,23 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {

Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) {
std::unique_lock<std::mutex> l(_lock);
LOG(INFO) << "11111111 d " << _buffer_rows << ", " << _packet_num;
if (!_status.ok()) {
LOG(INFO) << "11111111 h " << _buffer_rows << ", " << _packet_num;
return _status;
}
if (_is_cancelled) {
LOG(INFO) << "11111111 g " << _buffer_rows << ", " << _packet_num;
return Status::Cancelled("Cancelled");
}

while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
LOG(INFO) << "11111111 e " << _buffer_rows << ", " << _packet_num;
_data_arrival.wait_for(l, std::chrono::seconds(1));
}

if (_is_cancelled) {
LOG(INFO) << "11111111 f " << _buffer_rows << ", " << _packet_num;
return Status::Cancelled("Cancelled");
}

Expand All @@ -230,11 +234,13 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
_buffer_rows -= (*result)->num_rows();
_data_removal.notify_one();
_packet_num++;
LOG(INFO) << "11111111 c " << _buffer_rows << ", " << _packet_num;
return Status::OK();
}

// normal path end
if (_is_close) {
LOG(INFO) << "11111111 i " << _buffer_rows << ", " << _packet_num;
return Status::OK();
}
return Status::InternalError("Abnormal Ending");
Expand Down
48 changes: 29 additions & 19 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <stdint.h>

#include <chrono>
#include <thread>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <memory>
Expand All @@ -33,6 +36,7 @@
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/thread.h"
#include "util/uid_util.h"

namespace doris {

Expand All @@ -42,7 +46,7 @@ ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) {
// Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the
// actual size of all BufferControlBlock.
REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
// std::lock_guard<std::mutex> l(_buffer_map_lock);
return _buffer_map.size();
});
}
Expand Down Expand Up @@ -80,7 +84,8 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
}

{
std::lock_guard<std::mutex> l(_lock);
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
LOG(INFO) << "11111111 9 " << query_id << ", " << print_id(query_id);
_buffer_map.insert(std::make_pair(query_id, control_block));
// BufferControlBlock should destroy after max_timeout
// for exceed max_timeout FE will return timeout to client
Expand All @@ -95,8 +100,8 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
}

std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TUniqueId& query_id) {
// TODO(zhaochun): this lock can be bottleneck?
std::lock_guard<std::mutex> l(_lock);
std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
LOG(INFO) << "11111111 8 " << query_id;
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
Expand All @@ -108,14 +113,12 @@ std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TU

void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id,
const RowDescriptor& row_desc) {
{
std::lock_guard<std::mutex> l(_lock);
_row_descriptor_map.insert(std::make_pair(query_id, row_desc));
}
std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
_row_descriptor_map.insert(std::make_pair(query_id, row_desc));
}

RowDescriptor ResultBufferMgr::find_row_descriptor(const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_lock);
std::shared_lock<std::shared_mutex> rlock(_row_descriptor_map_lock);
RowDescriptorMap::iterator iter = _row_descriptor_map.find(query_id);

if (_row_descriptor_map.end() != iter) {
Expand All @@ -141,7 +144,9 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c
Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<arrow::RecordBatch>* result) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
LOG(INFO) << "11111111 7 " << finst_id;
if (cb == nullptr) {
LOG(INFO) << "11111111 j " << finst_id;
LOG(WARNING) << "no result for this query, id=" << print_id(finst_id);
return Status::InternalError("no result for this query");
}
Expand All @@ -150,27 +155,31 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
}

Status ResultBufferMgr::cancel(const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
iter->second->cancel();
_buffer_map.erase(iter);
if (_buffer_map.end() != iter) {
iter->second->cancel();
_buffer_map.erase(iter);
}
}

RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id);
{
std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id);

if (_row_descriptor_map.end() != row_desc_iter) {
_row_descriptor_map.erase(row_desc_iter);
if (_row_descriptor_map.end() != row_desc_iter) {
_row_descriptor_map.erase(row_desc_iter);
}
}

return Status::OK();
}

Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_timeout_lock);
TimeoutMap::iterator iter = _timeout_map.find(cancel_time);

LOG(INFO) << "11111111 3 " << query_id << ", " << cancel_time;
if (_timeout_map.end() == iter) {
_timeout_map.insert(
std::pair<time_t, std::vector<TUniqueId>>(cancel_time, std::vector<TUniqueId>()));
Expand Down Expand Up @@ -203,6 +212,7 @@ void ResultBufferMgr::cancel_thread() {

// cancel query
for (int i = 0; i < query_to_cancel.size(); ++i) {
LOG(INFO) << "11111111 4 " << query_to_cancel[i];
cancel(query_to_cancel[i]);
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/result_buffer_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -86,9 +87,11 @@ class ResultBufferMgr {
void cancel_thread();

// lock for buffer map
std::mutex _lock;
std::shared_mutex _buffer_map_lock;
// buffer block map
BufferMap _buffer_map;
// lock for descriptor map
std::shared_mutex _row_descriptor_map_lock;
// for arrow flight
RowDescriptorMap _row_descriptor_map;

Expand Down
12 changes: 11 additions & 1 deletion be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::C
ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
"Schema RowDescriptor Not Found, queryid: {}", print_id(statement_->query_id))));
}
LOG(INFO) << "11111111 2 " << statement_->query_id;
std::shared_ptr<arrow::Schema> schema;
auto st = convert_to_arrow_schema(row_desc, &schema);
if (UNLIKELY(!st.ok())) {
Expand All @@ -57,12 +58,21 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::C
}

arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
CHECK(*out == nullptr);
// CHECK(*out == nullptr); // not nullptr
LOG(INFO) << "11111111 1 " << statement_->query_id;
*out = nullptr;
auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out);
if (UNLIKELY(!st.ok())) {
LOG(INFO) << "11111111 k " << statement_->query_id;
LOG(WARNING) << st.to_string();
ARROW_RETURN_NOT_OK(to_arrow_status(st));
}
if (*out != nullptr) {
std::stringstream ss;
arrow_pretty_print(*(*out), &ss);
LOG(INFO) << "11111111 m " << (*out)->num_rows() << ", " << (*out)->num_columns();
LOG(INFO) << "11111111 l " << ss.str();
}
return arrow::Status::OK();
}

Expand Down
37 changes: 37 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "service/point_query_executor.h"
#include "util/arrow/row_batch.h"
#include "util/async_io.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
Expand Down Expand Up @@ -704,6 +705,41 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
}
}

void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
const PFetchArrowFlightSchemaRequest* request,
PFetchArrowFlightSchemaResult* result,
google::protobuf::Closure* done) {
LOG(INFO) << "fetch_arrow_flight_schema";
bool ret = _light_work_pool.try_offer([request, result, done]() {
brpc::ClosureGuard closure_guard(done);
RowDescriptor row_desc = ExecEnv::GetInstance()->result_mgr()->find_row_descriptor(
UniqueId(request->finst_id()).to_thrift());
if (row_desc.equals(RowDescriptor())) {
auto st = Status::NotFound("not found row descriptor");
st.to_protobuf(result->mutable_status());
return;
}

std::shared_ptr<arrow::Schema> schema;
auto st = convert_to_arrow_schema(row_desc, &schema);
if (UNLIKELY(!st.ok())) {
st.to_protobuf(result->mutable_status());
return;
}

std::string schema_str;
st = serialize_arrow_schema(row_desc, &schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
}
st.to_protobuf(result->mutable_status());
});
if (!ret) {
offer_failed(result, done, _heavy_work_pool);
return;
}
}

Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request,
PTabletKeyLookupResponse* response) {
PointQueryExecutor lookup_util;
Expand Down Expand Up @@ -895,6 +931,7 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller,
const PFetchCacheRequest* request, PFetchCacheResult* result,
google::protobuf::Closure* done) {
LOG(INFO) << "fetch_cache";
bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->fetch(request, result);
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ class PInternalServiceImpl : public PBackendService {
PFetchTableSchemaResult* result,
google::protobuf::Closure* done) override;

void fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
const PFetchArrowFlightSchemaRequest* request,
PFetchArrowFlightSchemaResult* result,
google::protobuf::Closure* done) override;

void tablet_writer_open(google::protobuf::RpcController* controller,
const PTabletWriterOpenRequest* request,
PTabletWriterOpenResult* response,
Expand Down
14 changes: 14 additions & 0 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "util/arrow/block_convertor.h"
#include "vec/core/block.h"

namespace doris {

Expand Down Expand Up @@ -188,4 +190,16 @@ Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::strin
return Status::OK();
}

Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr<arrow::Schema>* schema,
std::string* result) {
std::vector<SlotDescriptor*> slots;
for (auto tuple_desc : row_desc.tuple_descriptors()) {
slots.insert(slots.end(), tuple_desc->slots().begin(), tuple_desc->slots().end());
}
auto block = vectorized::Block(slots, 0);
std::shared_ptr<arrow::RecordBatch> batch;
RETURN_IF_ERROR(convert_to_arrow_batch(block, *schema, arrow::default_memory_pool(), &batch));
return serialize_record_batch(*batch, result);
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/util/arrow/row_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,

Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);

Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr<arrow::Schema>* schema,
std::string* result);

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ public class Config extends ConfigBase {
@ConfField(description = {"FE MySQL server 的端口号", "The port of FE MySQL server"})
public static int query_port = 9030;

@ConfField(description = {"FE Flight-SQL server 的端口号", "The port of FE Flight-SQL server"})
public static int flight_sql_query_port = 9040;

@ConfField(description = {"MySQL 服务的 IO 线程数", "The number of IO threads in MySQL service"})
public static int mysql_service_io_threads_num = 4;

Expand Down
Loading

0 comments on commit 61f63a5

Please sign in to comment.