diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index c78f7814c05f53..68b97e49cb39a1 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -282,8 +282,9 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); closure->cntl_->ignore_eovercrowded(); stub->request_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(), - closure->response_.get(), closure.release()); + closure->response_.get(), closure.get()); + closure.release(); pull_callback->join(); if (pull_callback->cntl_->Failed()) { if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 29933fcdd1571d..46364528458506 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -107,7 +107,7 @@ bool ExchangeSinkBuffer::is_pending_finish() { if (need_cancel && _instance_to_rpc_ctx.find(id) != _instance_to_rpc_ctx.end()) { auto& rpc_ctx = _instance_to_rpc_ctx[id]; if (!rpc_ctx.is_cancelled) { - brpc::StartCancel(rpc_ctx._closure->cntl.call_id()); + brpc::StartCancel(rpc_ctx._send_callback->cntl_->call_id()); rpc_ctx.is_cancelled = true; } } @@ -245,21 +245,21 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (!request.exec_status.ok()) { request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); } - auto* closure = request.channel->get_closure(id, request.eos, nullptr); + auto send_callback = request.channel->get_send_callback(id, request.eos, nullptr); - _instance_to_rpc_ctx[id]._closure = closure; + _instance_to_rpc_ctx[id]._send_callback = send_callback; _instance_to_rpc_ctx[id].is_cancelled = false; - closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); 
if (config::exchange_sink_ignore_eovercrowded) { - closure->cntl.ignore_eovercrowded(); + send_callback->cntl_->ignore_eovercrowded(); } - closure->addFailedHandler( + send_callback->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - closure->start_rpc_time = GetCurrentTimeNanos(); - closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result, - const int64_t& start_rpc_time) { + send_callback->start_rpc_time = GetCurrentTimeNanos(); + send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, + const PTransmitDataResult& result, + const int64_t& start_rpc_time) { set_rpc_time(id, start_rpc_time, result.receive_time()); Status s(Status::create(result.status())); if (s.is()) { @@ -275,11 +275,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + auto send_remote_block_closure = + AutoReleaseClosure>:: + create_unique(brpc_request, send_callback); if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request, - request.channel->_brpc_dest_addr)); + RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), + std::move(send_remote_block_closure), + request.channel->_brpc_dest_addr)); } else { - transmit_block(*request.channel->_brpc_stub, closure, *brpc_request); + transmit_blockv2(*request.channel->_brpc_stub, + std::move(send_remote_block_closure)); } } if (request.block) { @@ -303,23 +309,24 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto statistic = brpc_request->mutable_query_statistics(); _statistics->to_pb(statistic); } - auto* closure = request.channel->get_closure(id, request.eos, request.block_holder); + auto send_callback = + request.channel->get_send_callback(id, request.eos, request.block_holder); ExchangeRpcContext rpc_ctx; - rpc_ctx._closure = closure; + 
rpc_ctx._send_callback = send_callback; rpc_ctx.is_cancelled = false; _instance_to_rpc_ctx[id] = rpc_ctx; - closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::exchange_sink_ignore_eovercrowded) { - closure->cntl.ignore_eovercrowded(); + send_callback->cntl_->ignore_eovercrowded(); } - closure->addFailedHandler( + send_callback->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - closure->start_rpc_time = GetCurrentTimeNanos(); - closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result, - const int64_t& start_rpc_time) { + send_callback->start_rpc_time = GetCurrentTimeNanos(); + send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, + const PTransmitDataResult& result, + const int64_t& start_rpc_time) { set_rpc_time(id, start_rpc_time, result.receive_time()); Status s(Status::create(result.status())); if (s.is()) { @@ -335,11 +342,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { }); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + auto send_remote_block_closure = + AutoReleaseClosure>:: + create_unique(brpc_request, send_callback); if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request, - request.channel->_brpc_dest_addr)); + RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), + std::move(send_remote_block_closure), + request.channel->_brpc_dest_addr)); } else { - transmit_block(*request.channel->_brpc_stub, closure, *brpc_request); + transmit_blockv2(*request.channel->_brpc_stub, + std::move(send_remote_block_closure)); } } if (request.block_holder->get_block()) { @@ -359,7 +372,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { template void ExchangeSinkBuffer::_construct_request(InstanceLoId id, 
PUniqueId finst_id) { - _instance_to_request[id] = std::make_unique(); + _instance_to_request[id] = std::make_shared(); _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id); _instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index c04de2a51f3a5a..9111f553b2799b 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -36,6 +36,7 @@ #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "service/backend_options.h" +#include "util/ref_count_closure.h" namespace doris { class PTransmitDataParams; @@ -107,10 +108,12 @@ struct BroadcastTransmitInfo { bool eos; }; -template -class SelfDeleteClosure : public google::protobuf::Closure { +template +class ExchangeSendCallback : public ::doris::DummyBrpcCallback { + ENABLE_FACTORY_CREATOR(ExchangeSendCallback); + public: - SelfDeleteClosure() = default; + ExchangeSendCallback() = default; void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) { _id = id; @@ -118,32 +121,35 @@ class SelfDeleteClosure : public google::protobuf::Closure { _data = data; } - ~SelfDeleteClosure() override = default; - SelfDeleteClosure(const SelfDeleteClosure& other) = delete; - SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete; + ~ExchangeSendCallback() override = default; + ExchangeSendCallback(const ExchangeSendCallback& other) = delete; + ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete; void addFailedHandler( const std::function& fail_fn) { _fail_fn = fail_fn; } - void addSuccessHandler(const std::function& suc_fn) { + void addSuccessHandler(const std::function& suc_fn) { _suc_fn = suc_fn; } - void Run() noexcept override { + void call() noexcept override { try { if (_data) { _data->unref(); } - if (cntl.Failed()) { + if (::doris::DummyBrpcCallback::cntl_->Failed()) { 
std::string err = fmt::format( "failed to send brpc when exchange, error={}, error_text={}, client: {}, " "latency = {}", - berror(cntl.ErrorCode()), cntl.ErrorText(), BackendOptions::get_localhost(), - cntl.latency_us()); + berror(::doris::DummyBrpcCallback::cntl_->ErrorCode()), + ::doris::DummyBrpcCallback::cntl_->ErrorText(), + BackendOptions::get_localhost(), + ::doris::DummyBrpcCallback::cntl_->latency_us()); _fail_fn(_id, err); } else { - _suc_fn(_id, _eos, result, start_rpc_time); + _suc_fn(_id, _eos, *(::doris::DummyBrpcCallback::response_), + start_rpc_time); } } catch (const std::exception& exp) { LOG(FATAL) << "brpc callback error: " << exp.what(); @@ -151,21 +157,18 @@ class SelfDeleteClosure : public google::protobuf::Closure { LOG(FATAL) << "brpc callback error."; } } - - brpc::Controller cntl; - T result; int64_t start_rpc_time; private: std::function _fail_fn; - std::function _suc_fn; + std::function _suc_fn; InstanceLoId _id; bool _eos; vectorized::BroadcastPBlockHolder* _data; }; struct ExchangeRpcContext { - SelfDeleteClosure* _closure = nullptr; + std::shared_ptr> _send_callback = nullptr; bool is_cancelled = false; }; @@ -208,7 +211,7 @@ class ExchangeSinkBuffer { // must init zero // TODO: make all flat_hash_map to a STRUT phmap::flat_hash_map _instance_to_seq; - phmap::flat_hash_map> _instance_to_request; + phmap::flat_hash_map> _instance_to_request; // One channel is corresponding to a downstream instance. 
phmap::flat_hash_map _rpc_channel_is_idle; // Number of busy channels; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 17f1c08da913ba..0c12e910c3391f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1518,37 +1518,35 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote return; } - PTabletWriteSlaveDoneRequest request; - request.set_txn_id(txn_id); - request.set_tablet_id(tablet_id); - request.set_node_id(node_id); - request.set_is_succeed(is_succeed); - RefCountClosure* closure = - new RefCountClosure(); - closure->ref(); - closure->ref(); - closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); - closure->cntl.ignore_eovercrowded(); - stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure); - - closure->join(); - if (closure->cntl.Failed()) { + auto request = std::make_shared(); + request->set_txn_id(txn_id); + request->set_tablet_id(tablet_id); + request->set_node_id(node_id); + request->set_is_succeed(is_succeed); + auto pull_rowset_callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PTabletWriteSlaveDoneRequest, + DummyBrpcCallback>::create_unique(request, + pull_rowset_callback); + closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); + closure->cntl_->ignore_eovercrowded(); + stub->response_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + + pull_rowset_callback->join(); + if (pull_rowset_callback->cntl_->Failed()) { if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(stub, remote_host, brpc_port)) { ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - closure->cntl.remote_side()); + closure->cntl_->remote_side()); } LOG(WARNING) << "failed to response result of slave replica to master replica, 
error=" - << berror(closure->cntl.ErrorCode()) - << ", error_text=" << closure->cntl.ErrorText() + << berror(pull_rowset_callback->cntl_->ErrorCode()) + << ", error_text=" << pull_rowset_callback->cntl_->ErrorText() << ", master host: " << remote_host << ", tablet_id=" << tablet_id << ", txn_id=" << txn_id; } - - if (closure->unref()) { - delete closure; - } - closure = nullptr; VLOG_CRITICAL << "succeed to response the result of slave replica pull rowset to master " "replica. master host: " << remote_host << ". is_succeed=" << is_succeed << ", tablet_id=" << tablet_id diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index 2b8b524da543b2..26779977642288 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -38,9 +38,10 @@ constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31); // Embed column_values and brpc request serialization string in controller attachment. template -Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) { +Status request_embed_attachment_contain_blockv2(Params* brpc_request, + std::unique_ptr& closure) { auto block = brpc_request->block(); - Status st = request_embed_attachment(brpc_request, block.column_values(), closure); + Status st = request_embed_attachmentv2(brpc_request, block.column_values(), closure); block.set_column_values(""); return st; } @@ -59,32 +60,37 @@ inline bool enable_http_send_block(const PTransmitDataParams& request) { } template -void transmit_block(PBackendService_Stub& stub, Closure* closure, - const PTransmitDataParams& params) { - closure->cntl.http_request().Clear(); - stub.transmit_block(&closure->cntl, ¶ms, &closure->result, closure); +void transmit_blockv2(PBackendService_Stub& stub, std::unique_ptr closure) { + closure->cntl_->http_request().Clear(); + stub.transmit_block(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), + closure.get()); + closure.release(); } template -Status transmit_block_http(ExecEnv* exec_env, Closure* 
closure, PTransmitDataParams& params, - TNetworkAddress brpc_dest_addr) { - RETURN_IF_ERROR(request_embed_attachment_contain_block(¶ms, closure)); +Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr closure, + TNetworkAddress brpc_dest_addr) { + RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(), closure)); //format an ipv6 address std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port); std::shared_ptr brpc_http_stub = exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http"); - closure->cntl.http_request().uri() = brpc_url + "/PInternalServiceImpl/transmit_block_by_http"; - closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); - closure->cntl.http_request().set_content_type("application/json"); - brpc_http_stub->transmit_block_by_http(&closure->cntl, nullptr, &closure->result, closure); + closure->cntl_->http_request().uri() = + brpc_url + "/PInternalServiceImpl/transmit_block_by_http"; + closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST); + closure->cntl_->http_request().set_content_type("application/json"); + brpc_http_stub->transmit_block_by_http(closure->cntl_.get(), nullptr, closure->response_.get(), + closure.get()); + closure.release(); return Status::OK(); } template -Status request_embed_attachment(Params* brpc_request, const std::string& data, Closure* closure) { +Status request_embed_attachmentv2(Params* brpc_request, const std::string& data, + std::unique_ptr& closure) { butil::IOBuf attachment; // step1: serialize brpc_request to string, and append to attachment. @@ -106,7 +112,7 @@ Status request_embed_attachment(Params* brpc_request, const std::string& data, C data_size); } // step3: attachment add to closure. 
- closure->cntl.request_attachment().swap(attachment); + closure->cntl_->request_attachment().swap(attachment); return Status::OK(); } diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index d844a7fc820b90..14a136fcfd0c6e 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -21,39 +21,11 @@ #include -#include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "service/brpc.h" namespace doris { -template -class RefCountClosure : public google::protobuf::Closure { -public: - RefCountClosure() : _refs(0) {} - ~RefCountClosure() override = default; - - void ref() { _refs.fetch_add(1); } - - // If unref() returns true, this object should be delete - bool unref() { return _refs.fetch_sub(1) == 1; } - - void Run() override { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - if (unref()) { - delete this; - } - } - - void join() { brpc::Join(cntl.call_id()); } - - brpc::Controller cntl; - T result; - -private: - std::atomic _refs; -}; - template class DummyBrpcCallback { ENABLE_FACTORY_CREATOR(DummyBrpcCallback); @@ -65,9 +37,11 @@ class DummyBrpcCallback { response_ = std::make_shared(); } - void call() {} + virtual ~DummyBrpcCallback() = default; + + virtual void call() {} - void join() { brpc::Join(cntl_->call_id()); } + virtual void join() { brpc::Join(cntl_->call_id()); } // controller has to be the same lifecycle with the closure, because brpc may use // it in any stage of the rpc. 
@@ -102,7 +76,7 @@ class AutoReleaseClosure : public google::protobuf::Closure { public: AutoReleaseClosure(std::shared_ptr req, std::shared_ptr callback) - : callback_(callback) { + : request_(req), callback_(callback) { this->cntl_ = callback->cntl_; this->response_ = callback->response_; } @@ -111,7 +85,6 @@ class AutoReleaseClosure : public google::protobuf::Closure { // Will delete itself void Run() override { - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); Defer defer {[&]() { delete this; }}; // If lock failed, it means the callback object is deconstructed, then no need // to deal with the callback any more. diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index f5397e831d3159..c25f4a9ea61712 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -57,19 +57,19 @@ Status Channel::init(RuntimeState* state) { ", maybe version is not compatible."; return Status::InternalError("no brpc destination"); } - + _brpc_request = std::make_shared(); // initialize brpc request - _brpc_request.mutable_finst_id()->set_hi(_fragment_instance_id.hi); - _brpc_request.mutable_finst_id()->set_lo(_fragment_instance_id.lo); - _finst_id = _brpc_request.finst_id(); + _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi); + _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo); + _finst_id = _brpc_request->finst_id(); - _brpc_request.mutable_query_id()->set_hi(state->query_id().hi); - _brpc_request.mutable_query_id()->set_lo(state->query_id().lo); - _query_id = _brpc_request.query_id(); + _brpc_request->mutable_query_id()->set_hi(state->query_id().hi); + _brpc_request->mutable_query_id()->set_lo(state->query_id().lo); + _query_id = _brpc_request->query_id(); - _brpc_request.set_node_id(_dest_node_id); - _brpc_request.set_sender_id(_parent->sender_id()); - _brpc_request.set_be_number(_be_number); + _brpc_request->set_node_id(_dest_node_id); + 
_brpc_request->set_sender_id(_parent->sender_id()); + _brpc_request->set_be_number(_be_number); _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000; @@ -174,49 +174,50 @@ Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_s } SCOPED_TIMER(_parent->brpc_send_timer()); - if (_closure == nullptr) { - _closure = new RefCountClosure(); - _closure->ref(); + if (_send_remote_block_callback == nullptr) { + _send_remote_block_callback = DummyBrpcCallback::create_shared(); } else { RETURN_IF_ERROR(_wait_last_brpc()); SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - _closure->cntl.Reset(); + _send_remote_block_callback->cntl_->Reset(); } VLOG_ROW << "Channel::send_batch() instance_id=" << print_id(_fragment_instance_id) << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string(); if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) { - auto statistic = _brpc_request.mutable_query_statistics(); + auto statistic = _brpc_request->mutable_query_statistics(); _parent->query_statistics()->to_pb(statistic); } - _brpc_request.set_eos(eos); + _brpc_request->set_eos(eos); if (!exec_status.ok()) { - exec_status.to_protobuf(_brpc_request.mutable_exec_status()); + exec_status.to_protobuf(_brpc_request->mutable_exec_status()); } if (block != nullptr) { - _brpc_request.set_allocated_block(block); + _brpc_request->set_allocated_block(block); } - _brpc_request.set_packet_seq(_packet_seq++); + _brpc_request->set_packet_seq(_packet_seq++); - _closure->ref(); - _closure->cntl.set_timeout_ms(_brpc_timeout_ms); + _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms); if (config::exchange_sink_ignore_eovercrowded) { - _closure->cntl.ignore_eovercrowded(); + _send_remote_block_callback->cntl_->ignore_eovercrowded(); } { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - if 
(enable_http_send_block(_brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_state->exec_env(), _closure, _brpc_request, - _brpc_dest_addr)); + auto send_remote_block_closure = + AutoReleaseClosure>:: + create_unique(_brpc_request, _send_remote_block_callback); + if (enable_http_send_block(*_brpc_request)) { + RETURN_IF_ERROR(transmit_block_httpv2( + _state->exec_env(), std::move(send_remote_block_closure), _brpc_dest_addr)); } else { - transmit_block(*_brpc_stub, _closure, _brpc_request); + transmit_blockv2(*_brpc_stub, std::move(send_remote_block_closure)); } } if (block != nullptr) { - static_cast(_brpc_request.release_block()); + static_cast(_brpc_request->release_block()); } return Status::OK(); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 10116be03346ae..bcc050e25bd865 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -261,11 +261,7 @@ class Channel { _ch_cur_pb_block = &_ch_pb_block1; } - virtual ~Channel() { - if (_closure != nullptr && _closure->unref()) { - delete _closure; - } - } + virtual ~Channel() = default; // Initialize channel. // Returns OK if successful, error indication otherwise. 
@@ -337,22 +333,22 @@ class Channel { Status _wait_last_brpc() { SCOPED_TIMER(_parent->brpc_wait_timer()); - if (_closure == nullptr) { + if (_send_remote_block_callback == nullptr) { return Status::OK(); } - auto cntl = &_closure->cntl; - auto call_id = _closure->cntl.call_id(); - brpc::Join(call_id); - _receiver_status = Status::create(_closure->result.status()); - if (cntl->Failed()) { + _send_remote_block_callback->join(); + if (_send_remote_block_callback->cntl_->Failed()) { std::string err = fmt::format( "failed to send brpc batch, error={}, error_text={}, client: {}, " "latency = {}", - berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(), - cntl->latency_us()); + berror(_send_remote_block_callback->cntl_->ErrorCode()), + _send_remote_block_callback->cntl_->ErrorText(), + BackendOptions::get_localhost(), + _send_remote_block_callback->cntl_->latency_us()); LOG(WARNING) << err; return Status::RpcError(err); } + _receiver_status = Status::create(_send_remote_block_callback->response_->status()); return _receiver_status; } @@ -380,9 +376,9 @@ class Channel { PUniqueId _finst_id; PUniqueId _query_id; PBlock _pb_block; - PTransmitDataParams _brpc_request; + std::shared_ptr _brpc_request; std::shared_ptr _brpc_stub = nullptr; - RefCountClosure* _closure = nullptr; + std::shared_ptr> _send_remote_block_callback = nullptr; Status _receiver_status; int32_t _brpc_timeout_ms = 500; // whether the dest can be treated as query statistics transfer chain. 
@@ -540,15 +536,15 @@ class PipChannel final : public Channel { _buffer->register_sink(Channel::_fragment_instance_id); } - pipeline::SelfDeleteClosure* get_closure( + std::shared_ptr> get_send_callback( InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) { - if (!_closure) { - _closure.reset(new pipeline::SelfDeleteClosure()); + if (!_send_callback) { + _send_callback = pipeline::ExchangeSendCallback::create_shared(); } else { - _closure->cntl.Reset(); + _send_callback->cntl_->Reset(); } - _closure->init(id, eos, data); - return _closure.get(); + _send_callback->init(id, eos, data); + return _send_callback; } std::shared_ptr get_local_channel_dependency() { @@ -564,7 +560,7 @@ class PipChannel final : public Channel { pipeline::ExchangeSinkBuffer* _buffer = nullptr; bool _eos_send = false; - std::unique_ptr> _closure = nullptr; + std::shared_ptr> _send_callback = nullptr; std::unique_ptr _pblock; }; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 75e813b134791e..f3270c4b9fff28 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -273,26 +273,13 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i _index_channel(index_channel), _node_id(node_id), _is_incremental(is_incremental) { + _cur_add_block_request = std::make_shared(); _node_channel_tracker = std::make_shared(fmt::format( "NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id), thread_context()->get_thread_id())); } -VNodeChannel::~VNodeChannel() { - for (auto& closure : _open_closures) { - if (closure != nullptr) { - if (closure->unref()) { - delete closure; - } - closure = nullptr; - } - } - if (_add_block_closure != nullptr) { - delete _add_block_closure; - _add_block_closure = nullptr; - } - static_cast(_cur_add_block_request.release_id()); -} +VNodeChannel::~VNodeChannel() = default; void VNodeChannel::clear_all_blocks() { 
std::lock_guard lg(_pending_batches_lock); @@ -334,13 +321,13 @@ Status VNodeChannel::init(RuntimeState* state) { _timeout_watch.start(); // Initialize _cur_add_block_request - if (!_cur_add_block_request.has_id()) { - _cur_add_block_request.set_allocated_id(&_parent->_load_id); + if (!_cur_add_block_request->has_id()) { + *(_cur_add_block_request->mutable_id()) = _parent->_load_id; } - _cur_add_block_request.set_index_id(_index_channel->_index_id); - _cur_add_block_request.set_sender_id(_parent->_sender_id); - _cur_add_block_request.set_backend_id(_node_id); - _cur_add_block_request.set_eos(false); + _cur_add_block_request->set_index_id(_index_channel->_index_id); + _cur_add_block_request->set_sender_id(_parent->_sender_id); + _cur_add_block_request->set_backend_id(_node_id); + _cur_add_block_request->set_eos(false); _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); // The node channel will send _batch_size rows of data each rpc. When the @@ -355,46 +342,48 @@ Status VNodeChannel::init(RuntimeState* state) { void VNodeChannel::_open_internal(bool is_incremental) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - PTabletWriterOpenRequest request; - request.set_allocated_id(&_parent->_load_id); - request.set_index_id(_index_channel->_index_id); - request.set_txn_id(_parent->_txn_id); - request.set_allocated_schema(_parent->_schema->to_protobuf()); + auto request = std::make_shared(); + request->set_allocated_id(&_parent->_load_id); + request->set_index_id(_index_channel->_index_id); + request->set_txn_id(_parent->_txn_id); + request->set_allocated_schema(_parent->_schema->to_protobuf()); std::set deduper; for (auto& tablet : _all_tablets) { if (deduper.contains(tablet.tablet_id)) { continue; } - auto ptablet = request.add_tablets(); + auto ptablet = request->add_tablets(); ptablet->set_partition_id(tablet.partition_id); ptablet->set_tablet_id(tablet.tablet_id); deduper.insert(tablet.tablet_id); } - 
request.set_num_senders(_parent->_num_senders); - request.set_need_gen_rollup(false); // Useless but it is a required field in pb - request.set_load_mem_limit(_parent->_load_mem_limit); - request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); - request.set_is_high_priority(_parent->_is_high_priority); - request.set_sender_ip(BackendOptions::get_localhost()); - request.set_is_vectorized(true); - request.set_backend_id(_node_id); - request.set_enable_profile(_state->enable_profile()); - request.set_is_incremental(is_incremental); - - auto* open_closure = new RefCountClosure {}; - open_closure->ref(); - - open_closure->ref(); // This ref is for RPC's reference - open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); + request->set_num_senders(_parent->_num_senders); + request->set_need_gen_rollup(false); // Useless but it is a required field in pb + request->set_load_mem_limit(_parent->_load_mem_limit); + request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s); + request->set_is_high_priority(_parent->_is_high_priority); + request->set_sender_ip(BackendOptions::get_localhost()); + request->set_is_vectorized(true); + request->set_backend_id(_node_id); + request->set_enable_profile(_state->enable_profile()); + request->set_is_incremental(is_incremental); + + auto open_callback = DummyBrpcCallback::create_shared(); + auto open_closure = AutoReleaseClosure< + PTabletWriterOpenRequest, + DummyBrpcCallback>::create_unique(request, open_callback); + open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); if (config::tablet_writer_ignore_eovercrowded) { - open_closure->cntl.ignore_eovercrowded(); + open_callback->cntl_->ignore_eovercrowded(); } // the real transmission here. the corresponding BE's load mgr will open load channel for it. 
- _stub->tablet_writer_open(&open_closure->cntl, &request, &open_closure->result, open_closure); - _open_closures.push_back(open_closure); + _stub->tablet_writer_open(open_closure->cntl_.get(), open_closure->request_.get(), + open_closure->response_.get(), open_closure.get()); + open_closure.release(); + _open_callbacks.push_back(open_callback); - static_cast(request.release_id()); - static_cast(request.release_schema()); + static_cast(request->release_id()); + static_cast(request->release_schema()); } void VNodeChannel::open() { @@ -407,36 +396,28 @@ void VNodeChannel::incremental_open() { Status VNodeChannel::open_wait() { Status status; - for (auto& open_closure : _open_closures) { + for (auto& open_callback : _open_callbacks) { // because of incremental open, we will wait multi times. so skip the closures which have been checked and set to nullptr in previous rounds - if (open_closure == nullptr) { + if (open_callback == nullptr) { continue; } - open_closure->join(); + open_callback->join(); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - if (open_closure->cntl.Failed()) { + if (open_callback->cntl_->Failed()) { if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( _stub, _node_info.host, _node_info.brpc_port)) { ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - open_closure->cntl.remote_side()); + open_callback->cntl_->remote_side()); } _cancelled = true; - auto error_code = open_closure->cntl.ErrorCode(); - auto error_text = open_closure->cntl.ErrorText(); - if (open_closure->unref()) { - delete open_closure; - } - open_closure = nullptr; + auto error_code = open_callback->cntl_->ErrorCode(); + auto error_text = open_callback->cntl_->ErrorText(); return Status::InternalError( "failed to open tablet writer, error={}, error_text={}, info={}", berror(error_code), error_text, channel_info()); } - status = Status::create(open_closure->result.status()); - if (open_closure->unref()) { - delete open_closure; - } - open_closure 
= nullptr; + status = Status::create(open_callback->response_->status()); if (!status.ok()) { _cancelled = true; @@ -445,11 +426,11 @@ Status VNodeChannel::open_wait() { } // add block closure - _add_block_closure = ReusableClosure::create(); - _add_block_closure->addFailedHandler( + _send_block_callback = WriteBlockCallback::create_shared(); + _send_block_callback->addFailedHandler( [this](bool is_last_rpc) { _add_block_failed_callback(is_last_rpc); }); - _add_block_closure->addSuccessHandler( + _send_block_callback->addSuccessHandler( [this](const PTabletWriterAddBlockResult& result, bool is_last_rpc) { _add_block_success_callback(result, is_last_rpc); }); @@ -501,12 +482,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, for (auto column : block->get_columns()) { columns.push_back(std::move(*column).mutate()); } - *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(), tablets.end()}; - _cur_add_block_request.set_is_single_tablet_block(true); + *_cur_add_block_request->mutable_tablet_ids() = {tablets.begin(), tablets.end()}; + _cur_add_block_request->set_is_single_tablet_block(true); } else { block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)); for (auto tablet_id : payload->second) { - _cur_add_block_request.add_tablet_ids(tablet_id); + _cur_add_block_request->add_tablet_ids(tablet_id); } } @@ -517,9 +498,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, std::lock_guard l(_pending_batches_lock); // To simplify the add_row logic, postpone adding block into req until the time of sending req _pending_batches_bytes += _cur_mutable_block->allocated_bytes(); - _cur_add_block_request.set_eos( + _cur_add_block_request->set_eos( false); // for multi-add, only when marking close we set it eos. 
- _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); + // Copy the request to tmp request to add to pend block queue + auto tmp_add_block_request = std::make_shared(); + *tmp_add_block_request = *_cur_add_block_request; + _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request); _pending_batches_num++; VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" << this << " pending_batches_bytes:" << _pending_batches_bytes @@ -527,7 +511,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, << " loadinfo:" << _load_info; } _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); - _cur_add_block_request.clear_tablet_ids(); + _cur_add_block_request->clear_tablet_ids(); } return Status::OK(); @@ -540,7 +524,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, } // set closure for sending block. - if (!_add_block_closure->try_set_in_flight()) { + if (!_send_block_callback->try_set_in_flight()) { // There is packet in flight, skip. return _send_finished ? 0 : 1; } @@ -551,12 +535,12 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, if (!s.ok()) { _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); // sending finished. clear in flight - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); } // in_flight is cleared in closure::Run } else { // sending finished. clear in flight - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); } return _send_finished ? 
0 : 1; } @@ -607,20 +591,21 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { auto request = std::move(send_block.second); // doesn't need to be saved in heap // tablet_ids has already set when add row - request.set_packet_seq(_next_packet_seq); + request->set_packet_seq(_next_packet_seq); auto block = mutable_block->to_block(); - CHECK(block.rows() == request.tablet_ids_size()) - << "block rows: " << block.rows() << ", tablet_ids_size: " << request.tablet_ids_size(); + CHECK(block.rows() == request->tablet_ids_size()) + << "block rows: " << block.rows() + << ", tablet_ids_size: " << request->tablet_ids_size(); if (block.rows() > 0) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; - Status st = block.serialize(state->be_exec_version(), request.mutable_block(), + Status st = block.serialize(state->be_exec_version(), request->mutable_block(), &uncompressed_bytes, &compressed_bytes, state->fragement_transmission_compression_type(), _parent->_transfer_large_data_by_brpc); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.to_string())); - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); return; } if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) { @@ -632,29 +617,29 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { - if (remain_ms <= 0 && !request.eos()) { + if (remain_ms <= 0 && !request->eos()) { cancel(fmt::format("{}, err: timeout", channel_info())); - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); return; } else { remain_ms = config::min_load_rpc_timeout_ms; } } - _add_block_closure->reset(); - _add_block_closure->cntl.set_timeout_ms(remain_ms); + _send_block_callback->reset(); + _send_block_callback->cntl_->set_timeout_ms(remain_ms); 
if (config::tablet_writer_ignore_eovercrowded) { - _add_block_closure->cntl.ignore_eovercrowded(); + _send_block_callback->cntl_->ignore_eovercrowded(); } - if (request.eos()) { + if (request->eos()) { for (auto pid : _parent->_tablet_finder->partition_ids()) { - request.add_partition_ids(pid); + request->add_partition_ids(pid); } - request.set_write_single_replica(false); + request->set_write_single_replica(false); if (_parent->_write_single_replica) { - request.set_write_single_replica(true); + request->set_write_single_replica(true); for (std::unordered_map>::iterator iter = _slave_tablet_nodes.begin(); iter != _slave_tablet_nodes.end(); iter++) { @@ -670,24 +655,27 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { pnode->set_host(node->host); pnode->set_async_internal_port(node->brpc_port); } - request.mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes}); + request->mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes}); } } - // eos request must be the last request. it's a signal makeing callback function to set _add_batch_finished true. - _add_block_closure->end_mark(); + // eos request must be the last request. it's a signal making the callback function set _add_batch_finished true.
+ _send_block_callback->end_mark(); _send_finished = true; CHECK(_pending_batches_num == 0) << _pending_batches_num; } - if (_parent->_transfer_large_data_by_brpc && request.has_block() && - request.block().has_column_values() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { - Status st = request_embed_attachment_contain_block< - PTabletWriterAddBlockRequest, ReusableClosure>( - &request, _add_block_closure); + auto send_block_closure = AutoReleaseClosure< + PTabletWriterAddBlockRequest, + WriteBlockCallback>::create_unique(request, + _send_block_callback); + if (_parent->_transfer_large_data_by_brpc && request->has_block() && + request->block().has_column_values() && request->ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + Status st = request_embed_attachment_contain_blockv2(send_block_closure->request_.get(), + send_block_closure); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.to_string())); - _add_block_closure->clear_in_flight(); + _send_block_callback->clear_in_flight(); return; } @@ -696,23 +684,26 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { std::shared_ptr _brpc_http_stub = _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http"); - _add_block_closure->cntl.http_request().uri() = + _send_block_callback->cntl_->http_request().uri() = brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http"; - _add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); - _add_block_closure->cntl.http_request().set_content_type("application/json"); + _send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST); + _send_block_callback->cntl_->http_request().set_content_type("application/json"); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - _brpc_http_stub->tablet_writer_add_block_by_http(&_add_block_closure->cntl, nullptr, - &_add_block_closure->result, - _add_block_closure); + 
_brpc_http_stub->tablet_writer_add_block_by_http( + send_block_closure->cntl_.get(), nullptr, send_block_closure->response_.get(), + send_block_closure.get()); + send_block_closure.release(); } } else { - _add_block_closure->cntl.http_request().Clear(); + _send_block_callback->cntl_->http_request().Clear(); { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request, - &_add_block_closure->result, _add_block_closure); + _stub->tablet_writer_add_block( + send_block_closure->cntl_.get(), send_block_closure->request_.get(), + send_block_closure->response_.get(), send_block_closure.get()); + send_block_closure.release(); } } @@ -805,11 +796,11 @@ void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { } SCOPED_ATTACH_TASK(_state); // If rpc failed, mark all tablets on this node channel as failed - _index_channel->mark_as_failed( - this, - fmt::format("rpc failed, error coed:{}, error text:{}", - _add_block_closure->cntl.ErrorCode(), _add_block_closure->cntl.ErrorText()), - -1); + _index_channel->mark_as_failed(this, + fmt::format("rpc failed, error code:{}, error text:{}", + _send_block_callback->cntl_->ErrorCode(), + _send_block_callback->cntl_->ErrorText()), + -1); Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); @@ -835,24 +826,28 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { // But do we need brpc::StartCancel(call_id)?
_cancel_with_msg(cancel_msg); - PTabletWriterCancelRequest request; - request.set_allocated_id(&_parent->_load_id); - request.set_index_id(_index_channel->_index_id); - request.set_sender_id(_parent->_sender_id); + auto request = std::make_shared(); + request->set_allocated_id(&_parent->_load_id); + request->set_index_id(_index_channel->_index_id); + request->set_sender_id(_parent->_sender_id); - auto closure = new RefCountClosure(); + auto cancel_callback = DummyBrpcCallback::create_shared(); + auto closure = AutoReleaseClosure< + PTabletWriterCancelRequest, + DummyBrpcCallback>::create_unique(request, cancel_callback); - closure->ref(); int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { remain_ms = config::min_load_rpc_timeout_ms; } - closure->cntl.set_timeout_ms(remain_ms); + cancel_callback->cntl_->set_timeout_ms(remain_ms); if (config::tablet_writer_ignore_eovercrowded) { - closure->cntl.ignore_eovercrowded(); + closure->cntl_->ignore_eovercrowded(); } - _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); - static_cast(request.release_id()); + _stub->tablet_writer_cancel(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.get()); + closure.release(); + static_cast(request->release_id()); } bool VNodeChannel::is_send_data_rpc_done() const { @@ -912,17 +907,19 @@ void VNodeChannel::mark_close() { return; } - _cur_add_block_request.set_eos(true); + _cur_add_block_request->set_eos(true); { std::lock_guard l(_pending_batches_lock); if (!_cur_mutable_block) [[unlikely]] { // add a dummy block _cur_mutable_block = vectorized::MutableBlock::create_unique(); } + auto tmp_add_block_request = + std::make_shared(*_cur_add_block_request); // when prepare to close, add block to queue so that try_send_pending_block thread will send it. 
- _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); + _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request); _pending_batches_num++; - DCHECK(_pending_blocks.back().second.eos()); + DCHECK(_pending_blocks.back().second->eos()); _close_time_ms = UnixMillis(); LOG(INFO) << channel_info() << " mark closed, left pending batch size: " << _pending_blocks.size(); @@ -1391,7 +1388,7 @@ Status VTabletWriter::close(Status exec_status) { SCOPED_TIMER(_close_timer); SCOPED_TIMER(_profile->total_time_counter()); - // will make the last batch of request. close_wait will wait this finished. + // will make the last batch of request. close_wait will wait until this is finished. static_cast(try_close(_state, exec_status)); // If _close_status is not ok, all nodes have been canceled in try_close. diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 7fee45be371080..ea998a0f0b4878 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -65,6 +65,7 @@ #include "runtime/thread_context.h" #include "runtime/types.h" #include "util/countdown_latch.h" +#include "util/ref_count_closure.h" #include "util/runtime_profile.h" #include "util/spinlock.h" #include "util/stopwatch.hpp" @@ -88,8 +89,6 @@ class TExpr; class Thread; class ThreadPoolToken; class TupleDescriptor; -template -class RefCountClosure; namespace vectorized { @@ -120,26 +119,21 @@ struct AddBatchCounter { // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. -// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +// Deleting this pointer is safe: the RPC callback will not run after WriteBlockCallback is deleted.
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback, // then next send will start, and it will wait for the rpc callback to complete when it is destroyed. template -class ReusableClosure final : public google::protobuf::Closure { -public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() override { - // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. - join(); - SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - } +class WriteBlockCallback final : public ::doris::DummyBrpcCallback { + ENABLE_FACTORY_CREATOR(WriteBlockCallback); - static ReusableClosure* create() { return new ReusableClosure(); } +public: + WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} + ~WriteBlockCallback() override = default; void addFailedHandler(const std::function& fn) { failed_handler = fn; } void addSuccessHandler(const std::function& fn) { success_handler = fn; } - void join() { + void join() override { // We rely on in_flight to assure one rpc is running, // while cid is not reliable due to memory order. // in_flight is written before getting callid, @@ -159,8 +153,8 @@ class ReusableClosure final : public google::protobuf::Closure { // plz follow this order: reset() -> set_in_flight() -> send brpc batch void reset() { SCOPED_TRACK_MEMORY_TO_UNKNOWN(); - cntl.Reset(); - cid = cntl.call_id(); + ::doris::DummyBrpcCallback::cntl_->Reset(); + cid = ::doris::DummyBrpcCallback::cntl_->call_id(); } // if _packet_in_flight == false, set it to true. Return true. 
@@ -179,21 +173,19 @@ class ReusableClosure final : public google::protobuf::Closure { _is_last_rpc = true; } - void Run() override { + void call() override { DCHECK(_packet_in_flight); - if (cntl.Failed()) { - LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode()) - << ", error_text=" << cntl.ErrorText(); + if (::doris::DummyBrpcCallback::cntl_->Failed()) { + LOG(WARNING) << "failed to send brpc batch, error=" + << berror(::doris::DummyBrpcCallback::cntl_->ErrorCode()) + << ", error_text=" << ::doris::DummyBrpcCallback::cntl_->ErrorText(); failed_handler(_is_last_rpc); } else { - success_handler(result, _is_last_rpc); + success_handler(*(::doris::DummyBrpcCallback::response_), _is_last_rpc); } clear_in_flight(); } - brpc::Controller cntl; - T result; - private: brpc::CallId cid; std::atomic _packet_in_flight {false}; @@ -381,7 +373,7 @@ class VNodeChannel { std::shared_ptr _stub = nullptr; // because we have incremantal open, we should keep one relative closure for one request. it's similarly for adding block. - std::vector*> _open_closures; + std::vector>> _open_callbacks; std::vector _all_tablets; // map from tablet_id to node_id where slave replicas locate in @@ -413,12 +405,12 @@ class VNodeChannel { // build a _cur_mutable_block and push into _pending_blocks. when not building, this block is empty. 
std::unique_ptr _cur_mutable_block; - PTabletWriterAddBlockRequest _cur_add_block_request; + std::shared_ptr _cur_add_block_request; - using AddBlockReq = - std::pair, PTabletWriterAddBlockRequest>; + using AddBlockReq = std::pair, + std::shared_ptr>; std::queue _pending_blocks; - ReusableClosure* _add_block_closure = nullptr; + std::shared_ptr> _send_block_callback = nullptr; bool _is_incremental; }; diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp index 80adee73749509..c1ce87fe56dabd 100644 --- a/be/test/vec/runtime/vdata_stream_test.cpp +++ b/be/test/vec/runtime/vdata_stream_test.cpp @@ -84,6 +84,10 @@ class LocalMockBackendService : public PBackendService { << ", fragment_instance_id=" << print_id(request->finst_id()) << ", node=" << request->node_id(); } + if (done != nullptr) { + st.to_protobuf(response->mutable_status()); + done->Run(); + } } private: