diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 492a884f5f9c5e..63a535f0589931 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1146,6 +1146,10 @@ DEFINE_mString(ca_cert_file_paths, "/etc/pki/tls/certs/ca-bundle.crt;/etc/ssl/certs/ca-certificates.crt;" "/etc/ssl/ca-bundle.pem"); +// Number of attempts to open a thrift client connection; the default 1 means try only once. +// When it is greater than 1, a failed open is retried after waiting 100 milliseconds. +DEFINE_mInt32(thrift_client_open_num_tries, "1"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 7af79880fe445d..1947f791cb55de 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1198,6 +1198,10 @@ DECLARE_String(trino_connector_plugin_dir); // the file paths(one or more) of CA cert, splite using ";" aws s3 lib use it to init s3client DECLARE_mString(ca_cert_file_paths); +// Number of attempts to open a thrift client connection; the default 1 means try only once. +// When it is greater than 1, a failed open is retried after waiting 100 milliseconds.
+DECLARE_mInt32(thrift_client_open_num_tries); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index 3da31caf5c8922..ea7b43b6102123 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -114,7 +114,7 @@ Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport, client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000); - Status status = client_impl->open(); + Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100); if (!status.ok()) { *client_key = nullptr; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index d66e3688d81831..158424b31741ac 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -72,6 +72,7 @@ class ClientCache; class HeartbeatFlags; class FrontendServiceClient; class FileMetaCache; +class DNSCache; // Execution environment for queries/plan fragments. // Contains all required global structures, and handles to @@ -182,6 +183,7 @@ class ExecEnv { HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } FileMetaCache* file_meta_cache() { return _file_meta_cache; } + DNSCache* dns_cache() { return _dns_cache; } // only for unit test void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; } @@ -266,6 +268,8 @@ class ExecEnv { BlockSpillManager* _block_spill_mgr = nullptr; // To save meta info of external file, such as parquet footer. 
FileMetaCache* _file_meta_cache = nullptr; + DNSCache* _dns_cache = nullptr; + RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr; }; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 5ff83652a954b2..0b044ce7fe9258 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -73,6 +73,7 @@ #include "util/bit_util.h" #include "util/brpc_client_cache.h" #include "util/cpu_info.h" +#include "util/dns_cache.h" #include "util/doris_metrics.h" #include "util/mem_info.h" #include "util/metrics.h" @@ -169,6 +170,8 @@ Status ExecEnv::_init(const std::vector& store_paths) { _block_spill_mgr = new BlockSpillManager(_store_paths); _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); + _dns_cache = new DNSCache(); + _spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths); _backend_client_cache->init_metrics("backend"); _frontend_client_cache->init_metrics("frontend"); _broker_client_cache->init_metrics("broker"); @@ -384,6 +387,8 @@ void ExecEnv::_destroy() { if (!_is_init) { return; } + SAFE_DELETE(_dns_cache); + _deregister_metrics(); SAFE_DELETE(_internal_client_cache); SAFE_DELETE(_function_client_cache); diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index e050ad53cd9aed..6d631c9023e682 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -41,6 +41,8 @@ // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" +#include "runtime/exec_env.h" +#include "util/dns_cache.h" #include "util/network_util.h" namespace doris { @@ -80,7 +82,7 @@ class BrpcClientCache { std::string realhost; realhost = host; if (!is_valid_ip(host)) { - Status status = hostname_to_ip(host, realhost); + Status status = ExecEnv::GetInstance()->dns_cache()->get(host, &realhost); if (!status.ok()) { LOG(WARNING) << "failed to get ip from host:" << status.to_string(); 
return nullptr;
diff --git a/be/src/util/dns_cache.cpp b/be/src/util/dns_cache.cpp
new file mode 100644
index 00000000000000..f2bd4ce91e6070
--- /dev/null
+++ b/be/src/util/dns_cache.cpp
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/dns_cache.h"
+
+#include <chrono>
+#include <mutex>
+#include <shared_mutex>
+#include <vector>
+
+#include "service/backend_options.h"
+#include "util/network_util.h"
+
+namespace doris {
+
+DNSCache::DNSCache() {
+    // The thread is deliberately not detached: the destructor has to join it,
+    // otherwise it would keep using this object after it has been destroyed.
+    refresh_thread = std::thread(&DNSCache::_refresh_cache, this);
+}
+
+DNSCache::~DNSCache() {
+    {
+        std::lock_guard<std::mutex> lock(stop_mutex);
+        stop_refresh = true;
+    }
+    // Wake the refresh thread right away instead of waiting out its sleep.
+    stop_cv.notify_all();
+    if (refresh_thread.joinable()) {
+        refresh_thread.join();
+    }
+}
+
+Status DNSCache::get(const std::string& hostname, std::string* ip) {
+    {
+        std::shared_lock<std::shared_mutex> lock(mutex);
+        auto it = cache.find(hostname);
+        if (it != cache.end()) {
+            *ip = it->second;
+            return Status::OK();
+        }
+    }
+    // Cache miss: resolve now, store the result and return it directly.
+    return _update(hostname, ip);
+}
+
+Status DNSCache::_update(const std::string& hostname, std::string* resolved_ip) {
+    std::string real_ip;
+    RETURN_IF_ERROR(hostname_to_ip(hostname, real_ip, BackendOptions::is_bind_ipv6()));
+    {
+        std::unique_lock<std::shared_mutex> lock(mutex);
+        auto it = cache.find(hostname);
+        if (it == cache.end() || it->second != real_ip) {
+            cache[hostname] = real_ip;
+            LOG(INFO) << "update hostname " << hostname << "'s ip to " << real_ip;
+        }
+    }
+    if (resolved_ip != nullptr) {
+        *resolved_ip = real_ip;
+    }
+    return Status::OK();
+}
+
+void DNSCache::_refresh_cache() {
+    while (true) {
+        {
+            // Sleep for the refresh interval (1 min), waking up early on shutdown.
+            std::unique_lock<std::mutex> lock(stop_mutex);
+            if (stop_cv.wait_for(lock, std::chrono::minutes(1),
+                                 [this] { return stop_refresh; })) {
+                return;
+            }
+        }
+        // Snapshot the keys so that DNS resolution runs without holding the lock.
+        std::vector<std::string> keys;
+        {
+            std::shared_lock<std::shared_mutex> lock(mutex);
+            keys.reserve(cache.size());
+            for (const auto& entry : cache) {
+                keys.push_back(entry.first);
+            }
+        }
+        for (const auto& key : keys) {
+            Status st = _update(key);
+            if (!st.ok()) {
+                LOG(WARNING) << "failed to refresh ip of hostname " << key << ": "
+                             << st.to_string();
+            }
+        }
+    }
+}
+
+} // end of namespace doris
diff --git a/be/src/util/dns_cache.h b/be/src/util/dns_cache.h
new file mode 100644
index 00000000000000..5dc413c53e2c32
--- /dev/null
+++ b/be/src/util/dns_cache.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include "common/status.h"
+
+namespace doris {
+
+// Same as
+// fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
+//
+// Caches hostname -> ip resolutions and re-resolves every cached hostname
+// once per minute on a background thread.
+class DNSCache {
+public:
+    // start the background refresh thread
+    DNSCache();
+    // stop the background refresh thread and wait for it to exit
+    ~DNSCache();
+
+    // get ip by hostname, resolving and caching it on a cache miss
+    Status get(const std::string& hostname, std::string* ip);
+
+private:
+    // update the ip of hostname in cache
+    // if resolved_ip is not null, it receives the resolved ip
+    Status _update(const std::string& hostname, std::string* resolved_ip = nullptr);
+
+    // a function for refresh daemon thread
+    // update cache at fixed interval until stop is requested
+    void _refresh_cache();
+
+private:
+    // hostname -> ip
+    std::unordered_map<std::string, std::string> cache;
+    // protects cache
+    mutable std::shared_mutex mutex;
+    // protects stop_refresh; stop_cv is notified when stop_refresh is set
+    std::mutex stop_mutex;
+    std::condition_variable stop_cv;
+    bool stop_refresh = false;
+    // joined by the destructor
+    std::thread refresh_thread;
+};
+
+} // end of namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index e6e70d3ee48320..c4a177d255ec76 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -98,6 +98,7 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ConfigBase;
 import org.apache.doris.common.ConfigException;
+import org.apache.doris.common.DNSCache;
 import org.apache.doris.common.DdlException;
 import
org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -475,6 +476,8 @@ public class Env {
 
     private HiveTransactionMgr hiveTransactionMgr;
 
+    private DNSCache dnsCache;
+
     public List<Frontend> getFrontends(FrontendNodeType nodeType) {
         if (nodeType == null) {
             // get all
@@ -688,6 +691,7 @@ private Env(boolean isCheckpointCatalog) {
         this.binlogManager = new BinlogManager();
         this.binlogGcer = new BinlogGcer();
         this.columnIdFlusher = new ColumnIdFlushDaemon();
+        this.dnsCache = new DNSCache();
     }
 
     public static void destroyCheckpoint() {
@@ -819,6 +823,10 @@ public static HiveTransactionMgr getCurrentHiveTransactionMgr() {
         return getCurrentEnv().getHiveTransactionMgr();
     }
 
+    public DNSCache getDnsCache() {
+        return dnsCache;
+    }
+
     // Use tryLock to avoid potential dead lock
     private boolean tryLock(boolean mustLock) {
         while (true) {
@@ -1544,6 +1552,7 @@ private void startNonMasterDaemonThreads() {
         getInternalCatalog().getEsRepository().start();
         // domain resolver
         domainResolver.start();
+        dnsCache.start();
     }
 
     private void transferToNonMaster(FrontendNodeType newType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
new file mode 100644
index 00000000000000..1fe96eba20f5f9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.util.NetUtils;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * DNSCache is a class that caches DNS lookups and periodically refreshes them.
+ * It uses a ConcurrentHashMap to store the hostname to IP address mappings and a ScheduledExecutorService
+ * to periodically refresh these mappings.
+ */
+public class DNSCache {
+    private static final Logger LOG = LogManager.getLogger(DNSCache.class);
+
+    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService executor = ThreadPoolManager.newDaemonScheduledThreadPool(1,
+            "dns_cache_pool", true);
+
+    /**
+     * Check if the enable_fqdn_mode configuration is set.
+     * If it is, it schedules a task to refresh the DNS cache every 60 seconds,
+     * starting after an initial delay of 120 seconds.
+     */
+    public void start() {
+        if (Config.enable_fqdn_mode) {
+            executor.scheduleAtFixedRate(this::refresh, 120, 60, java.util.concurrent.TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Retrieves the IP address for a hostname, resolving and caching it on a cache miss.
+     * A failed resolution is not cached, so the next call resolves the hostname again.
+     *
+     * @param hostname The hostname for which to get the IP address.
+     * @return The IP address, or an empty string if the hostname cannot be resolved.
+     */
+    public String get(String hostname) {
+        String ip = cache.computeIfAbsent(hostname, host -> {
+            String resolved = resolveHostname(host);
+            // returning null leaves the cache untouched
+            return resolved.isEmpty() ? null : resolved;
+        });
+        return ip == null ? "" : ip;
+    }
+
+    /**
+     * Resolves a hostname to an IP address.
+     * @param hostname The hostname to resolve.
+     * @return The IP address, or an empty string if the hostname cannot be resolved.
+     */
+    private String resolveHostname(String hostname) {
+        try {
+            return NetUtils.getIpByHost(hostname, 0);
+        } catch (UnknownHostException e) {
+            return "";
+        }
+    }
+
+    /**
+     * Re-resolves every cached hostname and stores the new IP address when it has changed.
+     * When a hostname cannot be resolved, the cached IP address is kept.
+     */
+    private void refresh() {
+        for (String hostname : cache.keySet()) {
+            String resolvedIp = resolveHostname(hostname);
+            String currentIp = cache.get(hostname);
+            if (!resolvedIp.isEmpty() && !resolvedIp.equals(currentIp)) {
+                cache.put(hostname, resolvedIp);
+                LOG.info("IP for hostname {} has changed from {} to {}", hostname, currentIp, resolvedIp);
+            }
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java
index 334dd11564d35f..f9ca99562ef5d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java
@@ -93,9 +93,19 @@ public static String getHostnameByIp(String ip) {
         return hostName;
     }
 
-    public static String getIpByHost(String host) throws UnknownHostException {
-        InetAddress inetAddress = InetAddress.getByName(host);
-        return inetAddress.getHostAddress();
+    public static String getIpByHost(String host, int retryTimes) throws UnknownHostException {
+        InetAddress inetAddress;
+        while (true) {
+            try {
+                inetAddress = InetAddress.getByName(host);
+                return inetAddress.getHostAddress();
+            } catch (UnknownHostException e) {
+                LOG.warn("Get IP by host failed, hostname: {}, remaining retryTimes: {}", host, retryTimes, e);
+                if (retryTimes-- <= 0) {
+                    throw e;
+                }
+            }
+        }
     }
 
     // This is the implementation is inspired by Apache camel project:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 72afa75ffcc63d..a0ccf7ad493f60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -17,9 +17,9 @@
 
 package org.apache.doris.rpc;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import
org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.util.NetUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; @@ -107,7 +107,7 @@ public void removeProxy(TNetworkAddress address) { } private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException { - String realIp = NetUtils.getIpByHost(address.getHostname()); + String realIp = Env.getCurrentEnv().getDnsCache().get(address.hostname); BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address); if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp) && serviceClientExtIp.client.isNormalState()) {