Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer: support ipv6 and uds endpoint. #11

Merged
merged 7 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

#include <map>
#include <ostream>
#include <regex>
#include <set>
#include <string>
#include <vector>

namespace braft {

typedef std::string GroupId;
Expand Down Expand Up @@ -78,22 +78,47 @@ struct PeerId {

int parse(const std::string& str) {
reset();
char ip_str[64];
int value = REPLICA;
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str,
&addr.port, &idx, &value)) {
reset();
// peer_id format:
// endpoint:idx:role
// ^
// |- host:port
// |- ip:port
// |- [ipv6]:port
// |- unix:path/to/sock
// clang-format off
std::regex peerid_reg("((([^:]+)|(\\[.*\\])):[^:]+)(:(\\d)?)?(:(\\d+)?)?");
// ^ ^ ^ ^
// | | | |
// unix,host,ipv4 | idx(6)(opt) |
// ipv6 role(8)(opt)
// clang-format on
std::cmatch m;
auto ret = std::regex_match(str.c_str(), m, peerid_reg);
if (!ret || m.size() != 9) {
return -1;
}
role = (Role)value;
if (role > WITNESS) {

// Using str2endpoint to check endpoint is valid.
std::string enpoint_str = m[1];
if (butil::str2endpoint(enpoint_str.c_str(), &addr) != 0) {
reset();
return -1;
}
if (0 != butil::str2ip(ip_str, &addr.ip)) {
reset();
return -1;

if (m[6].matched) {
// Check idx.
idx = std::stoi(m[6]);
}

// Check role if it existed.
if (m[8].matched) {
role = static_cast<Role>(std::stoi(m[8]));
if (role > WITNESS) {
reset();
return -1;
}
}

return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "braft/snapshot_executor.h"
#include "braft/sync_point.h"
#include "braft/util.h"
#include "butil/endpoint.h"
#include "butil/logging.h"

namespace braft {
Expand Down Expand Up @@ -506,7 +507,8 @@ int NodeImpl::init(const NodeOptions& options) {
_options = options;

// check _server_id
if (butil::IP_ANY == _server_id.addr.ip) {
if (!butil::is_endpoint_extended(_server_id.addr) &&
butil::IP_ANY == _server_id.addr.ip) {
LOG(ERROR) << "Group " << _group_id
<< " Node can't started from IP_ANY";
return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/braft/node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ NodeManager::~NodeManager() {}

bool NodeManager::server_exists(butil::EndPoint addr) {
BAIDU_SCOPED_LOCK(_mutex);
if (addr.ip != butil::IP_ANY) {
if (!is_endpoint_extended(addr) && addr.ip != butil::IP_ANY) {
butil::EndPoint any_addr(butil::IP_ANY, addr.port);
if (_addr_set.find(any_addr) != _addr_set.end()) {
return true;
Expand Down
27 changes: 27 additions & 0 deletions test/test_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ TEST_F(TestUsageSuits, PeerId) {

PeerId id3("1.2.3.4:1000:0");
LOG(INFO) << "id:" << id3;

// UDS format
PeerId id4;
id4.parse("unix:/path/to/sock:1:1");
LOG(INFO) << "id:" << id4;
ASSERT_EQ(id4.idx, 1);
ASSERT_EQ(id4.role, 1);

PeerId id5;
ASSERT_EQ(id5.parse("invalid:1:1"), -1);

// ipv6 format
PeerId id6;
id6.parse("[::1]:1");
ASSERT_EQ(id6.idx, 0);
ASSERT_EQ(id6.role, 0);
LOG(INFO) << "id:" << id6;

PeerId id7;
id7.parse("[::1]:1:1:1");
ASSERT_EQ(id7.idx, 1);
ASSERT_EQ(id7.role, 1);
LOG(INFO) << "id:" << id7;

PeerId id8;
ASSERT_EQ(id8.parse("[:::1:1"), -1);
ASSERT_EQ(id8.parse("::]:1:1"), -1);
}

TEST_F(TestUsageSuits, Configuration) {
Expand Down
32 changes: 32 additions & 0 deletions test/test_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sys/types.h>

#include <algorithm>
#include <string>
#include <vector>

#include "../test/util.h"
Expand Down Expand Up @@ -61,6 +62,7 @@ class NodeTest : public testing::TestWithParam<const char*> {
}
LOG(INFO) << "Start unitests: " << GetParam();
::system("rm -rf data");
::system("mkdir data");
ASSERT_EQ(0, braft::g_num_nodes.get_value());
}
void TearDown() {
Expand Down Expand Up @@ -114,6 +116,36 @@ TEST_P(NodeTest, Server) {
server2.Start("0.0.0.0:5007", NULL);
}

TEST_P(NodeTest, UDSNode) {
braft::PeerId peer0("unix:data/0.sock");
braft::PeerId peer1("unix:data/1.sock");
braft::PeerId peer2("unix:data/2.sock");
std::vector<braft::PeerId> peers = {peer0, peer1, peer2};
Cluster cluster("unittest", peers);

for (int i = 0; i < 3; i++) {
cluster.start(peers[i].addr);
}
cluster.wait_leader();
LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string();
}

TEST_P(NodeTest, IPV6Node) {
std::vector<braft::PeerId> peers;
braft::PeerId peer0("[::1]:5006");
braft::PeerId peer1("[::1]:5007");
braft::PeerId peer2("[::1]:5008");
peers = {peer0, peer1, peer2};

Cluster cluster("unittest", peers);
for (int i = 0; i < 3; i++) {
cluster.start(peers[i].addr);
}

cluster.wait_leader();
LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string();
}

TEST_P(NodeTest, SingleNode) {
brpc::Server server;
int ret = braft::add_service(&server, 5006);
Expand Down
24 changes: 16 additions & 8 deletions test/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
#define PUBLIC_RAFT_TEST_UTIL_H

#include <gflags/gflags.h>
#include <algorithm>

#include "_deps/brpc/src/src/butil/endpoint.h"
#include "brpc/server.h"
#include "butil/time.h"
#include "braft/enum.pb.h"
#include "braft/errno.pb.h"
Expand Down Expand Up @@ -252,9 +255,11 @@ class Cluster {
int snapshot_interval_s = 30,
braft::Closure* leader_start_closure = NULL, bool witness = false) {
if (_server_map[listen_addr] == NULL) {
brpc::ServerOptions server_options;
server_options.server_info_name = butil::endpoint2str(listen_addr).c_str();
brpc::Server* server = new brpc::Server();
if (braft::add_service(server, listen_addr) != 0
|| server->Start(listen_addr, NULL) != 0) {
|| server->Start(listen_addr, &server_options) != 0) {
LOG(ERROR) << "Fail to start raft service";
delete server;
return -1;
Expand All @@ -276,12 +281,15 @@ class Cluster {
}
options.fsm = fsm;
options.node_owns_fsm = true;
std::string endpoint_str = butil::endpoint2str(listen_addr).c_str();
butil::string_printf(&options.log_uri, "local://./data/%s/log",
butil::endpoint2str(listen_addr).c_str());
butil::string_printf(&options.raft_meta_uri, "local://./data/%s/raft_meta",
butil::endpoint2str(listen_addr).c_str());
butil::string_printf(&options.snapshot_uri, "local://./data/%s/snapshot",
butil::endpoint2str(listen_addr).c_str());
endpoint_str.c_str());
butil::string_printf(&options.raft_meta_uri,
"local://./data/%s/raft_meta",
endpoint_str.c_str());
butil::string_printf(&options.snapshot_uri,
"local://./data/%s/snapshot",
endpoint_str.c_str());

options.snapshot_throttle = &_throttle;

Expand Down Expand Up @@ -388,7 +396,7 @@ class Cluster {

// return true if there is a leader, false when reach timeout.
void wait_leader(int64_t timeout_ms = 100 * 1000 /*100 seconds*/) {
int64_t deadline = butil::timespec_to_microseconds(
int64_t deadline = butil::timespec_to_milliseconds(
butil::milliseconds_from_now(timeout_ms));

while (butil::gettimeofday_ms() < deadline) {
Expand Down Expand Up @@ -521,7 +529,7 @@ class Cluster {
braft::Node* node = NULL;
std::vector<braft::Node*> new_nodes;
for (size_t i = 0; i < _nodes.size(); i++) {
if (addr.port == _nodes[i]->node_id().peer_id.addr.port) {
if (addr == _nodes[i]->node_id().peer_id.addr) {
node = _nodes[i];
} else {
new_nodes.push_back(_nodes[i]);
Expand Down
Loading