From 33d1051b584a84e08d2e1f4376f3941c8375c88b Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Sun, 12 Nov 2023 11:57:46 +0800 Subject: [PATCH] [refactor](closure) remove ref count closure using auto release closure (#26718) 1. closure should be managed by a unique ptr and released by brpc , should not hold by our code. If hold by our code, we need to wait brpc finished during cancel or close. 2. closure should be exception safe, if any exception happens, should not memory leak. 3. using a specific callback interface to be implemented by Doris's code, we could write any code and doris should manage callback's lifecycle. 4. using a weak ptr between callback and closure. If callback is deconstruted before closure'Run, should not core. --- be/src/olap/delta_writer.cpp | 3 +- be/src/pipeline/exec/exchange_sink_buffer.cpp | 65 +++-- be/src/pipeline/exec/exchange_sink_buffer.h | 41 +-- be/src/service/internal_service.cpp | 44 ++- be/src/util/proto_util.h | 36 +-- be/src/util/ref_count_closure.h | 37 +-- be/src/vec/sink/vdata_stream_sender.cpp | 55 ++-- be/src/vec/sink/vdata_stream_sender.h | 40 ++- be/src/vec/sink/writer/vtablet_writer.cpp | 251 +++++++++--------- be/src/vec/sink/writer/vtablet_writer.h | 50 ++-- be/test/vec/runtime/vdata_stream_test.cpp | 4 + 11 files changed, 305 insertions(+), 321 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index c78f7814c05f53..68b97e49cb39a1 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -282,8 +282,9 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); closure->cntl_->ignore_eovercrowded(); stub->request_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(), - closure->response_.get(), closure.release()); + closure->response_.get(), closure.get()); + closure.release(); pull_callback->join(); if 
(pull_callback->cntl_->Failed()) { if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 29933fcdd1571d..46364528458506 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -107,7 +107,7 @@ bool ExchangeSinkBuffer::is_pending_finish() { if (need_cancel && _instance_to_rpc_ctx.find(id) != _instance_to_rpc_ctx.end()) { auto& rpc_ctx = _instance_to_rpc_ctx[id]; if (!rpc_ctx.is_cancelled) { - brpc::StartCancel(rpc_ctx._closure->cntl.call_id()); + brpc::StartCancel(rpc_ctx._send_callback->cntl_->call_id()); rpc_ctx.is_cancelled = true; } } @@ -245,21 +245,21 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (!request.exec_status.ok()) { request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); } - auto* closure = request.channel->get_closure(id, request.eos, nullptr); + auto send_callback = request.channel->get_send_callback(id, request.eos, nullptr); - _instance_to_rpc_ctx[id]._closure = closure; + _instance_to_rpc_ctx[id]._send_callback = send_callback; _instance_to_rpc_ctx[id].is_cancelled = false; - closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::exchange_sink_ignore_eovercrowded) { - closure->cntl.ignore_eovercrowded(); + send_callback->cntl_->ignore_eovercrowded(); } - closure->addFailedHandler( + send_callback->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - closure->start_rpc_time = GetCurrentTimeNanos(); - closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result, - const int64_t& start_rpc_time) { + send_callback->start_rpc_time = GetCurrentTimeNanos(); + send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, + const 
PTransmitDataResult& result, + const int64_t& start_rpc_time) { set_rpc_time(id, start_rpc_time, result.receive_time()); Status s(Status::create(result.status())); if (s.is()) { @@ -275,11 +275,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + auto send_remote_block_closure = + AutoReleaseClosure>:: + create_unique(brpc_request, send_callback); if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request, - request.channel->_brpc_dest_addr)); + RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), + std::move(send_remote_block_closure), + request.channel->_brpc_dest_addr)); } else { - transmit_block(*request.channel->_brpc_stub, closure, *brpc_request); + transmit_blockv2(*request.channel->_brpc_stub, + std::move(send_remote_block_closure)); } } if (request.block) { @@ -303,23 +309,24 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto statistic = brpc_request->mutable_query_statistics(); _statistics->to_pb(statistic); } - auto* closure = request.channel->get_closure(id, request.eos, request.block_holder); + auto send_callback = + request.channel->get_send_callback(id, request.eos, request.block_holder); ExchangeRpcContext rpc_ctx; - rpc_ctx._closure = closure; + rpc_ctx._send_callback = send_callback; rpc_ctx.is_cancelled = false; _instance_to_rpc_ctx[id] = rpc_ctx; - closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::exchange_sink_ignore_eovercrowded) { - closure->cntl.ignore_eovercrowded(); + send_callback->cntl_->ignore_eovercrowded(); } - closure->addFailedHandler( + send_callback->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - closure->start_rpc_time = GetCurrentTimeNanos(); - closure->addSuccessHandler([&](const 
InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result, - const int64_t& start_rpc_time) { + send_callback->start_rpc_time = GetCurrentTimeNanos(); + send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, + const PTransmitDataResult& result, + const int64_t& start_rpc_time) { set_rpc_time(id, start_rpc_time, result.receive_time()); Status s(Status::create(result.status())); if (s.is()) { @@ -335,11 +342,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + auto send_remote_block_closure = + AutoReleaseClosure>:: + create_unique(brpc_request, send_callback); if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request, - request.channel->_brpc_dest_addr)); + RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), + std::move(send_remote_block_closure), + request.channel->_brpc_dest_addr)); } else { - transmit_block(*request.channel->_brpc_stub, closure, *brpc_request); + transmit_blockv2(*request.channel->_brpc_stub, + std::move(send_remote_block_closure)); } } if (request.block_holder->get_block()) { @@ -359,7 +372,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { template void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) { - _instance_to_request[id] = std::make_unique(); + _instance_to_request[id] = std::make_shared(); _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id); _instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index c04de2a51f3a5a..9111f553b2799b 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -36,6 +36,7 @@ #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include 
"service/backend_options.h" +#include "util/ref_count_closure.h" namespace doris { class PTransmitDataParams; @@ -107,10 +108,12 @@ struct BroadcastTransmitInfo { bool eos; }; -template -class SelfDeleteClosure : public google::protobuf::Closure { +template +class ExchangeSendCallback : public ::doris::DummyBrpcCallback { + ENABLE_FACTORY_CREATOR(ExchangeSendCallback); + public: - SelfDeleteClosure() = default; + ExchangeSendCallback() = default; void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) { _id = id; @@ -118,32 +121,35 @@ class SelfDeleteClosure : public google::protobuf::Closure { _data = data; } - ~SelfDeleteClosure() override = default; - SelfDeleteClosure(const SelfDeleteClosure& other) = delete; - SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete; + ~ExchangeSendCallback() override = default; + ExchangeSendCallback(const ExchangeSendCallback& other) = delete; + ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete; void addFailedHandler( const std::function& fail_fn) { _fail_fn = fail_fn; } - void addSuccessHandler(const std::function& suc_fn) { + void addSuccessHandler(const std::function& suc_fn) { _suc_fn = suc_fn; } - void Run() noexcept override { + void call() noexcept override { try { if (_data) { _data->unref(); } - if (cntl.Failed()) { + if (::doris::DummyBrpcCallback::cntl_->Failed()) { std::string err = fmt::format( "failed to send brpc when exchange, error={}, error_text={}, client: {}, " "latency = {}", - berror(cntl.ErrorCode()), cntl.ErrorText(), BackendOptions::get_localhost(), - cntl.latency_us()); + berror(::doris::DummyBrpcCallback::cntl_->ErrorCode()), + ::doris::DummyBrpcCallback::cntl_->ErrorText(), + BackendOptions::get_localhost(), + ::doris::DummyBrpcCallback::cntl_->latency_us()); _fail_fn(_id, err); } else { - _suc_fn(_id, _eos, result, start_rpc_time); + _suc_fn(_id, _eos, *(::doris::DummyBrpcCallback::response_), + start_rpc_time); } } catch (const 
std::exception& exp) { LOG(FATAL) << "brpc callback error: " << exp.what(); @@ -151,21 +157,18 @@ class SelfDeleteClosure : public google::protobuf::Closure { LOG(FATAL) << "brpc callback error."; } } - - brpc::Controller cntl; - T result; int64_t start_rpc_time; private: std::function _fail_fn; - std::function _suc_fn; + std::function _suc_fn; InstanceLoId _id; bool _eos; vectorized::BroadcastPBlockHolder* _data; }; struct ExchangeRpcContext { - SelfDeleteClosure* _closure = nullptr; + std::shared_ptr> _send_callback = nullptr; bool is_cancelled = false; }; @@ -208,7 +211,7 @@ class ExchangeSinkBuffer { // must init zero // TODO: make all flat_hash_map to a STRUT phmap::flat_hash_map _instance_to_seq; - phmap::flat_hash_map> _instance_to_request; + phmap::flat_hash_map> _instance_to_request; // One channel is corresponding to a downstream instance. phmap::flat_hash_map _rpc_channel_is_idle; // Number of busy channels; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 17f1c08da913ba..0c12e910c3391f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1518,37 +1518,35 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote return; } - PTabletWriteSlaveDoneRequest request; - request.set_txn_id(txn_id); - request.set_tablet_id(tablet_id); - request.set_node_id(node_id); - request.set_is_succeed(is_succeed); - RefCountClosure* closure = - new RefCountClosure(); - closure->ref(); - closure->ref(); - closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); - closure->cntl.ignore_eovercrowded(); - stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure); - - closure->join(); - if (closure->cntl.Failed()) { + auto request = std::make_shared(); + request->set_txn_id(txn_id); + request->set_tablet_id(tablet_id); + request->set_node_id(node_id); + request->set_is_succeed(is_succeed); + auto 
pull_rowset_callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PTabletWriteSlaveDoneRequest, + DummyBrpcCallback>::create_unique(request, + pull_rowset_callback); + closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); + closure->cntl_->ignore_eovercrowded(); + stub->response_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + + pull_rowset_callback->join(); + if (pull_rowset_callback->cntl_->Failed()) { if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(stub, remote_host, brpc_port)) { ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - closure->cntl.remote_side()); + closure->cntl_->remote_side()); } LOG(WARNING) << "failed to response result of slave replica to master replica, error=" - << berror(closure->cntl.ErrorCode()) - << ", error_text=" << closure->cntl.ErrorText() + << berror(pull_rowset_callback->cntl_->ErrorCode()) + << ", error_text=" << pull_rowset_callback->cntl_->ErrorText() << ", master host: " << remote_host << ", tablet_id=" << tablet_id << ", txn_id=" << txn_id; } - - if (closure->unref()) { - delete closure; - } - closure = nullptr; VLOG_CRITICAL << "succeed to response the result of slave replica pull rowset to master " "replica. master host: " << remote_host << ". is_succeed=" << is_succeed << ", tablet_id=" << tablet_id diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index 2b8b524da543b2..26779977642288 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -38,9 +38,10 @@ constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31); // Embed column_values and brpc request serialization string in controller attachment. 
template -Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) { +Status request_embed_attachment_contain_blockv2(Params* brpc_request, + std::unique_ptr& closure) { auto block = brpc_request->block(); - Status st = request_embed_attachment(brpc_request, block.column_values(), closure); + Status st = request_embed_attachmentv2(brpc_request, block.column_values(), closure); block.set_column_values(""); return st; } @@ -59,32 +60,37 @@ inline bool enable_http_send_block(const PTransmitDataParams& request) { } template -void transmit_block(PBackendService_Stub& stub, Closure* closure, - const PTransmitDataParams& params) { - closure->cntl.http_request().Clear(); - stub.transmit_block(&closure->cntl, ¶ms, &closure->result, closure); +void transmit_blockv2(PBackendService_Stub& stub, std::unique_ptr closure) { + closure->cntl_->http_request().Clear(); + stub.transmit_block(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), + closure.get()); + closure.release(); } template -Status transmit_block_http(ExecEnv* exec_env, Closure* closure, PTransmitDataParams& params, - TNetworkAddress brpc_dest_addr) { - RETURN_IF_ERROR(request_embed_attachment_contain_block(¶ms, closure)); +Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr closure, + TNetworkAddress brpc_dest_addr) { + RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(), closure)); //format an ipv6 address std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port); std::shared_ptr brpc_http_stub = exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http"); - closure->cntl.http_request().uri() = brpc_url + "/PInternalServiceImpl/transmit_block_by_http"; - closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); - closure->cntl.http_request().set_content_type("application/json"); - brpc_http_stub->transmit_block_by_http(&closure->cntl, nullptr, &closure->result, 
closure); + closure->cntl_->http_request().uri() = + brpc_url + "/PInternalServiceImpl/transmit_block_by_http"; + closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST); + closure->cntl_->http_request().set_content_type("application/json"); + brpc_http_stub->transmit_block_by_http(closure->cntl_.get(), nullptr, closure->response_.get(), + closure.get()); + closure.release(); return Status::OK(); } template -Status request_embed_attachment(Params* brpc_request, const std::string& data, Closure* closure) { +Status request_embed_attachmentv2(Params* brpc_request, const std::string& data, + std::unique_ptr& closure) { butil::IOBuf attachment; // step1: serialize brpc_request to string, and append to attachment. @@ -106,7 +112,7 @@ Status request_embed_attachment(Params* brpc_request, const std::string& data, C data_size); } // step3: attachment add to closure. - closure->cntl.request_attachment().swap(attachment); + closure->cntl_->request_attachment().swap(attachment); return Status::OK(); } diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index d844a7fc820b90..14a136fcfd0c6e 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -21,39 +21,11 @@ #include -#include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "service/brpc.h" namespace doris { -template -class RefCountClosure : public google::protobuf::Closure { -public: - RefCountClosure() : _refs(0) {} - ~RefCountClosure() override = default; - - void ref() { _refs.fetch_add(1); } - - // If unref() returns true, this object should be delete - bool unref() { return _refs.fetch_sub(1) == 1; } - - void Run() override { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - if (unref()) { - delete this; - } - } - - void join() { brpc::Join(cntl.call_id()); } - - brpc::Controller cntl; - T result; - -private: - std::atomic _refs; -}; - template class DummyBrpcCallback { ENABLE_FACTORY_CREATOR(DummyBrpcCallback); @@ -65,9 +37,11 @@ class 
DummyBrpcCallback { response_ = std::make_shared(); } - void call() {} + virtual ~DummyBrpcCallback() = default; + + virtual void call() {} - void join() { brpc::Join(cntl_->call_id()); } + virtual void join() { brpc::Join(cntl_->call_id()); } // controller has to be the same lifecycle with the closure, because brpc may use // it in any stage of the rpc. @@ -102,7 +76,7 @@ class AutoReleaseClosure : public google::protobuf::Closure { public: AutoReleaseClosure(std::shared_ptr req, std::shared_ptr callback) - : callback_(callback) { + : request_(req), callback_(callback) { this->cntl_ = callback->cntl_; this->response_ = callback->response_; } @@ -111,7 +85,6 @@ class AutoReleaseClosure : public google::protobuf::Closure { // Will delete itself void Run() override { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); Defer defer {[&]() { delete this; }}; // If lock failed, it means the callback object is deconstructed, then no need // to deal with the callback any more. diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index f5397e831d3159..c25f4a9ea61712 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -57,19 +57,19 @@ Status Channel::init(RuntimeState* state) { ", maybe version is not compatible."; return Status::InternalError("no brpc destination"); } - + _brpc_request = std::make_shared(); // initialize brpc request - _brpc_request.mutable_finst_id()->set_hi(_fragment_instance_id.hi); - _brpc_request.mutable_finst_id()->set_lo(_fragment_instance_id.lo); - _finst_id = _brpc_request.finst_id(); + _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi); + _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo); + _finst_id = _brpc_request->finst_id(); - _brpc_request.mutable_query_id()->set_hi(state->query_id().hi); - _brpc_request.mutable_query_id()->set_lo(state->query_id().lo); - _query_id = _brpc_request.query_id(); + 
_brpc_request->mutable_query_id()->set_hi(state->query_id().hi); + _brpc_request->mutable_query_id()->set_lo(state->query_id().lo); + _query_id = _brpc_request->query_id(); - _brpc_request.set_node_id(_dest_node_id); - _brpc_request.set_sender_id(_parent->sender_id()); - _brpc_request.set_be_number(_be_number); + _brpc_request->set_node_id(_dest_node_id); + _brpc_request->set_sender_id(_parent->sender_id()); + _brpc_request->set_be_number(_be_number); _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000; @@ -174,49 +174,50 @@ Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_s } SCOPED_TIMER(_parent->brpc_send_timer()); - if (_closure == nullptr) { - _closure = new RefCountClosure(); - _closure->ref(); + if (_send_remote_block_callback == nullptr) { + _send_remote_block_callback = DummyBrpcCallback::create_shared(); } else { RETURN_IF_ERROR(_wait_last_brpc()); SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - _closure->cntl.Reset(); + _send_remote_block_callback->cntl_->Reset(); } VLOG_ROW << "Channel::send_batch() instance_id=" << print_id(_fragment_instance_id) << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string(); if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) { - auto statistic = _brpc_request.mutable_query_statistics(); + auto statistic = _brpc_request->mutable_query_statistics(); _parent->query_statistics()->to_pb(statistic); } - _brpc_request.set_eos(eos); + _brpc_request->set_eos(eos); if (!exec_status.ok()) { - exec_status.to_protobuf(_brpc_request.mutable_exec_status()); + exec_status.to_protobuf(_brpc_request->mutable_exec_status()); } if (block != nullptr) { - _brpc_request.set_allocated_block(block); + _brpc_request->set_allocated_block(block); } - _brpc_request.set_packet_seq(_packet_seq++); + _brpc_request->set_packet_seq(_packet_seq++); - _closure->ref(); - 
_closure->cntl.set_timeout_ms(_brpc_timeout_ms); + _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms); if (config::exchange_sink_ignore_eovercrowded) { - _closure->cntl.ignore_eovercrowded(); + _send_remote_block_callback->cntl_->ignore_eovercrowded(); } { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - if (enable_http_send_block(_brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_state->exec_env(), _closure, _brpc_request, - _brpc_dest_addr)); + auto send_remote_block_closure = + AutoReleaseClosure>:: + create_unique(_brpc_request, _send_remote_block_callback); + if (enable_http_send_block(*_brpc_request)) { + RETURN_IF_ERROR(transmit_block_httpv2( + _state->exec_env(), std::move(send_remote_block_closure), _brpc_dest_addr)); } else { - transmit_block(*_brpc_stub, _closure, _brpc_request); + transmit_blockv2(*_brpc_stub, std::move(send_remote_block_closure)); } } if (block != nullptr) { - static_cast(_brpc_request.release_block()); + static_cast(_brpc_request->release_block()); } return Status::OK(); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 10116be03346ae..bcc050e25bd865 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -261,11 +261,7 @@ class Channel { _ch_cur_pb_block = &_ch_pb_block1; } - virtual ~Channel() { - if (_closure != nullptr && _closure->unref()) { - delete _closure; - } - } + virtual ~Channel() = default; // Initialize channel. // Returns OK if successful, error indication otherwise. 
@@ -337,22 +333,22 @@ class Channel { Status _wait_last_brpc() { SCOPED_TIMER(_parent->brpc_wait_timer()); - if (_closure == nullptr) { + if (_send_remote_block_callback == nullptr) { return Status::OK(); } - auto cntl = &_closure->cntl; - auto call_id = _closure->cntl.call_id(); - brpc::Join(call_id); - _receiver_status = Status::create(_closure->result.status()); - if (cntl->Failed()) { + _send_remote_block_callback->join(); + if (_send_remote_block_callback->cntl_->Failed()) { std::string err = fmt::format( "failed to send brpc batch, error={}, error_text={}, client: {}, " "latency = {}", - berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(), - cntl->latency_us()); + berror(_send_remote_block_callback->cntl_->ErrorCode()), + _send_remote_block_callback->cntl_->ErrorText(), + BackendOptions::get_localhost(), + _send_remote_block_callback->cntl_->latency_us()); LOG(WARNING) << err; return Status::RpcError(err); } + _receiver_status = Status::create(_send_remote_block_callback->response_->status()); return _receiver_status; } @@ -380,9 +376,9 @@ class Channel { PUniqueId _finst_id; PUniqueId _query_id; PBlock _pb_block; - PTransmitDataParams _brpc_request; + std::shared_ptr _brpc_request; std::shared_ptr _brpc_stub = nullptr; - RefCountClosure* _closure = nullptr; + std::shared_ptr> _send_remote_block_callback = nullptr; Status _receiver_status; int32_t _brpc_timeout_ms = 500; // whether the dest can be treated as query statistics transfer chain. 
@@ -540,15 +536,15 @@ class PipChannel final : public Channel { _buffer->register_sink(Channel::_fragment_instance_id); } - pipeline::SelfDeleteClosure* get_closure( + std::shared_ptr> get_send_callback( InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) { - if (!_closure) { - _closure.reset(new pipeline::SelfDeleteClosure()); + if (!_send_callback) { + _send_callback = pipeline::ExchangeSendCallback::create_shared(); } else { - _closure->cntl.Reset(); + _send_callback->cntl_->Reset(); } - _closure->init(id, eos, data); - return _closure.get(); + _send_callback->init(id, eos, data); + return _send_callback; } std::shared_ptr get_local_channel_dependency() { @@ -564,7 +560,7 @@ class PipChannel final : public Channel { pipeline::ExchangeSinkBuffer* _buffer = nullptr; bool _eos_send = false; - std::unique_ptr> _closure = nullptr; + std::shared_ptr> _send_callback = nullptr; std::unique_ptr _pblock; }; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 75e813b134791e..f3270c4b9fff28 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -273,26 +273,13 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i _index_channel(index_channel), _node_id(node_id), _is_incremental(is_incremental) { + _cur_add_block_request = std::make_shared(); _node_channel_tracker = std::make_shared(fmt::format( "NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id), thread_context()->get_thread_id())); } -VNodeChannel::~VNodeChannel() { - for (auto& closure : _open_closures) { - if (closure != nullptr) { - if (closure->unref()) { - delete closure; - } - closure = nullptr; - } - } - if (_add_block_closure != nullptr) { - delete _add_block_closure; - _add_block_closure = nullptr; - } - static_cast(_cur_add_block_request.release_id()); -} +VNodeChannel::~VNodeChannel() = default; void VNodeChannel::clear_all_blocks() { 
std::lock_guard lg(_pending_batches_lock); @@ -334,13 +321,13 @@ Status VNodeChannel::init(RuntimeState* state) { _timeout_watch.start(); // Initialize _cur_add_block_request - if (!_cur_add_block_request.has_id()) { - _cur_add_block_request.set_allocated_id(&_parent->_load_id); + if (!_cur_add_block_request->has_id()) { + *(_cur_add_block_request->mutable_id()) = _parent->_load_id; } - _cur_add_block_request.set_index_id(_index_channel->_index_id); - _cur_add_block_request.set_sender_id(_parent->_sender_id); - _cur_add_block_request.set_backend_id(_node_id); - _cur_add_block_request.set_eos(false); + _cur_add_block_request->set_index_id(_index_channel->_index_id); + _cur_add_block_request->set_sender_id(_parent->_sender_id); + _cur_add_block_request->set_backend_id(_node_id); + _cur_add_block_request->set_eos(false); _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); // The node channel will send _batch_size rows of data each rpc. When the @@ -355,46 +342,48 @@ Status VNodeChannel::init(RuntimeState* state) { void VNodeChannel::_open_internal(bool is_incremental) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - PTabletWriterOpenRequest request; - request.set_allocated_id(&_parent->_load_id); - request.set_index_id(_index_channel->_index_id); - request.set_txn_id(_parent->_txn_id); - request.set_allocated_schema(_parent->_schema->to_protobuf()); + auto request = std::make_shared(); + request->set_allocated_id(&_parent->_load_id); + request->set_index_id(_index_channel->_index_id); + request->set_txn_id(_parent->_txn_id); + request->set_allocated_schema(_parent->_schema->to_protobuf()); std::set deduper; for (auto& tablet : _all_tablets) { if (deduper.contains(tablet.tablet_id)) { continue; } - auto ptablet = request.add_tablets(); + auto ptablet = request->add_tablets(); ptablet->set_partition_id(tablet.partition_id); ptablet->set_tablet_id(tablet.tablet_id); deduper.insert(tablet.tablet_id); } - 
request.set_num_senders(_parent->_num_senders); - request.set_need_gen_rollup(false); // Useless but it is a required field in pb - request.set_load_mem_limit(_parent->_load_mem_limit); - request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); - request.set_is_high_priority(_parent->_is_high_priority); - request.set_sender_ip(BackendOptions::get_localhost()); - request.set_is_vectorized(true); - request.set_backend_id(_node_id); - request.set_enable_profile(_state->enable_profile()); - request.set_is_incremental(is_incremental); - - auto* open_closure = new RefCountClosure {}; - open_closure->ref(); - - open_closure->ref(); // This ref is for RPC's reference - open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); + request->set_num_senders(_parent->_num_senders); + request->set_need_gen_rollup(false); // Useless but it is a required field in pb + request->set_load_mem_limit(_parent->_load_mem_limit); + request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s); + request->set_is_high_priority(_parent->_is_high_priority); + request->set_sender_ip(BackendOptions::get_localhost()); + request->set_is_vectorized(true); + request->set_backend_id(_node_id); + request->set_enable_profile(_state->enable_profile()); + request->set_is_incremental(is_incremental); + + auto open_callback = DummyBrpcCallback::create_shared(); + auto open_closure = AutoReleaseClosure< + PTabletWriterOpenRequest, + DummyBrpcCallback>::create_unique(request, open_callback); + open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); if (config::tablet_writer_ignore_eovercrowded) { - open_closure->cntl.ignore_eovercrowded(); + open_callback->cntl_->ignore_eovercrowded(); } // the real transmission here. the corresponding BE's load mgr will open load channel for it. 
- _stub->tablet_writer_open(&open_closure->cntl, &request, &open_closure->result, open_closure); - _open_closures.push_back(open_closure); + _stub->tablet_writer_open(open_closure->cntl_.get(), open_closure->request_.get(), + open_closure->response_.get(), open_closure.get()); + open_closure.release(); + _open_callbacks.push_back(open_callback); - static_cast(request.release_id()); - static_cast(request.release_schema()); + static_cast(request->release_id()); + static_cast(request->release_schema()); } void VNodeChannel::open() { @@ -407,36 +396,28 @@ void VNodeChannel::incremental_open() { Status VNodeChannel::open_wait() { Status status; - for (auto& open_closure : _open_closures) { + for (auto& open_callback : _open_callbacks) { // because of incremental open, we will wait multi times. so skip the closures which have been checked and set to nullptr in previous rounds - if (open_closure == nullptr) { + if (open_callback == nullptr) { continue; } - open_closure->join(); + open_callback->join(); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - if (open_closure->cntl.Failed()) { + if (open_callback->cntl_->Failed()) { if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( _stub, _node_info.host, _node_info.brpc_port)) { ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - open_closure->cntl.remote_side()); + open_callback->cntl_->remote_side()); } _cancelled = true; - auto error_code = open_closure->cntl.ErrorCode(); - auto error_text = open_closure->cntl.ErrorText(); - if (open_closure->unref()) { - delete open_closure; - } - open_closure = nullptr; + auto error_code = open_callback->cntl_->ErrorCode(); + auto error_text = open_callback->cntl_->ErrorText(); return Status::InternalError( "failed to open tablet writer, error={}, error_text={}, info={}", berror(error_code), error_text, channel_info()); } - status = Status::create(open_closure->result.status()); - if (open_closure->unref()) { - delete open_closure; - } - open_closure 
= nullptr; + status = Status::create(open_callback->response_->status()); if (!status.ok()) { _cancelled = true; @@ -445,11 +426,11 @@ Status VNodeChannel::open_wait() { } // add block closure - _add_block_closure = ReusableClosure::create(); - _add_block_closure->addFailedHandler( + _send_block_callback = WriteBlockCallback::create_shared(); + _send_block_callback->addFailedHandler( [this](bool is_last_rpc) { _add_block_failed_callback(is_last_rpc); }); - _add_block_closure->addSuccessHandler( + _send_block_callback->addSuccessHandler( [this](const PTabletWriterAddBlockResult& result, bool is_last_rpc) { _add_block_success_callback(result, is_last_rpc); }); @@ -501,12 +482,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, for (auto column : block->get_columns()) { columns.push_back(std::move(*column).mutate()); } - *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(), tablets.end()}; - _cur_add_block_request.set_is_single_tablet_block(true); + *_cur_add_block_request->mutable_tablet_ids() = {tablets.begin(), tablets.end()}; + _cur_add_block_request->set_is_single_tablet_block(true); } else { block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)); for (auto tablet_id : payload->second) { - _cur_add_block_request.add_tablet_ids(tablet_id); + _cur_add_block_request->add_tablet_ids(tablet_id); } } @@ -517,9 +498,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, std::lock_guard l(_pending_batches_lock); // To simplify the add_row logic, postpone adding block into req until the time of sending req _pending_batches_bytes += _cur_mutable_block->allocated_bytes(); - _cur_add_block_request.set_eos( + _cur_add_block_request->set_eos( false); // for multi-add, only when marking close we set it eos. 
- _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); + // Copy the request to tmp request to add to pend block queue + auto tmp_add_block_request = std::make_shared(); + *tmp_add_block_request = *_cur_add_block_request; + _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request); _pending_batches_num++; VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" << this << " pending_batches_bytes:" << _pending_batches_bytes @@ -527,7 +511,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, << " loadinfo:" << _load_info; } _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); - _cur_add_block_request.clear_tablet_ids(); + _cur_add_block_request->clear_tablet_ids(); } return Status::OK(); @@ -540,7 +524,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, } // set closure for sending block. - if (!_add_block_closure->try_set_in_flight()) { + if (!_send_block_callback->try_set_in_flight()) { // There is packet in flight, skip. return _send_finished ? 0 : 1; } @@ -551,12 +535,12 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, if (!s.ok()) { _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); // sending finished. clear in flight - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); } // in_flight is cleared in closure::Run } else { // sending finished. clear in flight - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); } return _send_finished ? 
0 : 1; } @@ -607,20 +591,21 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { auto request = std::move(send_block.second); // doesn't need to be saved in heap // tablet_ids has already set when add row - request.set_packet_seq(_next_packet_seq); + request->set_packet_seq(_next_packet_seq); auto block = mutable_block->to_block(); - CHECK(block.rows() == request.tablet_ids_size()) - << "block rows: " << block.rows() << ", tablet_ids_size: " << request.tablet_ids_size(); + CHECK(block.rows() == request->tablet_ids_size()) + << "block rows: " << block.rows() + << ", tablet_ids_size: " << request->tablet_ids_size(); if (block.rows() > 0) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; - Status st = block.serialize(state->be_exec_version(), request.mutable_block(), + Status st = block.serialize(state->be_exec_version(), request->mutable_block(), &uncompressed_bytes, &compressed_bytes, state->fragement_transmission_compression_type(), _parent->_transfer_large_data_by_brpc); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.to_string())); - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); return; } if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) { @@ -632,29 +617,29 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { - if (remain_ms <= 0 && !request.eos()) { + if (remain_ms <= 0 && !request->eos()) { cancel(fmt::format("{}, err: timeout", channel_info())); - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); return; } else { remain_ms = config::min_load_rpc_timeout_ms; } } - _add_block_closure->reset(); - _add_block_closure->cntl.set_timeout_ms(remain_ms); + _send_block_callback->reset(); + _send_block_callback->cntl_->set_timeout_ms(remain_ms); 
         if (config::tablet_writer_ignore_eovercrowded) {
-            _add_block_closure->cntl.ignore_eovercrowded();
+            _send_block_callback->cntl_->ignore_eovercrowded();
         }
-    if (request.eos()) {
+    if (request->eos()) {
         for (auto pid : _parent->_tablet_finder->partition_ids()) {
-            request.add_partition_ids(pid);
+            request->add_partition_ids(pid);
         }
-        request.set_write_single_replica(false);
+        request->set_write_single_replica(false);
         if (_parent->_write_single_replica) {
-            request.set_write_single_replica(true);
+            request->set_write_single_replica(true);
             for (std::unordered_map>::iterator iter =
                          _slave_tablet_nodes.begin();
                  iter != _slave_tablet_nodes.end(); iter++) {
@@ -670,24 +655,27 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
                 pnode->set_host(node->host);
                 pnode->set_async_internal_port(node->brpc_port);
             }
-            request.mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes});
+            request->mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes});
         }
     }
-        // eos request must be the last request. it's a signal makeing callback function to set _add_batch_finished true.
-        _add_block_closure->end_mark();
+        // eos request must be the last request. it's a signal making the callback function set _add_batch_finished to true.
+ _send_block_callback->end_mark(); _send_finished = true; CHECK(_pending_batches_num == 0) << _pending_batches_num; } - if (_parent->_transfer_large_data_by_brpc && request.has_block() && - request.block().has_column_values() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { - Status st = request_embed_attachment_contain_block< - PTabletWriterAddBlockRequest, ReusableClosure>( - &request, _add_block_closure); + auto send_block_closure = AutoReleaseClosure< + PTabletWriterAddBlockRequest, + WriteBlockCallback>::create_unique(request, + _send_block_callback); + if (_parent->_transfer_large_data_by_brpc && request->has_block() && + request->block().has_column_values() && request->ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + Status st = request_embed_attachment_contain_blockv2(send_block_closure->request_.get(), + send_block_closure); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.to_string())); - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); return; } @@ -696,23 +684,26 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { std::shared_ptr _brpc_http_stub = _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http"); - _add_block_closure->cntl.http_request().uri() = + _send_block_callback->cntl_->http_request().uri() = brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http"; - _add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); - _add_block_closure->cntl.http_request().set_content_type("application/json"); + _send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST); + _send_block_callback->cntl_->http_request().set_content_type("application/json"); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - _brpc_http_stub->tablet_writer_add_block_by_http(&_add_block_closure->cntl, nullptr, - &_add_block_closure->result, - _add_block_closure); + 
_brpc_http_stub->tablet_writer_add_block_by_http( + send_block_closure->cntl_.get(), nullptr, send_block_closure->response_.get(), + send_block_closure.get()); + send_block_closure.release(); } } else { - _add_block_closure->cntl.http_request().Clear(); + _send_block_callback->cntl_->http_request().Clear(); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request, - &_add_block_closure->result, _add_block_closure); + _stub->tablet_writer_add_block( + send_block_closure->cntl_.get(), send_block_closure->request_.get(), + send_block_closure->response_.get(), send_block_closure.get()); + send_block_closure.release(); } } @@ -805,11 +796,11 @@ void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { } SCOPED_ATTACH_TASK(_state); // If rpc failed, mark all tablets on this node channel as failed - _index_channel->mark_as_failed( - this, - fmt::format("rpc failed, error coed:{}, error text:{}", - _add_block_closure->cntl.ErrorCode(), _add_block_closure->cntl.ErrorText()), - -1); + _index_channel->mark_as_failed(this, + fmt::format("rpc failed, error coed:{}, error text:{}", + _send_block_callback->cntl_->ErrorCode(), + _send_block_callback->cntl_->ErrorText()), + -1); Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); @@ -835,24 +826,28 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { // But do we need brpc::StartCancel(call_id)? 
_cancel_with_msg(cancel_msg); - PTabletWriterCancelRequest request; - request.set_allocated_id(&_parent->_load_id); - request.set_index_id(_index_channel->_index_id); - request.set_sender_id(_parent->_sender_id); + auto request = std::make_shared(); + request->set_allocated_id(&_parent->_load_id); + request->set_index_id(_index_channel->_index_id); + request->set_sender_id(_parent->_sender_id); - auto closure = new RefCountClosure(); + auto cancel_callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PTabletWriterCancelRequest, + DummyBrpcCallback>::create_unique(request, cancel_callback); - closure->ref(); int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { remain_ms = config::min_load_rpc_timeout_ms; } - closure->cntl.set_timeout_ms(remain_ms); + cancel_callback->cntl_->set_timeout_ms(remain_ms); if (config::tablet_writer_ignore_eovercrowded) { - closure->cntl.ignore_eovercrowded(); + closure->cntl_->ignore_eovercrowded(); } - _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); - static_cast(request.release_id()); + _stub->tablet_writer_cancel(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + static_cast(request->release_id()); } bool VNodeChannel::is_send_data_rpc_done() const { @@ -912,17 +907,19 @@ void VNodeChannel::mark_close() { return; } - _cur_add_block_request.set_eos(true); + _cur_add_block_request->set_eos(true); { std::lock_guard l(_pending_batches_lock); if (!_cur_mutable_block) [[unlikely]] { // add a dummy block _cur_mutable_block = vectorized::MutableBlock::create_unique(); } + auto tmp_add_block_request = + std::make_shared(*_cur_add_block_request); // when prepare to close, add block to queue so that try_send_pending_block thread will send it. 
-        _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
+        _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request);
         _pending_batches_num++;
-        DCHECK(_pending_blocks.back().second.eos());
+        DCHECK(_pending_blocks.back().second->eos());
         _close_time_ms = UnixMillis();
         LOG(INFO) << channel_info()
                   << " mark closed, left pending batch size: " << _pending_blocks.size();
@@ -1391,7 +1388,7 @@ Status VTabletWriter::close(Status exec_status) {
     SCOPED_TIMER(_close_timer);
     SCOPED_TIMER(_profile->total_time_counter());
-    // will make the last batch of request. close_wait will wait this finished.
+    // will make the last batch of request. close_wait will wait until it is finished.
     static_cast(try_close(_state, exec_status));
     // If _close_status is not ok, all nodes have been canceled in try_close.
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 7fee45be371080..ea998a0f0b4878 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -65,6 +65,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
 #include "util/countdown_latch.h"
+#include "util/ref_count_closure.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 #include "util/stopwatch.hpp"
@@ -88,8 +89,6 @@ class TExpr;
 class Thread;
 class ThreadPoolToken;
 class TupleDescriptor;
-template
-class RefCountClosure;
 namespace vectorized {
@@ -120,26 +119,21 @@ struct AddBatchCounter {
 // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
 // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
-// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted.
+// Deleting this pointer is safe; don't worry about the RPC callback running after the WriteBlockCallback is deleted.
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure* create() { return new ReusableClosure(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} + ~WriteBlockCallback() override = default; void addFailedHandler(const std::function& fn) { failed_handler = fn; } void addSuccessHandler(const std::function& fn) { success_handler = fn; } - void join() { + void join() override { // We rely on in_flight to assure one rpc is running, // while cid is not reliable due to memory order. // in_flight is written before getting callid, @@ -159,8 +153,8 @@ class ReusableClosure final : public google::protobuf::Closure { // plz follow this order: reset() -> set_in_flight() -> send brpc batch void reset() { SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - cid = cntl.call_id(); + ::doris::DummyBrpcCallback::cntl_->Reset(); + cid = ::doris::DummyBrpcCallback::cntl_->call_id(); } // if _packet_in_flight == false, set it to true. Return true. 
@@ -179,21 +173,19 @@ class ReusableClosure final : public google::protobuf::Closure { _is_last_rpc = true; } - void Run() override { + void call() override { DCHECK(_packet_in_flight); - if (cntl.Failed()) { - LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode()) - << ", error_text=" << cntl.ErrorText(); + if (::doris::DummyBrpcCallback::cntl_->Failed()) { + LOG(WARNING) << "failed to send brpc batch, error=" + << berror(::doris::DummyBrpcCallback::cntl_->ErrorCode()) + << ", error_text=" << ::doris::DummyBrpcCallback::cntl_->ErrorText(); failed_handler(_is_last_rpc); } else { - success_handler(result, _is_last_rpc); + success_handler(*(::doris::DummyBrpcCallback::response_), _is_last_rpc); } clear_in_flight(); } - brpc::Controller cntl; - T result; - private: brpc::CallId cid; std::atomic _packet_in_flight {false}; @@ -381,7 +373,7 @@ class VNodeChannel { std::shared_ptr _stub = nullptr; // because we have incremantal open, we should keep one relative closure for one request. it's similarly for adding block. - std::vector*> _open_closures; + std::vector>> _open_callbacks; std::vector _all_tablets; // map from tablet_id to node_id where slave replicas locate in @@ -413,12 +405,12 @@ class VNodeChannel { // build a _cur_mutable_block and push into _pending_blocks. when not building, this block is empty. 
std::unique_ptr _cur_mutable_block; - PTabletWriterAddBlockRequest _cur_add_block_request; + std::shared_ptr _cur_add_block_request; - using AddBlockReq = - std::pair, PTabletWriterAddBlockRequest>; + using AddBlockReq = std::pair, + std::shared_ptr>; std::queue _pending_blocks; - ReusableClosure* _add_block_closure = nullptr; + std::shared_ptr> _send_block_callback = nullptr; bool _is_incremental; }; diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp index 80adee73749509..c1ce87fe56dabd 100644 --- a/be/test/vec/runtime/vdata_stream_test.cpp +++ b/be/test/vec/runtime/vdata_stream_test.cpp @@ -84,6 +84,10 @@ class LocalMockBackendService : public PBackendService { << ", fragment_instance_id=" << print_id(request->finst_id()) << ", node=" << request->node_id(); } + if (done != nullptr) { + st.to_protobuf(response->mutable_status()); + done->Run(); + } } private: