Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #2146 E112 EHOSTDOWN in short and pooled connection #2177

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,37 @@ inline bool does_error_affect_main_socket(int error_code) {
error_code == EINVAL/*returned by connect "0.0.0.1"*/;
}

inline void maybe_block_server(int error_code, Controller* cntl, SharedLoadBalancer* lb, SocketId sock) {
if (!does_error_affect_main_socket(error_code)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

连接超时的情况可以优化吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我这周再研究研究。。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么要把does_error_affect_main_socket放到这个函数里?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为这个函数最终的动作是SetFailed main socket,所以要先检查一下错误号是否满足影响main socket 的条件

// Error code does not indicate that server is down, we can't block it
return;
}
if (!lb) {
// Single server mode, we can't block the only server
return;
}
// We try to SelectServer once to check if sock is the last available server
ExcludedServers* excluded = ExcludedServers::Create(1);
excluded->Add(sock);
SocketUniquePtr tmp_sock;
LoadBalancer::SelectIn sel_in = { 0, false, true, 0, excluded };
LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = lb->SelectServer(sel_in, &sel_out);
ExcludedServers::Destroy(excluded);
if (rc != 0 || tmp_sock->id() == sock) {
// sock is the last available server in this LB, we can't block it
return;
}
// main socket should die as well
// NOTE: main socket may be wrongly set failed (provided that
// short/pooled socket does not hold a ref of the main socket).
// E.g. a in-parallel RPC sets the peer_id to be failed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'an'

// -> this RPC meets ECONNREFUSED
// -> main socket gets revived from HC
// -> this RPC sets main socket to be failed again.
Socket::SetFailed(sock);
}

//Note: A RPC call is probably consisted by several individual Calls such as
// retries and backup requests. This method simply cares about the error of
// this very Call (specified by |error_code|) rather than the error of the
Expand Down Expand Up @@ -749,9 +780,8 @@ void Controller::Call::OnComplete(
// "single" streams are often maintained in a separate SocketMap and
// different from the main socket as well.
if (c->_stream_creator != NULL &&
does_error_affect_main_socket(error_code) &&
(sending_sock == NULL || sending_sock->id() != peer_id)) {
Socket::SetFailed(peer_id);
maybe_block_server(error_code, c, c->_lb.get(), peer_id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

实现是"block socket"不是block server". 或许叫checkAndSetFailed更明确一点?

}
break;
case CONNECTION_TYPE_POOLED:
chenBright marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -786,16 +816,7 @@ void Controller::Call::OnComplete(
sending_sock->OnProgressiveReadCompleted();
}
}
if (does_error_affect_main_socket(error_code)) {
// main socket should die as well.
// NOTE: main socket may be wrongly set failed (provided that
// short/pooled socket does not hold a ref of the main socket).
// E.g. an in-parallel RPC sets the peer_id to be failed
// -> this RPC meets ECONNREFUSED
// -> main socket gets revived from HC
// -> this RPC sets main socket to be failed again.
Socket::SetFailed(peer_id);
}
maybe_block_server(error_code, c, c->_lb.get(), peer_id);
break;
}

Expand Down Expand Up @@ -1033,7 +1054,13 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
{ start_realtime_us, true,
has_request_code(), _request_code, _accessed };
LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = _lb->SelectServer(sel_in, &sel_out);
int rc = _lb->SelectServer(sel_in, &sel_out);
if (rc == EHOSTDOWN) {
// If no server is available, include accessed server and try to SelectServer again
sel_in.excluded = NULL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这样是不是就会选到期望要excluded的实例呢?这样合理吗?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

目前excluded的实例是在之前的重试中选过的实例,如果返回EHOSTDOWN说明已经没有实例可选,RPC必然失败,那就退而求其次,选择excluded中的实例,这样还有成功的可能

sel_in.changable_weights = false;
rc = _lb->SelectServer(sel_in, &sel_out);
}
if (rc != 0) {
std::ostringstream os;
DescribeOptions opt;
Expand Down
46 changes: 45 additions & 1 deletion test/brpc_channel_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,7 @@ class ChannelTest : public ::testing::Test{
cntl.Reset();
cntl.set_max_retry(RETRY_NUM);
CallMethod(&channel, &cntl, &req, &res, async);
EXPECT_EQ(EHOSTDOWN, cntl.ErrorCode());
EXPECT_EQ(short_connection ? ECONNREFUSED : EHOSTDOWN, cntl.ErrorCode());
EXPECT_EQ(RETRY_NUM, cntl.retried_count());
}

Expand Down Expand Up @@ -1824,6 +1824,39 @@ class ChannelTest : public ::testing::Test{
MyEchoService _svc;
};

void TestBlockServer(bool single_server, bool short_connection, const char* lb) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestBlockServer 应该 是ChannelTest 的成员函数。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestBlockServer 应该 是ChannelTest 的成员函数。

哦哦 知道了

std::cout << " *** single=" << single_server
<< " short=" << short_connection
<< " lb=" << lb << std::endl;

brpc::Channel channel;
brpc::ChannelOptions opt;
if (short_connection) {
opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
} else {
opt.connection_type = brpc::CONNECTION_TYPE_POOLED;
}
opt.max_retry = 0;
if (single_server) {
EXPECT_EQ(0, channel.Init("127.0.0.1:53829", &opt));
} else {
EXPECT_EQ(0, channel.Init("list://127.0.0.1:53829,127.0.0.1:53830", lb, &opt));
}

const int RETRY_NUM = 3;
test::EchoRequest req;
test::EchoResponse res;
brpc::Controller cntl;
req.set_message(__FUNCTION__);

cntl.set_max_retry(RETRY_NUM);
cntl.set_timeout_ms(10); // 10ms
cntl.set_request_code(1);
CallMethod(&channel, &cntl, &req, &res, false);
EXPECT_EQ(ECONNREFUSED, cntl.ErrorCode()) << cntl.ErrorText();
EXPECT_EQ(RETRY_NUM, cntl.retried_count());
}

class MyShared : public brpc::SharedObject {
public:
MyShared() { ++ nctor; }
Expand Down Expand Up @@ -2466,6 +2499,17 @@ TEST_F(ChannelTest, retry_other_servers) {
}
}

TEST_F(ChannelTest, test_block_server) {
const char* lbs[] = {"rr", "random", "la", "c_md5"};
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int j = 0; j <= 1; ++j) { // Flag ShortConnection
for (int k = 0; k < 4; ++k) { // Flag LB
TestBlockServer(i, j, lbs[k]);
}
}
}
}

TEST_F(ChannelTest, multiple_threads_single_channel) {
srand(time(NULL));
ASSERT_EQ(0, StartAccept(_ep));
Expand Down