Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](brpc_client_cache) resolve hostname in DNS cache before passing to brpc #40074

Merged
merged 7 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,19 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,

void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) {
std::string brpc_addr = fmt::format("{}:{}", request.host, request.brpc_port);
std::string host = request.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(request.host)) {
Status status = dns_cache->get(request.host, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << request.host << ": "
<< status.to_string();
return;
}
}
std::string brpc_addr = get_host_port(host, request.brpc_port);
Status st = Status::OK();
TStatus t_status;
std::shared_ptr<PBackendService_Stub> brpc_stub =
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ Status RowIDFetcher::init() {
if (!client) {
LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
<< ", port=" << node_info.brpc_port;
return Status::InternalError("RowIDFetcher failed to init rpc client");
return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
node_info.host, node_info.brpc_port);
}
_stubs.push_back(client);
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,9 +1123,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(addr));
if (!stub) {
std::string msg =
fmt::format("Get rpc stub failed, host={}, port=", addr.hostname, addr.port);
return Status::InternalError(msg);
return Status::InternalError("Get rpc stub failed, host={}, port={}", addr.hostname,
addr.port);
}

auto request = std::make_shared<PSendFilterSizeRequest>();
Expand Down Expand Up @@ -1158,7 +1157,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
if (!stub) {
return Status::InternalError(
fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port));
fmt::format("Get rpc stub failed, host={}, port={}", addr->hostname, addr->port));
}

auto merge_filter_request = std::make_shared<PMergeFilterRequest>();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Status SingleReplicaCompaction::_get_rowset_verisons_from_peer(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
addr.brpc_port);
if (stub == nullptr) {
return Status::Aborted("get rpc stub failed");
return Status::Aborted("get rpc stub failed, host={}, port={}", addr.host, addr.brpc_port);
}

brpc::Controller cntl;
Expand Down
18 changes: 16 additions & 2 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,17 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
cnt_val->global_size += request->filter_size();
cnt_val->source_addrs.push_back(request->source_addr());

Status st = Status::OK();
if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
for (auto addr : cnt_val->source_addrs) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
if (stub == nullptr) {
LOG(WARNING) << "Failed to init rpc to " << addr.hostname() << ":" << addr.port();
st = Status::InternalError("Failed to init rpc to {}:{}", addr.hostname(),
addr.port());
continue;
}

auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
Expand All @@ -347,7 +354,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
closure.release();
}
}
return Status::OK();
return st;
}

Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
Expand Down Expand Up @@ -376,6 +383,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
int64_t start_merge = MonotonicMillis();
auto filter_id = request->filter_id();
std::map<int, CntlValwithLock>::iterator iter;
Status st = Status::OK();
{
std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
iter = _filter_map.find(filter_id);
Expand Down Expand Up @@ -460,14 +468,20 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
target.target_fragment_instance_addr));
if (stub == nullptr) {
LOG(WARNING) << "Failed to init rpc to "
<< target.target_fragment_instance_addr.hostname << ":"
<< target.target_fragment_instance_addr.port;
st = Status::InternalError("Failed to init rpc to {}:{}",
target.target_fragment_instance_addr.hostname,
target.target_fragment_instance_addr.port);
continue;
}
stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.get());
closure.release();
}
}
return Status::OK();
return st;
}

Status RuntimeFilterMergeController::acquire(
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,11 @@ void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
host, brpc_port));
if (stub == nullptr) {
LOG(WARNING) << "Failed to init rpc to " << host << ":" << brpc_port;
st = Status::InternalError("Failed to init rpc to {}:{}", host, brpc_port);
continue;
}
rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms);
stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request,
Expand Down
35 changes: 25 additions & 10 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,33 +83,47 @@ class BrpcClientCache {
}

std::shared_ptr<T> get_client(const std::string& host, int port) {
std::string realhost;
realhost = host;
if (!is_valid_ip(host)) {
Status status = ExecEnv::GetInstance()->dns_cache()->get(host, &realhost);
std::string realhost = host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(host)) {
Status status = dns_cache->get(host, &realhost);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host:" << status.to_string();
return nullptr;
}
}
std::string host_port = get_host_port(realhost, port);
return get_client(host_port);
}

std::shared_ptr<T> get_client(const std::string& host_port) {
std::shared_ptr<T> stub_ptr;
auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
DCHECK(stub_ptr != nullptr);
return stub_ptr;
}

// new one stub and insert into map
auto stub = get_new_client_no_cache(host_port);
_stub_map.try_emplace_l(
host_port, [&stub](const auto& v) { stub = v.second; }, stub);
if (stub != nullptr) {
_stub_map.try_emplace_l(
host_port, [&stub](const auto& v) { stub = v.second; }, stub);
}
return stub;
}

// Returns a client stub for an address given as a single "host:port" string.
//
// The string is split at the LAST ':' so that hosts which themselves contain
// colons keep everything before the final separator as the host part. The
// port part is parsed as a decimal integer and the call is then forwarded to
// the get_client(host, port) overload.
//
// @param host_port  address in "host:port" form.
// @return the stub returned by get_client(host, port), or nullptr when the
//         string has no ':' separator or the port cannot be parsed.
std::shared_ptr<T> get_client(const std::string& host_port) {
    // Keep the position as size_t: storing rfind()'s result in an int would
    // narrow std::string::npos and silently hide the "no separator" case.
    const std::string::size_type sep = host_port.rfind(':');
    if (sep == std::string::npos) {
        LOG(WARNING) << "failed to parse port from " << host_port << ": missing ':' separator";
        return nullptr;
    }

    const std::string host = host_port.substr(0, sep);
    int port = 0;
    try {
        // std::stoi throws std::invalid_argument / std::out_of_range on bad input;
        // both derive from std::exception and are handled below.
        port = std::stoi(host_port.substr(sep + 1));
    } catch (const std::exception& err) {
        LOG(WARNING) << "failed to parse port from " << host_port << ": " << err.what();
        return nullptr;
    }
    return get_client(host, port);
}

std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
const std::string& protocol = "",
const std::string& connection_type = "",
Expand Down Expand Up @@ -143,6 +157,7 @@ class BrpcClientCache {
channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options);
}
if (ret_code) {
LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port;
return nullptr;
}
return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
Expand Down
15 changes: 15 additions & 0 deletions be/src/util/proto_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,26 @@ Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr<Closure> closure
TNetworkAddress brpc_dest_addr) {
RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(), closure));

std::string host = brpc_dest_addr.hostname;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(brpc_dest_addr.hostname)) {
Status status = dns_cache->get(brpc_dest_addr.hostname, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << brpc_dest_addr.hostname << ": "
<< status.to_string();
return Status::InternalError("failed to get ip from host {}", brpc_dest_addr.hostname);
}
}
//format an ipv6 address
std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port);

std::shared_ptr<PBackendService_Stub> brpc_http_stub =
exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http");
if (brpc_http_stub == nullptr) {
return Status::InternalError("failed to open brpc http client to {}", brpc_url);
}
closure->cntl_->http_request().uri() =
brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/functions/function_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnN
size_t result, size_t input_rows_count) {
PFunctionCallRequest request;
PFunctionCallResponse response;
if (_client == nullptr) {
return Status::InternalError(
"call to rpc function {} failed: init rpc error, server addr = {}", _signature,
_server_addr);
}
request.set_function_name(_function_name);
RETURN_IF_ERROR(_convert_block_to_proto(block, arguments, input_rows_count, &request));
brpc::Controller cntl;
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
return _init_st;
}
_dst_id = node_info.id;
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
opt.idle_timeout_ms = idle_timeout_ms;
Expand Down Expand Up @@ -185,7 +184,11 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
POpenLoadStreamResponse response;
// set connection_group "streaming" to distinguish with non-streaming connections
const auto& stub = client_cache->get_client(host_port);
const auto& stub = client_cache->get_client(node_info.host, node_info.brpc_port);
if (stub == nullptr) {
return Status::InternalError("failed to init brpc client to {}:{}", node_info.host,
node_info.brpc_port);
}
stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
Expand All @@ -200,7 +203,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
cntl.ErrorText());
return _init_st;
}
LOG(INFO) << "open load stream to " << host_port << ", " << *this;
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
<< ", " << *this;
_is_init.store(true);
return Status::OK();
}
Expand Down
21 changes: 20 additions & 1 deletion be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,11 +717,30 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
return;
}

std::string host = _node_info.host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(_node_info.host)) {
Status status = dns_cache->get(_node_info.host, &host);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " << _node_info.host << ": "
<< status.to_string();
_send_block_callback->clear_in_flight();
return;
}
}
//format an ipv6 address
std::string brpc_url = get_brpc_http_url(_node_info.host, _node_info.brpc_port);
std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port);
std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
"http");
if (_brpc_http_stub == nullptr) {
cancel(fmt::format("{}, failed to open brpc http client to {}", channel_info(),
brpc_url));
_send_block_callback->clear_in_flight();
return;
}
_send_block_callback->cntl_->http_request().uri() =
brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
_send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
Expand Down
Loading