Skip to content

Commit

Permalink
Support connect on socket create (#2574)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Jun 13, 2024
1 parent 0f5e102 commit 15469ed
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/brpc/rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ class RtmpSocketCreator : public SocketCreator {
: _connect_options(connect_options) {
}

int CreateSocket(const SocketOptions& opt, SocketId* id) {
int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.app_connect = std::make_shared<RtmpConnect>();
sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
Expand Down
20 changes: 12 additions & 8 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ friend class SubDone;
ChannelBalancer::~ChannelBalancer() {
for (ChannelToIdMap::iterator
it = _chan_map.begin(); it != _chan_map.end(); ++it) {
SocketUniquePtr ptr(it->second); // Dereference
it->second->ReleaseAdditionalReference();
it->second->ReleaseHCRelatedReference();
}
_chan_map.clear();
}
Expand Down Expand Up @@ -196,15 +196,21 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
return -1;
}
SocketUniquePtr ptr;
CHECK_EQ(0, Socket::Address(sock_id, &ptr));
int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
LOG(FATAL) << "Fail to address SocketId=" << sock_id;
return -1;
}
if (!AddServer(ServerId(sock_id))) {
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
// sub_chan will be deleted when the socket is recycled.
ptr->SetFailed();
// Cancel health checking.
ptr->ReleaseHCRelatedReference();
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
_chan_map[sub_channel]= ptr.release(); // Add reference.
// The health-check-related reference has been held on created.
_chan_map[sub_channel]= ptr.get();
if (handle) {
*handle = sock_id;
}
Expand All @@ -223,13 +229,11 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
BAIDU_SCOPED_LOCK(_mutex);
CHECK_EQ(1UL, _chan_map.erase(sub->chan));
}
{
ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr2(ptr.get()); // Dereference.
}
if (rc == 0) {
ptr->ReleaseAdditionalReference();
}
// Cancel health checking.
ptr->ReleaseHCRelatedReference();
}
}

Expand Down
65 changes: 51 additions & 14 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ int Socket::ResetFileDescriptor(int fd) {
if (!ValidFileDescriptor(fd)) {
return 0;
}
if (_remote_side == butil::EndPoint()) {
// OK to fail, non-socket fd does not support this.
butil::get_remote_side(fd, &_remote_side);
}
// OK to fail, non-socket fd does not support this.
if (butil::get_local_side(fd, &_local_side) != 0) {
_local_side = butil::EndPoint();
Expand Down Expand Up @@ -781,6 +785,19 @@ int Socket::OnCreated(const SocketOptions& options) {
_keepalive_options = options.keepalive_options;
CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
_is_write_shutdown = false;
int fd = options.fd;
if (!ValidFileDescriptor(fd) && options.connect_on_create) {
// Connect on create.
fd = DoConnect(options.connect_abstime, NULL, NULL);
if (fd < 0) {
PLOG(ERROR) << "Fail to connect to " << options.remote_side;
int error_code = errno != 0 ? errno : EHOSTDOWN;
SetFailed(error_code, "Fail to connect to %s: %s",
butil::endpoint2str(options.remote_side).c_str(),
berror(error_code));
return -1;
}
}
// Must be the last one! Internal fields of this Socket may be accessed
// just after calling ResetFileDescriptor.
if (ResetFileDescriptor(options.fd) != 0) {
Expand All @@ -790,6 +807,7 @@ int Socket::OnCreated(const SocketOptions& options) {
berror(saved_errno));
return -1;
}
HoldHCRelatedRef();
guard.dismiss();

return 0;
Expand Down Expand Up @@ -940,6 +958,20 @@ std::string Socket::OnDescription() const {
return result;
}

void Socket::HoldHCRelatedRef() {
if (_health_check_interval_s > 0) {
_is_hc_related_ref_held = true;
AddReference();
}
}

void Socket::ReleaseHCRelatedReference() {
if (_health_check_interval_s > 0) {
_is_hc_related_ref_held = false;
Dereference();
}
}

int Socket::WaitAndReset(int32_t expected_nref) {
const uint32_t id_ver = VersionOfVRefId(id());
uint64_t vref;
Expand Down Expand Up @@ -1350,16 +1382,27 @@ int Socket::CheckConnected(int sockfd) {
return -1;
}

butil::EndPoint local_point;
CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
LOG_IF(INFO, FLAGS_log_connected)
<< "Connected to " << remote_side()
<< " via fd=" << (int)sockfd << " SocketId=" << id()
<< " local_side=" << local_point;
if (FLAGS_log_connected) {
butil::EndPoint local_point;
CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
LOG(INFO) << "Connected to " << remote_side()
<< " via fd=" << (int)sockfd << " SocketId=" << id()
<< " local_side=" << local_point;
}

// Doing SSL handshake after TCP connected
return SSLHandshake(sockfd, false);
}

int Socket::DoConnect(const timespec* abstime,
int (*on_connect)(int, int, void*), void* data) {
if (_conn) {
return _conn->Connect(this, abstime, on_connect, data);
} else {
return Connect(abstime, on_connect, data);
}
}

int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
if (_fd.load(butil::memory_order_consume) >= 0) {
return 0;
Expand All @@ -1370,14 +1413,8 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
SocketUniquePtr s;
ReAddress(&s);
req->set_socket(s.get());
if (_conn) {
if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
} else {
if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
if (DoConnect(abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
s.release();
return 1;
Expand Down
27 changes: 20 additions & 7 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ struct SocketOptions {
// user->BeforeRecycle() before recycling.
int fd;
butil::EndPoint remote_side;
// If `connect_on_create' is true and `fd' is less than 0,
// a client connection will be established to remote_side()
// regarding deadline `connect_abstime' when Socket is being created.
// Default: false, means that a connection will be established
// on first write.
bool connect_on_create;
// Default: NULL, means no timeout.
const timespec* connect_abstime;
SocketUser* user;
// When *edge-triggered* events happen on the file descriptor, callback
// `on_edge_triggered_events' will be called. Inside the callback, user
Expand Down Expand Up @@ -409,16 +417,15 @@ friend void DereferenceSocket(Socket*);

// True if health checking is enabled.
bool HCEnabled() const {
// This fence makes sure that we see change of
// `_is_hc_related_ref_held' before changing `_versioned_ref.
butil::atomic_thread_fence(butil::memory_order_acquire);
return _health_check_interval_s > 0 && _is_hc_related_ref_held;
}

// When someone holds a health-checking-related reference,
// this function need to be called to make health checking run normally.
void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
// When someone releases the health-checking-related reference,
// this function need to be called to cancel health checking.
void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }
// Release the health-checking-related
// reference which is held on created.
void ReleaseHCRelatedReference();

// After health checking is complete, set _hc_started to false.
void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); }
Expand Down Expand Up @@ -665,6 +672,9 @@ friend void DereferenceSocket(Socket*);

std::string OnDescription() const;

// Hold the health-checking-related
// reference on created.
void HoldHCRelatedRef();

static int Status(SocketId, int32_t* nref = NULL); // for unit-test.

Expand Down Expand Up @@ -699,8 +709,11 @@ friend void DereferenceSocket(Socket*);
// starting a connection request and `on_connect' will be called
// when connecting completes (whether it succeeds or not)
// Returns the socket fd on success, -1 otherwise
int DoConnect(const timespec* abstime,
int (*on_connect)(int fd, int err, void* data), void* data);
int Connect(const timespec* abstime,
int (*on_connect)(int fd, int err, void* data), void* data);

int CheckConnected(int sockfd);

// [Not thread-safe] Only used by `Write'.
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/socket_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace brpc {

inline SocketOptions::SocketOptions()
: fd(-1)
, connect_on_create(false)
, connect_abstime(NULL)
, user(NULL)
, on_edge_triggered_events(NULL)
, health_check_interval_s(-1)
Expand Down
17 changes: 9 additions & 8 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static butil::static_atomic<SocketMap*> g_socket_map = BUTIL_STATIC_ATOMIC_INIT(

class GlobalSocketCreator : public SocketCreator {
public:
int CreateSocket(const SocketOptions& opt, SocketId* id) {
int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.health_check_interval_s = FLAGS_health_check_interval;
return get_client_side_messenger()->Create(sock_opt, id);
Expand Down Expand Up @@ -237,8 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
sc->socket->ReleaseHCRelatedReference();
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
Expand All @@ -258,12 +257,15 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
// use SocketUniquePtr which cannot put into containers before c++11.
// The ref will be removed at entry's removal.
SocketUniquePtr ptr;
if (Socket::Address(tmp_id, &ptr) != 0) {
int rc = Socket::AddressFailedAsWell(tmp_id, &ptr);
if (rc < 0) {
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
} else if (rc > 0 && !ptr->HCEnabled()) {
LOG(FATAL) << "Failed socket is not HC-enabled";
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
SingleConnection new_sc = { 1, ptr.release(), 0 };
SingleConnection new_sc = { 1, ptr.get(), 0 };
_map[key] = new_sc;
*id = tmp_id;
mu.unlock();
Expand Down Expand Up @@ -301,8 +303,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
_map.erase(key);
mu.unlock();
s->ReleaseAdditionalReference(); // release extra ref
s->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(s); // Dereference
s->ReleaseHCRelatedReference();
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/versioned_ref_with_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ friend void DereferenceVersionedRefWithId<>(T* r);
// it will be recycled automatically and T::BeforeRecycled() will be called.
int Dereference();

// Increase the reference count by 1.
void AddReference() {
_versioned_ref.fetch_add(1, butil::memory_order_release);
}

// Make this socket addressable again.
// If nref is less than `at_least_nref', VersionedRefWithId was
// abandoned during revival and cannot be revived.
Expand Down
21 changes: 12 additions & 9 deletions test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down Expand Up @@ -542,6 +541,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
#endif
ASSERT_TRUE(src.empty());
ASSERT_EQ(-1, s->fd());
s->ReleaseHCRelatedReference();
}
// StartHealthCheck is possibly still running. Spin until global_sock
// is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should
Expand Down Expand Up @@ -650,12 +650,14 @@ TEST_F(SocketTest, health_check) {
options.user = new CheckRecycle;
options.health_check_interval_s = kCheckInteval/*s*/;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));

s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
brpc::Socket* s = NULL;
{
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
s = ptr.get();
}
global_sock = s;
ASSERT_NE(nullptr, s);
ASSERT_EQ(-1, s->fd());
ASSERT_EQ(point, s->remote_side());
ASSERT_EQ(id, s->id());
Expand Down Expand Up @@ -763,7 +765,7 @@ TEST_F(SocketTest, health_check) {
ASSERT_NE(0, ptr->fd());
}

s.release()->Dereference();
s->ReleaseHCRelatedReference();

// Must stop messenger before SetFailed the id otherwise StartHealthCheck
// still has chance to get reconnected and revive the id.
Expand All @@ -779,7 +781,8 @@ TEST_F(SocketTest, health_check) {
bthread_usleep(1000);
ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
}
ASSERT_EQ(-1, brpc::Socket::Status(id));
nref = 0;
ASSERT_EQ(-1, brpc::Socket::Status(id, &nref)) << "nref=" << nref;
// The id is invalid.
brpc::SocketUniquePtr ptr;
ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
Expand Down
Loading

0 comments on commit 15469ed

Please sign in to comment.