Skip to content

Commit

Permalink
[refactor](closure) remove ref count closure using auto release closu…
Browse files Browse the repository at this point in the history
…re (apache#26718)

1. The closure should be managed by a unique_ptr and released by brpc; it should not be held by our code. If it is held by our code, we have to wait for brpc to finish during cancel or close.
2. The closure should be exception safe: if any exception happens, it should not leak memory.
3. Use a specific callback interface that is implemented by Doris's code; we can write any code in it, and Doris manages the callback's lifecycle.
4. Use a weak_ptr between the callback and the closure. If the callback is destructed before the closure's Run() is called, it should not core dump.
  • Loading branch information
yiguolei authored and seawinde committed Nov 12, 2023
1 parent 7d8ac3a commit 33d1051
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 321 deletions.
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,9 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
closure->cntl_->ignore_eovercrowded();
stub->request_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.release());
closure->response_.get(), closure.get());

closure.release();
pull_callback->join();
if (pull_callback->cntl_->Failed()) {
if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
Expand Down
65 changes: 39 additions & 26 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
if (need_cancel && _instance_to_rpc_ctx.find(id) != _instance_to_rpc_ctx.end()) {
auto& rpc_ctx = _instance_to_rpc_ctx[id];
if (!rpc_ctx.is_cancelled) {
brpc::StartCancel(rpc_ctx._closure->cntl.call_id());
brpc::StartCancel(rpc_ctx._send_callback->cntl_->call_id());
rpc_ctx.is_cancelled = true;
}
}
Expand Down Expand Up @@ -245,21 +245,21 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
if (!request.exec_status.ok()) {
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
}
auto* closure = request.channel->get_closure(id, request.eos, nullptr);
auto send_callback = request.channel->get_send_callback(id, request.eos, nullptr);

_instance_to_rpc_ctx[id]._closure = closure;
_instance_to_rpc_ctx[id]._send_callback = send_callback;
_instance_to_rpc_ctx[id].is_cancelled = false;

closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::exchange_sink_ignore_eovercrowded) {
closure->cntl.ignore_eovercrowded();
send_callback->cntl_->ignore_eovercrowded();
}
closure->addFailedHandler(
send_callback->addFailedHandler(
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
closure->start_rpc_time = GetCurrentTimeNanos();
closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
set_rpc_time(id, start_rpc_time, result.receive_time());
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
Expand All @@ -275,11 +275,17 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
});
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
auto send_remote_block_closure =
AutoReleaseClosure<PTransmitDataParams,
pipeline::ExchangeSendCallback<PTransmitDataResult>>::
create_unique(brpc_request, send_callback);
if (enable_http_send_block(*brpc_request)) {
RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request,
request.channel->_brpc_dest_addr));
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
std::move(send_remote_block_closure),
request.channel->_brpc_dest_addr));
} else {
transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
transmit_blockv2(*request.channel->_brpc_stub,
std::move(send_remote_block_closure));
}
}
if (request.block) {
Expand All @@ -303,23 +309,24 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
auto statistic = brpc_request->mutable_query_statistics();
_statistics->to_pb(statistic);
}
auto* closure = request.channel->get_closure(id, request.eos, request.block_holder);
auto send_callback =
request.channel->get_send_callback(id, request.eos, request.block_holder);

ExchangeRpcContext rpc_ctx;
rpc_ctx._closure = closure;
rpc_ctx._send_callback = send_callback;
rpc_ctx.is_cancelled = false;
_instance_to_rpc_ctx[id] = rpc_ctx;

closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::exchange_sink_ignore_eovercrowded) {
closure->cntl.ignore_eovercrowded();
send_callback->cntl_->ignore_eovercrowded();
}
closure->addFailedHandler(
send_callback->addFailedHandler(
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
closure->start_rpc_time = GetCurrentTimeNanos();
closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
set_rpc_time(id, start_rpc_time, result.receive_time());
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
Expand All @@ -335,11 +342,17 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
});
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
auto send_remote_block_closure =
AutoReleaseClosure<PTransmitDataParams,
pipeline::ExchangeSendCallback<PTransmitDataResult>>::
create_unique(brpc_request, send_callback);
if (enable_http_send_block(*brpc_request)) {
RETURN_IF_ERROR(transmit_block_http(_context->exec_env(), closure, *brpc_request,
request.channel->_brpc_dest_addr));
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
std::move(send_remote_block_closure),
request.channel->_brpc_dest_addr));
} else {
transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
transmit_blockv2(*request.channel->_brpc_stub,
std::move(send_remote_block_closure));
}
}
if (request.block_holder->get_block()) {
Expand All @@ -359,7 +372,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {

template <typename Parent>
void ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId finst_id) {
_instance_to_request[id] = std::make_unique<PTransmitDataParams>();
_instance_to_request[id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);

Expand Down
41 changes: 22 additions & 19 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/ref_count_closure.h"

namespace doris {
class PTransmitDataParams;
Expand Down Expand Up @@ -107,65 +108,67 @@ struct BroadcastTransmitInfo {
bool eos;
};

template <typename T>
class SelfDeleteClosure : public google::protobuf::Closure {
template <typename Response>
class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
ENABLE_FACTORY_CREATOR(ExchangeSendCallback);

public:
SelfDeleteClosure() = default;
ExchangeSendCallback() = default;

void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
_id = id;
_eos = eos;
_data = data;
}

~SelfDeleteClosure() override = default;
SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
~ExchangeSendCallback() override = default;
ExchangeSendCallback(const ExchangeSendCallback& other) = delete;
ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete;
void addFailedHandler(
const std::function<void(const InstanceLoId&, const std::string&)>& fail_fn) {
_fail_fn = fail_fn;
}
void addSuccessHandler(const std::function<void(const InstanceLoId&, const bool&, const T&,
const int64_t&)>& suc_fn) {
void addSuccessHandler(const std::function<void(const InstanceLoId&, const bool&,
const Response&, const int64_t&)>& suc_fn) {
_suc_fn = suc_fn;
}

void Run() noexcept override {
void call() noexcept override {
try {
if (_data) {
_data->unref();
}
if (cntl.Failed()) {
if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
std::string err = fmt::format(
"failed to send brpc when exchange, error={}, error_text={}, client: {}, "
"latency = {}",
berror(cntl.ErrorCode()), cntl.ErrorText(), BackendOptions::get_localhost(),
cntl.latency_us());
berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()),
::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
BackendOptions::get_localhost(),
::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
_fail_fn(_id, err);
} else {
_suc_fn(_id, _eos, result, start_rpc_time);
_suc_fn(_id, _eos, *(::doris::DummyBrpcCallback<Response>::response_),
start_rpc_time);
}
} catch (const std::exception& exp) {
LOG(FATAL) << "brpc callback error: " << exp.what();
} catch (...) {
LOG(FATAL) << "brpc callback error.";
}
}

brpc::Controller cntl;
T result;
int64_t start_rpc_time;

private:
std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
std::function<void(const InstanceLoId&, const bool&, const T&, const int64_t&)> _suc_fn;
std::function<void(const InstanceLoId&, const bool&, const Response&, const int64_t&)> _suc_fn;
InstanceLoId _id;
bool _eos;
vectorized::BroadcastPBlockHolder* _data;
};

struct ExchangeRpcContext {
SelfDeleteClosure<PTransmitDataResult>* _closure = nullptr;
std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback = nullptr;
bool is_cancelled = false;
};

Expand Down Expand Up @@ -208,7 +211,7 @@ class ExchangeSinkBuffer {
// must init zero
// TODO: make all flat_hash_map to a STRUCT
phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>> _instance_to_request;
phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> _instance_to_request;
// One channel is corresponding to a downstream instance.
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
// Number of busy channels;
Expand Down
44 changes: 21 additions & 23 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1518,37 +1518,35 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
return;
}

PTabletWriteSlaveDoneRequest request;
request.set_txn_id(txn_id);
request.set_tablet_id(tablet_id);
request.set_node_id(node_id);
request.set_is_succeed(is_succeed);
RefCountClosure<PTabletWriteSlaveDoneResult>* closure =
new RefCountClosure<PTabletWriteSlaveDoneResult>();
closure->ref();
closure->ref();
closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
closure->cntl.ignore_eovercrowded();
stub->response_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure);

closure->join();
if (closure->cntl.Failed()) {
auto request = std::make_shared<PTabletWriteSlaveDoneRequest>();
request->set_txn_id(txn_id);
request->set_tablet_id(tablet_id);
request->set_node_id(node_id);
request->set_is_succeed(is_succeed);
auto pull_rowset_callback = DummyBrpcCallback<PTabletWriteSlaveDoneResult>::create_shared();
auto closure = AutoReleaseClosure<
PTabletWriteSlaveDoneRequest,
DummyBrpcCallback<PTabletWriteSlaveDoneResult>>::create_unique(request,
pull_rowset_callback);
closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
closure->cntl_->ignore_eovercrowded();
stub->response_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.get());
closure.release();

pull_rowset_callback->join();
if (pull_rowset_callback->cntl_->Failed()) {
if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(stub, remote_host,
brpc_port)) {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
closure->cntl.remote_side());
closure->cntl_->remote_side());
}
LOG(WARNING) << "failed to response result of slave replica to master replica, error="
<< berror(closure->cntl.ErrorCode())
<< ", error_text=" << closure->cntl.ErrorText()
<< berror(pull_rowset_callback->cntl_->ErrorCode())
<< ", error_text=" << pull_rowset_callback->cntl_->ErrorText()
<< ", master host: " << remote_host << ", tablet_id=" << tablet_id
<< ", txn_id=" << txn_id;
}

if (closure->unref()) {
delete closure;
}
closure = nullptr;
VLOG_CRITICAL << "succeed to response the result of slave replica pull rowset to master "
"replica. master host: "
<< remote_host << ". is_succeed=" << is_succeed << ", tablet_id=" << tablet_id
Expand Down
36 changes: 21 additions & 15 deletions be/src/util/proto_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31);

// Embed column_values and brpc request serialization string in controller attachment.
template <typename Params, typename Closure>
Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) {
Status request_embed_attachment_contain_blockv2(Params* brpc_request,
std::unique_ptr<Closure>& closure) {
auto block = brpc_request->block();
Status st = request_embed_attachment(brpc_request, block.column_values(), closure);
Status st = request_embed_attachmentv2(brpc_request, block.column_values(), closure);
block.set_column_values("");
return st;
}
Expand All @@ -59,32 +60,37 @@ inline bool enable_http_send_block(const PTransmitDataParams& request) {
}

template <typename Closure>
void transmit_block(PBackendService_Stub& stub, Closure* closure,
const PTransmitDataParams& params) {
closure->cntl.http_request().Clear();
stub.transmit_block(&closure->cntl, &params, &closure->result, closure);
void transmit_blockv2(PBackendService_Stub& stub, std::unique_ptr<Closure> closure) {
closure->cntl_->http_request().Clear();
stub.transmit_block(closure->cntl_.get(), closure->request_.get(), closure->response_.get(),
closure.get());
closure.release();
}

template <typename Closure>
Status transmit_block_http(ExecEnv* exec_env, Closure* closure, PTransmitDataParams& params,
TNetworkAddress brpc_dest_addr) {
RETURN_IF_ERROR(request_embed_attachment_contain_block(&params, closure));
Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr<Closure> closure,
TNetworkAddress brpc_dest_addr) {
RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(), closure));

//format an ipv6 address
std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port);

std::shared_ptr<PBackendService_Stub> brpc_http_stub =
exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http");
closure->cntl.http_request().uri() = brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
closure->cntl.http_request().set_content_type("application/json");
brpc_http_stub->transmit_block_by_http(&closure->cntl, nullptr, &closure->result, closure);
closure->cntl_->http_request().uri() =
brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
closure->cntl_->http_request().set_content_type("application/json");
brpc_http_stub->transmit_block_by_http(closure->cntl_.get(), nullptr, closure->response_.get(),
closure.get());
closure.release();

return Status::OK();
}

template <typename Params, typename Closure>
Status request_embed_attachment(Params* brpc_request, const std::string& data, Closure* closure) {
Status request_embed_attachmentv2(Params* brpc_request, const std::string& data,
std::unique_ptr<Closure>& closure) {
butil::IOBuf attachment;

// step1: serialize brpc_request to string, and append to attachment.
Expand All @@ -106,7 +112,7 @@ Status request_embed_attachment(Params* brpc_request, const std::string& data, C
data_size);
}
// step3: attachment add to closure.
closure->cntl.request_attachment().swap(attachment);
closure->cntl_->request_attachment().swap(attachment);
return Status::OK();
}

Expand Down
Loading

0 comments on commit 33d1051

Please sign in to comment.