-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix #2146 E112 EHOSTDOWN in short and pooled connection #2177
base: master
Are you sure you want to change the base?
Changes from 2 commits
c6f798c
a8978de
0ad8448
e86f284
cb961bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -719,6 +719,37 @@ inline bool does_error_affect_main_socket(int error_code) { | |
error_code == EINVAL/*returned by connect "0.0.0.1"*/; | ||
} | ||
|
||
inline void maybe_block_server(int error_code, Controller* cntl, SharedLoadBalancer* lb, SocketId sock) { | ||
if (!does_error_affect_main_socket(error_code)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 为什么要把does_error_affect_main_socket放到这个函数里? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 因为这个函数最终的动作是SetFailed main socket,所以要先检查一下错误号是否满足影响main socket 的条件 |
||
// Error code does not indicate that server is down, we can't block it | ||
return; | ||
} | ||
if (!lb) { | ||
// Single server mode, we can't block the only server | ||
return; | ||
} | ||
// We try to SelectServer once to check if sock is the last available server | ||
ExcludedServers* excluded = ExcludedServers::Create(1); | ||
excluded->Add(sock); | ||
SocketUniquePtr tmp_sock; | ||
LoadBalancer::SelectIn sel_in = { 0, false, true, 0, excluded }; | ||
LoadBalancer::SelectOut sel_out(&tmp_sock); | ||
const int rc = lb->SelectServer(sel_in, &sel_out); | ||
ExcludedServers::Destroy(excluded); | ||
if (rc != 0 || tmp_sock->id() == sock) { | ||
// sock is the last available server in this LB, we can't block it | ||
return; | ||
} | ||
// main socket should die as well | ||
// NOTE: main socket may be wrongly set failed (provided that | ||
// short/pooled socket does not hold a ref of the main socket). | ||
// E.g. a in-parallel RPC sets the peer_id to be failed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 'an' |
||
// -> this RPC meets ECONNREFUSED | ||
// -> main socket gets revived from HC | ||
// -> this RPC sets main socket to be failed again. | ||
Socket::SetFailed(sock); | ||
} | ||
|
||
// Note: An RPC call probably consists of several individual Calls such as | ||
// retries and backup requests. This method simply cares about the error of | ||
// this very Call (specified by |error_code|) rather than the error of the | ||
|
@@ -749,9 +780,8 @@ void Controller::Call::OnComplete( | |
// "single" streams are often maintained in a separate SocketMap and | ||
// different from the main socket as well. | ||
if (c->_stream_creator != NULL && | ||
does_error_affect_main_socket(error_code) && | ||
(sending_sock == NULL || sending_sock->id() != peer_id)) { | ||
Socket::SetFailed(peer_id); | ||
maybe_block_server(error_code, c, c->_lb.get(), peer_id); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 实现是"block socket"不是"block server". 或许叫checkAndSetFailed更明确一点? |
||
} | ||
break; | ||
case CONNECTION_TYPE_POOLED: | ||
chenBright marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
@@ -786,16 +816,7 @@ void Controller::Call::OnComplete( | |
sending_sock->OnProgressiveReadCompleted(); | ||
} | ||
} | ||
if (does_error_affect_main_socket(error_code)) { | ||
// main socket should die as well. | ||
// NOTE: main socket may be wrongly set failed (provided that | ||
// short/pooled socket does not hold a ref of the main socket). | ||
// E.g. an in-parallel RPC sets the peer_id to be failed | ||
// -> this RPC meets ECONNREFUSED | ||
// -> main socket gets revived from HC | ||
// -> this RPC sets main socket to be failed again. | ||
Socket::SetFailed(peer_id); | ||
} | ||
maybe_block_server(error_code, c, c->_lb.get(), peer_id); | ||
break; | ||
} | ||
|
||
|
@@ -1033,7 +1054,13 @@ void Controller::IssueRPC(int64_t start_realtime_us) { | |
{ start_realtime_us, true, | ||
has_request_code(), _request_code, _accessed }; | ||
LoadBalancer::SelectOut sel_out(&tmp_sock); | ||
const int rc = _lb->SelectServer(sel_in, &sel_out); | ||
int rc = _lb->SelectServer(sel_in, &sel_out); | ||
if (rc == EHOSTDOWN) { | ||
// If no server is available, include accessed server and try to SelectServer again | ||
sel_in.excluded = NULL; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这样是不是就会选到期望要excluded的实例呢?这样合理吗? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 目前excluded的实例是在之前的重试中选过的实例,如果返回EHOSTDOWN说明已经没有实例可选,RPC必然失败,那就退而求其次,选择excluded中的实例,这样还有成功的可能 |
||
sel_in.changable_weights = false; | ||
rc = _lb->SelectServer(sel_in, &sel_out); | ||
} | ||
if (rc != 0) { | ||
std::ostringstream os; | ||
DescribeOptions opt; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1777,7 +1777,7 @@ class ChannelTest : public ::testing::Test{ | |
cntl.Reset(); | ||
cntl.set_max_retry(RETRY_NUM); | ||
CallMethod(&channel, &cntl, &req, &res, async); | ||
EXPECT_EQ(EHOSTDOWN, cntl.ErrorCode()); | ||
EXPECT_EQ(short_connection ? ECONNREFUSED : EHOSTDOWN, cntl.ErrorCode()); | ||
EXPECT_EQ(RETRY_NUM, cntl.retried_count()); | ||
} | ||
|
||
|
@@ -1824,6 +1824,39 @@ class ChannelTest : public ::testing::Test{ | |
MyEchoService _svc; | ||
}; | ||
|
||
void TestBlockServer(bool single_server, bool short_connection, const char* lb) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TestBlockServer 应该 是ChannelTest 的成员函数。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
哦哦 知道了 |
||
std::cout << " *** single=" << single_server | ||
<< " short=" << short_connection | ||
<< " lb=" << lb << std::endl; | ||
|
||
brpc::Channel channel; | ||
brpc::ChannelOptions opt; | ||
if (short_connection) { | ||
opt.connection_type = brpc::CONNECTION_TYPE_SHORT; | ||
} else { | ||
opt.connection_type = brpc::CONNECTION_TYPE_POOLED; | ||
} | ||
opt.max_retry = 0; | ||
if (single_server) { | ||
EXPECT_EQ(0, channel.Init("127.0.0.1:53829", &opt)); | ||
} else { | ||
EXPECT_EQ(0, channel.Init("list://127.0.0.1:53829,127.0.0.1:53830", lb, &opt)); | ||
} | ||
|
||
const int RETRY_NUM = 3; | ||
test::EchoRequest req; | ||
test::EchoResponse res; | ||
brpc::Controller cntl; | ||
req.set_message(__FUNCTION__); | ||
|
||
cntl.set_max_retry(RETRY_NUM); | ||
cntl.set_timeout_ms(10); // 10ms | ||
cntl.set_request_code(1); | ||
CallMethod(&channel, &cntl, &req, &res, false); | ||
EXPECT_EQ(ECONNREFUSED, cntl.ErrorCode()) << cntl.ErrorText(); | ||
EXPECT_EQ(RETRY_NUM, cntl.retried_count()); | ||
} | ||
|
||
class MyShared : public brpc::SharedObject { | ||
public: | ||
MyShared() { ++ nctor; } | ||
|
@@ -2466,6 +2499,17 @@ TEST_F(ChannelTest, retry_other_servers) { | |
} | ||
} | ||
|
||
TEST_F(ChannelTest, test_block_server) { | ||
const char* lbs[] = {"rr", "random", "la", "c_md5"}; | ||
for (int i = 0; i <= 1; ++i) { // Flag SingleServer | ||
for (int j = 0; j <= 1; ++j) { // Flag ShortConnection | ||
for (int k = 0; k < 4; ++k) { // Flag LB | ||
TestBlockServer(i, j, lbs[k]); | ||
} | ||
} | ||
} | ||
} | ||
|
||
TEST_F(ChannelTest, multiple_threads_single_channel) { | ||
srand(time(NULL)); | ||
ASSERT_EQ(0, StartAccept(_ep)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
连接超时的情况可以优化吗?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我这周再研究研究。。