diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index c9aa200e1d6656..b51e263d86bf07 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -71,7 +71,8 @@ Status RowIDFetcher::init() { if (!client) { LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host << ", port=" << node_info.brpc_port; - return Status::InternalError("RowIDFetcher failed to init rpc client"); + return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}", + node_info.host, node_info.brpc_port); } _stubs.push_back(client); } diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1c0cdffc0f56f8..4e9a12b5bc5fea 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1100,9 +1100,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt std::shared_ptr stub( _state->exec_env->brpc_internal_client_cache()->get_client(addr)); if (!stub) { - std::string msg = - fmt::format("Get rpc stub failed, host={}, port=", addr.hostname, addr.port); - return Status::InternalError(msg); + return Status::InternalError("Get rpc stub failed, host={}, port={}", addr.hostname, + addr.port); } auto request = std::make_shared(); @@ -1135,7 +1134,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo _state->exec_env->brpc_internal_client_cache()->get_client(*addr)); if (!stub) { return Status::InternalError( - fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port)); + fmt::format("Get rpc stub failed, host={}, port={}", addr->hostname, addr->port)); } auto merge_filter_request = std::make_shared(); diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index 393bfb99f7bdc7..8257a8182a09ce 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -175,7 +175,7 @@ Status 
SingleReplicaCompaction::_get_rowset_verisons_from_peer( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host, addr.brpc_port); if (stub == nullptr) { - return Status::Aborted("get rpc stub failed"); + return Status::Aborted("get rpc stub failed, host={}, port={}", addr.host, addr.brpc_port); } brpc::Controller cntl; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 5b61cc87361930..24baf9b6c971e4 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -342,10 +342,17 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz cnt_val->global_size += request->filter_size(); cnt_val->source_addrs.push_back(request->source_addr()); + Status st = Status::OK(); if (cnt_val->source_addrs.size() == cnt_val->producer_size) { for (auto addr : cnt_val->source_addrs) { std::shared_ptr stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr)); + if (stub == nullptr) { + LOG(WARNING) << "Failed to init rpc to " << addr.hostname() << ":" << addr.port(); + st = Status::InternalError("Failed to init rpc to {}:{}", addr.hostname(), + addr.port()); + continue; + } auto closure = AutoReleaseClosure>:: @@ -365,7 +372,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz closure.release(); } } - return Status::OK(); + return st; } Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { @@ -395,6 +402,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ int64_t start_merge = MonotonicMillis(); auto filter_id = request->filter_id(); std::map::iterator iter; + Status st = Status::OK(); { std::shared_lock guard(_filter_map_mutex); iter = _filter_map.find(filter_id); @@ -587,7 +595,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ } } } - return Status::OK(); + return st; } Status 
RuntimeFilterMergeController::acquire( diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 0801f30fb2e265..13faed18e61d10 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1110,6 +1110,11 @@ void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcContr std::shared_ptr stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( host, brpc_port)); + if (stub == nullptr) { + LOG(WARNING) << "Failed to init rpc to " << host << ":" << brpc_port; + st = Status::InternalError("Failed to init rpc to {}:{}", host, brpc_port); + continue; + } rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id(); rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms); stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request, diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index 09c92fb398e085..24bd284f302fb9 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -83,33 +83,47 @@ class BrpcClientCache { } std::shared_ptr get_client(const std::string& host, int port) { - std::string realhost; - realhost = host; - if (!is_valid_ip(host)) { - Status status = ExecEnv::GetInstance()->dns_cache()->get(host, &realhost); + std::string realhost = host; + auto dns_cache = ExecEnv::GetInstance()->dns_cache(); + if (dns_cache == nullptr) { + LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; + } else if (!is_valid_ip(host)) { + Status status = dns_cache->get(host, &realhost); if (!status.ok()) { LOG(WARNING) << "failed to get ip from host:" << status.to_string(); return nullptr; } } std::string host_port = get_host_port(realhost, port); - return get_client(host_port); - } - - std::shared_ptr get_client(const std::string& host_port) { std::shared_ptr stub_ptr; auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; }; if 
(LIKELY(_stub_map.if_contains(host_port, get_value))) { + DCHECK(stub_ptr != nullptr); return stub_ptr; } // new one stub and insert into map auto stub = get_new_client_no_cache(host_port); - _stub_map.try_emplace_l( - host_port, [&stub](const auto& v) { stub = v.second; }, stub); + if (stub != nullptr) { + _stub_map.try_emplace_l( + host_port, [&stub](const auto& v) { stub = v.second; }, stub); + } return stub; } + std::shared_ptr get_client(const std::string& host_port) { + int pos = host_port.rfind(':'); + std::string host = host_port.substr(0, pos); + int port = 0; + try { + port = stoi(host_port.substr(pos + 1)); + } catch (const std::exception& err) { + LOG(WARNING) << "failed to parse port from " << host_port << ": " << err.what(); + return nullptr; + } + return get_client(host, port); + } + std::shared_ptr get_new_client_no_cache(const std::string& host_port, const std::string& protocol = "", const std::string& connection_type = "", @@ -143,6 +157,7 @@ class BrpcClientCache { channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options); } if (ret_code) { + LOG(WARNING) << "Failed to initialize brpc Channel to " << host_port; return nullptr; } return std::make_shared(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL); diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index f77a1f637f3762..c1994d1feeb39d 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -71,11 +71,26 @@ Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr closure TNetworkAddress brpc_dest_addr) { RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(), closure)); + std::string host = brpc_dest_addr.hostname; + auto dns_cache = ExecEnv::GetInstance()->dns_cache(); + if (dns_cache == nullptr) { + LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; + } else if (!is_valid_ip(brpc_dest_addr.hostname)) { + Status status = dns_cache->get(brpc_dest_addr.hostname, &host); + if 
(!status.ok()) { + LOG(WARNING) << "failed to get ip from host " << brpc_dest_addr.hostname << ": " + << status.to_string(); + return Status::InternalError("failed to get ip from host {}", brpc_dest_addr.hostname); + } + } //format an ipv6 address - std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port); + std::string brpc_url = get_brpc_http_url(host, brpc_dest_addr.port); std::shared_ptr brpc_http_stub = exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http"); + if (brpc_http_stub == nullptr) { + return Status::InternalError("failed to open brpc http client to {}", brpc_url); + } closure->cntl_->http_request().uri() = brpc_url + "/PInternalServiceImpl/transmit_block_by_http"; closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST); diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp index ba171ffbbc99ec..c27383dac62b08 100644 --- a/be/src/vec/functions/function_rpc.cpp +++ b/be/src/vec/functions/function_rpc.cpp @@ -46,6 +46,11 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnN size_t result, size_t input_rows_count) { PFunctionCallRequest request; PFunctionCallResponse response; + if (_client == nullptr) { + return Status::InternalError( + "call to rpc function {} failed: init rpc error, server addr = {}", _signature, + _server_addr); + } request.set_function_name(_function_name); RETURN_IF_ERROR(_convert_block_to_proto(block, arguments, input_rows_count, &request)); brpc::Controller cntl; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index e899486e854d18..e29d64118b9fa9 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -152,7 +152,6 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, return _init_st; } _dst_id = node_info.id; - std::string host_port = get_host_port(node_info.host, node_info.brpc_port); brpc::StreamOptions opt; opt.max_buf_size = config::load_stream_max_buf_size; opt.idle_timeout_ms 
= idle_timeout_ms; @@ -185,7 +184,11 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } POpenLoadStreamResponse response; // set connection_group "streaming" to distinguish with non-streaming connections - const auto& stub = client_cache->get_client(host_port); + const auto& stub = client_cache->get_client(node_info.host, node_info.brpc_port); + if (stub == nullptr) { + return Status::InternalError("failed to init brpc client to {}:{}", node_info.host, + node_info.brpc_port); + } stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); @@ -200,7 +203,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, cntl.ErrorText()); return _init_st; } - LOG(INFO) << "open load stream to " << host_port << ", " << *this; + LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port + << ", " << *this; _is_init.store(true); return Status::OK(); } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 1e6b8f7b8687b6..e946a73bfedd70 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -700,11 +700,30 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { return; } + std::string host = _node_info.host; + auto dns_cache = ExecEnv::GetInstance()->dns_cache(); + if (dns_cache == nullptr) { + LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; + } else if (!is_valid_ip(_node_info.host)) { + Status status = dns_cache->get(_node_info.host, &host); + if (!status.ok()) { + LOG(WARNING) << "failed to get ip from host " << _node_info.host << ": " + << status.to_string(); + _send_block_callback->clear_in_flight(); + return; + } + } //format an ipv6 address - std::string brpc_url = get_brpc_http_url(_node_info.host, _node_info.brpc_port); + std::string brpc_url = get_brpc_http_url(host, 
_node_info.brpc_port); std::shared_ptr _brpc_http_stub = _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http"); + if (_brpc_http_stub == nullptr) { + cancel(fmt::format("{}, failed to open brpc http client to {}", channel_info(), + brpc_url)); + _send_block_callback->clear_in_flight(); + return; + } _send_block_callback->cntl_->http_request().uri() = brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http"; _send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);