Skip to content

Commit

Permalink
[feature](cloud) Add priority network support for meta-service regist…
Browse files Browse the repository at this point in the history
…ry (#33931)
  • Loading branch information
gavinchou authored and dataroaring committed Apr 22, 2024
1 parent fa53713 commit 65d8959
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 7 deletions.
1 change: 1 addition & 0 deletions cloud/src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ set(COMMON_FILES
encryption_util.cpp
metric.cpp
kms.cpp
network_util.cpp
)

if (USE_JEMALLOC)
Expand Down
8 changes: 8 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,12 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf");

CONF_mBool(enable_distinguish_hdfs_path, "true");

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation,
// e.g. 10.10.10.0/24
// e.g. 10.10.10.0/24;192.168.0.1/24
// If no IP match this rule, a random IP is used (usually it is the IP binded to hostname).
CONF_String(priority_networks, "");

} // namespace doris::cloud::config
238 changes: 238 additions & 0 deletions cloud/src/common/network_util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "common/network_util.h"

#include <arpa/inet.h>
#include <butil/endpoint.h>
#include <butil/strings/string_split.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <sstream>
#include <vector>

#include "common/logging.h"

namespace doris::cloud {

class CIDR {
public:
CIDR() : address_(0), netmask_(0xffffffff) {}
bool reset(const std::string& cidr_str) {
address_ = 0;
netmask_ = 0xffffffff;

// check if have mask
std::string cidr_format_str = cidr_str;
int32_t have_mask = cidr_str.find("/");
if (have_mask == -1) {
cidr_format_str.assign(cidr_str + "/32");
}
VLOG_DEBUG << "cidr format str: " << cidr_format_str;

std::vector<std::string> cidr_items;
butil::SplitString(cidr_format_str, '/', &cidr_items);
if (cidr_items.size() != 2) {
LOG(WARNING) << "wrong CIDR format. network=" << cidr_str;
return false;
}

if (cidr_items[1].empty()) {
LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str;
return false;
}

char* endptr = nullptr;
int32_t mask_length = strtol(cidr_items[1].c_str(), &endptr, 10);
if (errno != 0 && mask_length == 0) {
char errmsg[64];
// Ignore unused return value
auto ret = strerror_r(errno, errmsg, 64);
LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str
<< ", mask_length=" << mask_length << ", errno=" << errno
<< ", errmsg=" << errmsg << ", strerror_r returns=" << ret;
return false;
}
if (mask_length <= 0 || mask_length > 32) {
LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str
<< ", mask_length=" << mask_length;
return false;
}

uint32_t address = 0;
if (!ip_to_int(cidr_items[0], &address)) {
LOG(WARNING) << "wrong CIDR IP value. network=" << cidr_str;
return false;
}
address_ = address;

netmask_ = 0xffffffff;
netmask_ = netmask_ << (32 - mask_length);
return true;
}
bool contains(const std::string& ip) {
uint32_t ip_int = 0;
if (!ip_to_int(ip, &ip_int)) {
return false;
}
if ((address_ & netmask_) == (ip_int & netmask_)) {
return true;
}
return false;
}

private:
bool ip_to_int(const std::string& ip_str, uint32_t* value) {
struct in_addr addr;
int flag = inet_aton(ip_str.c_str(), &addr);
if (flag == 0) {
return false;
}
*value = ntohl(addr.s_addr);
return true;
}

uint32_t address_;
uint32_t netmask_;
};

class InetAddress {
public:
InetAddress(struct sockaddr* addr) { this->addr_ = *(struct sockaddr_in*)addr; }
bool is_address_v4() const { return addr_.sin_family == AF_INET; }
bool is_loopback_v4() const {
in_addr_t s_addr = addr_.sin_addr.s_addr;
return (ntohl(s_addr) & 0xFF000000) == 0x7F000000;
}
std::string get_host_address_v4() {
char addr_buf[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(addr_.sin_addr), addr_buf, INET_ADDRSTRLEN);
return std::string(addr_buf);
}

private:
struct sockaddr_in addr_;
};

static bool get_hosts_v4(std::vector<InetAddress>* hosts) {
ifaddrs* if_addrs = nullptr;
if (getifaddrs(&if_addrs)) {
std::stringstream ss;
char buf[64];
LOG(FATAL) << "getifaddrs failed because " << strerror_r(errno, buf, sizeof(buf));
return false;
}

for (ifaddrs* if_addr = if_addrs; if_addr != nullptr; if_addr = if_addr->ifa_next) {
if (!if_addr->ifa_addr) {
continue;
}
if (if_addr->ifa_addr->sa_family == AF_INET) { // check it is IP4
// is a valid IP4 Address
hosts->emplace_back(if_addr->ifa_addr);
}
}

if (if_addrs != nullptr) {
freeifaddrs(if_addrs);
}

return true;
}

std::string get_local_ip(const std::string& priority_networks) {
std::string localhost_str = butil::my_ip_cstr();
if (priority_networks == "") {
LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << localhost_str;
return localhost_str;
}
std::vector<CIDR> priority_cidrs;
LOG(INFO) << "priority CIDRs: " << priority_networks;
std::vector<std::string> cidr_strs;
butil::SplitString(priority_networks, ';', &cidr_strs);
for (auto& cidr_str : cidr_strs) {
CIDR cidr;
if (!cidr.reset(cidr_str)) {
LOG(FATAL) << "wrong cidr format. cidr_str=" << cidr_str;
return localhost_str;
}
priority_cidrs.push_back(cidr);
}

std::vector<InetAddress> hosts;
if (!get_hosts_v4(&hosts)) {
LOG(FATAL) << "failed to getifaddrs";
return localhost_str;
}

if (hosts.empty()) {
LOG(FATAL) << "failed to get host";
return localhost_str;
}

auto is_in_prior_network = [&priority_cidrs](const std::string& ip) {
for (auto& cidr : priority_cidrs) {
if (cidr.contains(ip)) {
return true;
}
}
return false;
};

std::string loopback;
localhost_str = "";
for (auto addr_it = hosts.begin(); addr_it != hosts.end(); ++addr_it) {
if ((*addr_it).is_address_v4()) {
VLOG_DEBUG << "check ip=" << addr_it->get_host_address_v4();
if ((*addr_it).is_loopback_v4()) {
loopback = addr_it->get_host_address_v4();
} else if (!priority_cidrs.empty()) {
if (is_in_prior_network(addr_it->get_host_address_v4())) {
localhost_str = addr_it->get_host_address_v4();

break;
}
} else {
localhost_str = addr_it->get_host_address_v4();
break;
}
}
}
if (!localhost_str.empty()) {
LOG(INFO) << "local host ip=" << localhost_str;
return localhost_str;
}

if (!loopback.empty()) {
localhost_str = loopback;
LOG(WARNING) << "fail to find one valid non-loopback address, use loopback address: "
<< localhost_str;
} else {
localhost_str = butil::my_ip_cstr();
LOG(WARNING)
<< "fail to find valid address of priority cidrs in conf, use butil::my_ip_cstr(): "
<< localhost_str;
}

return localhost_str;
}

} // namespace doris::cloud
34 changes: 34 additions & 0 deletions cloud/src/common/network_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>

namespace doris::cloud {

/**
* Gets local IP with give CIDR mask.
* * If the priority_networks is empty, the IP binded to hostname is returned
* * If no IP fit the priority_networks CIDR mask, loop back IP is returned
*
* @param priority_networks CIDR mask, can be empty
* @return the IP string
*/
std::string get_local_ip(const std::string& priority_networks);

} // namespace doris::cloud
3 changes: 2 additions & 1 deletion cloud/src/meta-service/meta_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "common/config.h"
#include "common/metric.h"
#include "common/network_util.h"
#include "common/sync_point.h"
#include "common/util.h"
#include "meta-service/keys.h"
Expand Down Expand Up @@ -96,7 +97,7 @@ void MetaServer::stop() {
void MetaServerRegister::prepare_registry(ServiceRegistryPB* reg) {
using namespace std::chrono;
auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
std::string ip = butil::my_ip_cstr();
std::string ip = get_local_ip(config::priority_networks);
int32_t port = config::brpc_listen_port;
std::string id = ip + ":" + std::to_string(port);
ServiceRegistryPB::Item item;
Expand Down
10 changes: 4 additions & 6 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "common/encryption_util.h"
#include "common/logging.h"
#include "common/network_util.h"
#include "common/string_util.h"
#include "common/sync_point.h"
#include "meta-service/keys.h"
Expand Down Expand Up @@ -3270,12 +3271,9 @@ void notify_refresh_instance(std::shared_ptr<TxnKv> txn_kv, const std::string& i
<< " err=" << err;
return;
}
std::string self_endpoint;
if (config::hostname.empty()) {
self_endpoint = fmt::format("{}:{}", butil::my_ip_cstr(), config::brpc_listen_port);
} else {
self_endpoint = fmt::format("{}:{}", config::hostname, config::brpc_listen_port);
}
std::string self_endpoint =
config::hostname.empty() ? get_local_ip(config::priority_networks) : config::hostname;
self_endpoint = fmt::format("{}:{}", self_endpoint, config::brpc_listen_port);
ServiceRegistryPB reg;
reg.ParseFromString(val);

Expand Down
4 changes: 4 additions & 0 deletions cloud/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ add_executable(hdfs_accessor_test hdfs_accessor_test.cpp)

add_executable(stopwatch_test stopwatch_test.cpp)

add_executable(network_util_test network_util_test.cpp)

message("Meta-service test dependencies: ${TEST_LINK_LIBS}")
target_link_libraries(sync_point_test ${TEST_LINK_LIBS})

Expand Down Expand Up @@ -78,6 +80,8 @@ target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS})

target_link_libraries(stopwatch_test ${TEST_LINK_LIBS})

target_link_libraries(network_util_test ${TEST_LINK_LIBS})

# FDB related tests need to be linked with libfdb_c
set(FDB_LINKER_FLAGS "-lfdb_c -L${THIRDPARTY_DIR}/lib")

Expand Down
Loading

0 comments on commit 65d8959

Please sign in to comment.